Rely on the following

<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-core</artifactId>
    <version>1.5.18</version>
</dependency>
Copy the code

What is the Hystrix

2018.11 released the last version, currently dealing with the maintenance phase, no upgrade version

  • USES:

    • Stop the cascading fault. Fallback and elegant downgrades, Fail Fast and quick recoveries
    • Monitor and configure real-time changes
    • Resource isolation. Partial unavailability does not cause system unavailability
  • Scenario: On the commodity list interface, data such as red packets, prices, and labels need to be obtained. You can give this guy a thread pool. If the thread pool is full, the non-commodity list interface of the current service is not affected

  • Use: the framework of main use Rxjava hystrix, fit may refer to: www.jianshu.com/p/5e93c9101…

Perform the entrance

Hystrix executes with command as the entry point. AbstractCommand implements the Command Command almost all logic, there are two subclasses were HystrixCommand HystrixCommand, HystrixObservableCommand 99% scene, so the following only explain this Command class, It provides two methods: execute() synchronous execution and queue() asynchronous execution

public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
    / /... Omit all constructors
    / /... Omit the static configuration inner class Setter
  
    // The thread used for execution
    private final AtomicReference<Thread> executionThread = new AtomicReference<Thread>();
    private final AtomicBoolean interruptOnFutureCancel = new AtomicBoolean(false);
    
    Execute () or queue calls the run() method **/
    protected abstract R run(a) throws Exception;
    /** * a failure to call execute() or queue() degrades the call to getFallback(). Don't realize the current method, will throw an UnsupportedOperationException abnormal * * * / by default
    protected R getFallback(a) { throw new UnsupportedOperationException("No fallback available."); }
  
    /** * Whether to customize the failed method, if so, put it in commandContainsFallback map */
    @Override
    protected boolean isFallbackUserDefined(a) {
      Boolean containsFromMap = commandContainsFallback.get(commandKey);
      if(containsFromMap ! =null) {
        return containsFromMap;
      } else {
        Boolean toInsertIntoMap;
        try {
          getClass().getDeclaredMethod("getFallback");
          toInsertIntoMap = true;
        } catch (NoSuchMethodException nsme) {
          toInsertIntoMap = false;
        }
        commandContainsFallback.put(commandKey, toInsertIntoMap);
        returntoInsertIntoMap; }}/** * commandIsScalar=true, In makeEmits will circuitBreaker. MakeSuccess () * - in HystrixObservableCommand to false * /
    @Override
    protected boolean commandIsScalar(a) { return true; }
    /** * The instruction used for synchronous execution */
    public R execute(a) {
      try {
        // Queue is a Future. Get blocks the result, so execute() is a synchronous instruction
        // Finally queue() is called
        return queue().get();
      } catch (Exception e) { throwExceptions.sneakyThrow(decomposeException(e)); }}/** * Instructions for asynchronous execution * commands are queued in the thread pool and return the Future for the result */
    public Future<R> queue(a) {
      ToObservable () : executes commands asynchronously with callbacks by subscribing to {@Link Observable}.
      final Future<R> delegate = toObservable().toBlocking().toFuture();
      final Future<R> f = new Future<R>() {
        /** Interrupt the running method **/
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
          if (delegate.isCancelled()) { return false; }
          if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
            interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
          }
  
          final boolean res = delegate.cancel(interruptOnFutureCancel.get());
          if(! isExecutionComplete() && interruptOnFutureCancel.get()) {final Thread t = executionThread.get();
            if(t ! =null && !t.equals(Thread.currentThread())) {
              // Finally interrupt() is calledt.interrupt(); }}return res;
        }
        @Override
        public boolean isCancelled(a) { return delegate.isCancelled(); }
        @Override
        public boolean isDone(a) { return delegate.isDone(); }
        @Override
        public R get(a) throws InterruptedException, ExecutionException { return delegate.get(); }
        @Override
        public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
          returndelegate.get(timeout, unit); }};/** Special handling of the immediately thrown error state. But the Future was thrown wrong right after it was created? * * /
      if (f.isDone()) {
        try {
          f.get();
          return f;
        } catch (Exception e) {
          // Get the exception to be thrown
          Throwable t = decomposeException(e);
          if (t instanceof HystrixBadRequestException) {
            return f;
          } else if (t instanceof HystrixRuntimeException) {
            HystrixRuntimeException hre = (HystrixRuntimeException) t;
            switch (hre.getFailureType()) {
              case COMMAND_EXCEPTION:
              case TIMEOUT:
                // we don't throw these types from queue() only from queue().get() as they are execution errors
                return f;
              default:
                // these are errors we throw from queue() as they as rejection type errors
                throwhre; }}else {
            throwExceptions.sneakyThrow(t); }}}returnf; }}Copy the code

ToObservable () method

HystrixCommand has execute() and queue() methods. ToObservable () in queue() is then called. ToObservable ()

  • ApplyHystrixSemantics ()
  1. Determines whether the thread is NOT_STARTED, otherwise HystrixRuntimeException is thrown, and CAS ensures that the current command execution is unique
  2. Use HystrixRequestLog to log the execution of this command (requestLogEnabled = false to turn off logging)
  3. When request caching is enabled, the data is fetched from the cache

3.1 requestCacheEnabled = true && getCacheKey()! = null (so do not return null when overwriting the cache method, otherwise it does not take effect) If caching is not enabled or missed, execute the target command to get results. 4.1 Observable.defer() The target method does not execute immediately and requires a subscription to execute asynchronously. ApplyHystrixSemantics () ¶ TerminateCommandCleanup: flag the thread status as TERMINAL 6.1.1 object code is not executed (e.g., the result from the cache) : HystrixCommandMetrics#markCommandDone(), Trigger execution after the completion of the function callback (if endCurrentThreadExecutingCommand not null) 6.1.2 target execution execution. Using markCommandDone (true) tag 6.2 unsubscribeCommandCleanup will write thread state is UNSUBSCRIBED. Trigger executionHook. 6.3 fireOnCompleteHook onUnsubscribe only trigger executionHook. OnSuccess

public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
  public Observable<R> toObservable(a) {
    // Omit the previous Action and Fun
    // Create an Observable with observable.defer ()
    return Observable.defer(() -> {
      // This is a stateful object, so it can only be used once (CAS substitution limits entry to once only)
      /* this is a stateful object so can only be used once */
      if(! commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { IllegalStateException ex =new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
        //TODO make a new error type for this
        throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
      }
      // Enter the command start time
      commandStartTimestamp = System.currentTimeMillis();

      // Check whether I request logging is enabled
      if (properties.requestLogEnabled().get()) {
        // log this command execution regardless of what happened
        if(currentRequestLog ! =null) { currentRequestLog.addExecutedCommand(_cmd); }}// Whether request caching is enabled
      final boolean requestCacheEnabled = isRequestCachingEnabled();
      // -- cacheKey is null by default, so it is disabled by default
      final String cacheKey = getCacheKey();
      // If the cache is enabled, it is first fetched from the cache
      /* try from cache first */
      if (requestCacheEnabled) {
        // fetch it from the cache
        HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
        if(fromCache ! =null) {
          // If not empty. IsResponseFromCache = true and return data
          isResponseFromCache = true;
          returnhandleRequestCacheHitAndEmitValues(fromCache, _cmd); }}// Wrap as hystrixObservable
      // Get the command Observable
      Observable<R> hystrixObservable =
              Observable.defer(applyHystrixSemantics)
                      .map(wrapWithAllOnNextHooks);
      // Get the cache Observable
      Observable<R> afterCache;

      // Whether to push into the cache (this is done only if the cacheKey is not empty. The default cacheKey is empty, which requires setting.)
      // put in cache
      if(requestCacheEnabled && cacheKey ! =null) {
        // wrap it for caching
        HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
        HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
        if(fromCache ! =null) {
          // another thread beat us so we'll use the cached value instead
          toCache.unsubscribe();
          isResponseFromCache = true;
          return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
        } else {
          // we just created an ObservableCommand so we cast and return itafterCache = toCache.toObservable(); }}else {
        afterCache = hystrixObservable;
      }

      return afterCache
              // Listen when a subscription is about to be terminated, either normally or abnormally
              .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
              // Listen when unsubscribing
              .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
              // Observable terminates normally when listening.doOnCompleted(fireOnCompletedHook); }); }}Copy the code

ApplyHystrixSemantics () method

The final execution target method calls applyHystrixSemantics() as follows:

  1. Determine whether the circuit breaker allowed to execute circuitBreaker. AllowRequest (). It is not allowed to execute a fallback directly
  2. Attempt to obtain semaphore resources. Thread pool isolation mode will use TryableSemaphoreNoOp and return true
  3. Execute the target method executeCommandAndObserve(). Failure will perform circuit logic, call handleSemaphoreRejectionViaFallback ()
class AbstractCommand {
    / /... Omit the other
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
      // Mark it up before execution
      // There are many executionHook and eventNotifier operations in the source code, which is an extension of Hystrix. Nothing is done in this, leaving a hole for developers to expand
      executionHook.onStart(_cmd);

      // Determine whether the circuit breaker is allowed to run
      / / - opens the circuit breaker called (withCircuitBreakerEnabled (true)) : HystrixCircuitBreakerImpl
      / / - closed circuit breaker called (withCircuitBreakerEnabled (false)) : NoOpCircuitBreaker. Returns true
      if (circuitBreaker.allowRequest()) {
        // Get the execution semaphore. If no configuration semaphore mode, return TryableSemaphoreNoOp. DEFAULT
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        final Action0 singleSemaphoreRelease = () -> {
          if (semaphoreHasBeenReleased.compareAndSet(false.true)) {
            / / if it is TryableSemaphoreNoOp. DEFAULT, that is an empty method of the operationexecutionSemaphore.release(); }};final Action1<Throwable> markExceptionThrown = t -> eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);

        // Determine whether the semaphore is rejected
        // Thread pool mode will use TryableSemaphoreNoOp, which returns true
        if (executionSemaphore.tryAcquire()) {
          try {
            executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
            // key !!!! : Handles quarantine policies and Fallback policies ()
            // -- executeCommandAndObserve handles isolation policies and various fallbacks. The target method will be implemented eventually!!
            // -- executeCommandAndObserve handles isolation policies and various fallbacks. The target method will be implemented eventually!!
            // -- executeCommandAndObserve handles isolation policies and various fallbacks. The target method will be implemented eventually!!
            return executeCommandAndObserve(_cmd)
                    .doOnError(markExceptionThrown)
                    .doOnTerminate(singleSemaphoreRelease)
                    .doOnUnsubscribe(singleSemaphoreRelease);
          } catch (RuntimeException e) {
            returnObservable.error(e); }}else {
          returnhandleSemaphoreRejectionViaFallback(); }}else {
        // The request is not accepted. The fallback() method is executed
        returnhandleShortCircuitViaFallback(); }}}Copy the code

Implement target method

The core method for executing the target method is executeCommandAndObserve()

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
  /** * Handles isolation policies and various fallbacks. Ultimately, the !!!!!!! of the target method is executed * This decorates "Hystrix" functionality around the run() Observable. */
  private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    // Execution context. Ensure that the main thread parameters are also available in the thread pool
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

    / * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * 1. Record the result result events for: SUCCESS * 2. Closed circuitBreaker circuit breaker (if has been closed to ignore bai) * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * /
    final Action1<R> markEmits = r -> {
      // Whether data should be reported in the onNext step
      // HystrixCommand -> false
      // HystrixObservableCommand -> true
      if (shouldOutputOnNextEvents()) {
        executionResult = executionResult.addEvent(HystrixEventType.EMIT);
        eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
      }
      if (commandIsScalar()) {
        // Whether the command is a scalar
        // HystrixCommand -> true
        // HystrixObservableCommand -> false
        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();

        // This code is important:
        // Record the result as SUCCESS
        / / and, and, and circuitBreaker. MarkSuccess (); (If the circuit breaker is open, it is closed.)
        eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
        eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
        executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); circuitBreaker.markSuccess(); }};/ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * 1. Ensure that the Scala type results can also be normally closed circuit breaker and marked Success * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * /
    final Action0 markOnCompleted = () -> {
      if(! commandIsScalar()) {long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
        // These lines of code are important: markEmits works the same as above. However, commandIsScalar() == false is called in the case of HystrixObservableCommand
        // Record the result as SUCCESS
        / / and, and, and circuitBreaker. MarkSuccess (); (If the circuit breaker is open, it is closed.)
        eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
        eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
        executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); circuitBreaker.markSuccess(); }};/ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * when an exception is thrown during the execution of a target method (program might be the problem, may be a timeout, etc.), will enter here to deal with, handle a case can be divided into two categories: * * 1. Trigger fallback function * 2. Do not trigger the fallback function * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * /
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
      @Override
      public Observable<R> call(Throwable t) {
        // Throw Throwable t into Exception e.
        // If t is an NPE exception, then t and e are exactly the same.
        // t is not equal to e only if t is an error class
        Exception e = getExceptionFromThrowable(t);
        // Record the exception e during execution
        executionResult = executionResult.setExecutionException(e);
        if (e instanceof RejectedExecutionException) {
          // Thread pool rejected
          return handleThreadPoolRejectionViaFallback(e);
        } else if (t instanceof HystrixTimeoutException) {
          // Target method execution timed out
          return handleTimeoutViaFallback();
        } else if (t instanceof HystrixBadRequestException) {
          // Detailed breakdown below
          return handleBadRequestByEmittingError(e);
        } else {
          /* * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException. */
          / / out way, only when the subclasses autotype getExceptionFromThrowable () method is likely to enter here
          if (e instanceof HystrixBadRequestException) {
            eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
            return Observable.error(e);
          }

          returnhandleFailureViaFallback(e); }}};final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
      @Override
      public void call(Notification<? super R> rNotification) { setRequestContextIfNeeded(currentRequestContext); }}; Observable<R> execution;/ / is focused on: executeCommandWithSpecifiedIsolation ()
    / / is focused on: executeCommandWithSpecifiedIsolation ()
    if (properties.executionTimeoutEnabled().get()) {
      // Timeout support is enabled. Much. Lift (new HystrixObservableTimeoutOperator < R > (_cmd)) call
      execution = executeCommandWithSpecifiedIsolation(_cmd)
              .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
      // Timeout support is not enabled
      execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    // Once you've got execution, start registering some basic events, observers
    // -- doOnNext() : call before the observer is called back (when the data has already been sent, i.e. the target method has already been executed)
    return execution.doOnNext(markEmits)
            // -- doOnCompleted() : called on normal completion
            .doOnCompleted(markOnCompleted)
            // -- onErrorResumeNext() : called when executing an error
            .onErrorResumeNext(handleFallback)
            // -- doOnEach() : each call is executed. The request context is set for the child threads to complete cross-thread communication
            .doOnEach(setRequestContext);
  }

  /** * The actual execution of the target method (handled depending on the isolation type specified: THREAD or SEMAPHORE) * -- THREAD: THREAD pool isolation (default) * -- SEMAPHORE: SEMAPHORE isolation */
  private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
      // Thread pool isolation (default)
      // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
      return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call(a) {
          // Set up a thread pool to isolate occupancy and record data
          executionResult = executionResult.setExecutionOccurred();
          // The thread state must be OBSERVABLE_CHAIN_CREATED for execution
          // This state is set by toObservable()
          if(! commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
          }

          // Collecting indicator information: The system starts to collect indicator information
          metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

          // The run() method has not been executed. It timed out during a thread switch and returns an exception
          if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
            // the command timed out in the wrapping thread so we will return immediately
            // and not increment any of the counters below or other such logic
            return Observable.error(new RuntimeException("timed out before executing run()"));
          }

          // CAS sets the ThreadState to ThreadState.STARTED
          if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
            //we have not been unsubscribed, so should proceed
            // Thread global counter +1. Semaphore does not need this counter
            HystrixCounters.incrementGlobalConcurrentThreads();
            // marks the thread thread ready to start execution
            threadPool.markThreadExecution();
            // store the command that is being run
            // This save uses ThreadLocal
      
       > to bind to the current thread
      
            // This ensures thread-safe execution of the command
            endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
            executionResult = executionResult.setExecutedInThread();
            /** * If any of these hooks throw an exception, then it appears as if the actual execution threw an error */
            try {
              // Execute the hook program and the target run method program
              executionHook.onThreadStart(_cmd);
              executionHook.onRunStart(_cmd);
              executionHook.onExecutionStart(_cmd);
              / / getUserExecutionObservable: getExecutionObservable () abstract methods to get the target method
              return getUserExecutionObservable(_cmd);
            } catch (Throwable ex) {
              returnObservable.error(ex); }}else {
            //command has already been unsubscribed, so return immediately
            return Observable.error(new RuntimeException("unsubscribed before executing run()"));
          }
        }
      }).doOnTerminate(() -> {
        if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
          handleThreadEnd(_cmd);
        }
        if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
          //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
        }
        //if it was unsubscribed, then other cleanup handled it
      }).doOnUnsubscribe(() -> {
        if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
          handleThreadEnd(_cmd);
        }
        if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
          //if it was never started and was cancelled, then no need to clean up
        }
        //if it was terminal, then other cleanup handled it
      }).subscribeOn(threadPool.getScheduler(() -> properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT));
    } else {
      return Observable.defer(() -> {
        executionResult = executionResult.setExecutionOccurred();
        if(! commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
        }

        metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
        // semaphore isolated
        // store the command that is being run
        endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
        try {
          executionHook.onRunStart(_cmd);
          executionHook.onExecutionStart(_cmd);
          return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
        } catch (Throwable ex) {
          //If the above hooks throw, then use that as the result of the run method
          returnObservable.error(ex); }}); }}}Copy the code

Methods the drop

How to call a target method

What’s a downgrade

Hystrix triggers fallback degradation logic in 5 cases:

  • Short – circuited short circuit
  • Threadpool -rejected indicates the rejected of the threadpool
  • Semaphore – Rejected Indicates the semaphore rejected
  • Time – out timeout
  • Failed Execution failed

But in addition to the above types, HystrixBadRequestException exception (not trigger back, don’t calculate failure indicator) will not trigger a fallback mechanism. Used in scenarios such as handling 400 error codes

The downgrade flow chart is as follows: