1. What problem does Hystrix solve?

A complex distributed application has many dependencies, and any one of them will inevitably fail at some point. If the application does not isolate each dependency and limit the risk coming from outside, a single failing dependency can easily drag down the entire application.

Take a common e-commerce scenario: the order service calls the inventory, product, points, and payment services. When everything is healthy, the order module works normally.

Now suppose the points service misbehaves and every call to it blocks for 30 s. Some order requests fail, and their worker threads stay stuck waiting on the points service.

At peak traffic the problem gets worse: every order request ends up blocked on the points service, all worker threads are tied up, the machine's resources are exhausted, and the order service becomes unavailable. The failure then cascades to the services that depend on it and can bring down the whole cluster. This is known as the avalanche effect.
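A rough way to see how fast this happens is Little's law: the number of busy threads equals the request rate multiplied by how long each request holds a thread. Assuming, purely for illustration, a pool of 200 worker threads (Tomcat's default maxThreads) and the 30 s blocking time from the example above, the order service can sustain at most:

$$
\lambda_{\max} = \frac{N_{\text{threads}}}{W} = \frac{200}{30\ \text{s}} \approx 6.7\ \text{requests/s}
$$

Any load above that rate keeps adding blocked threads until the pool is exhausted.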

So we need a mechanism that keeps the failure of a single service from affecting the availability of the whole cluster. Hystrix is a framework that implements such a mechanism. Let's first look at its overall flow.

2. Overall mechanism

[Entry] Execution in Hystrix starts from a HystrixCommand or HystrixObservableCommand object. In Spring applications these are usually created through annotations and AOP, which keeps Hystrix-specific code out of the business logic.

[Cache] When a HystrixCommand is executed, Hystrix first checks whether request caching is enabled. If it is and the request hits the cache, the cached result is returned directly.

[Circuit breaker] If the circuit breaker is open, the call is short-circuited and goes straight to the fallback logic; if it is closed, execution proceeds to the isolation step. The breaker's state is driven mainly by the failure rate of executions within the statistics window: if the failure rate is too high, the breaker opens automatically.

[Isolation] Users can configure either thread pool isolation or semaphore isolation. If the thread pool (or semaphore) is full, the request enters the fallback logic; otherwise execution proceeds to the next step, where the business call is actually made (on a task thread from the pool in thread pool mode).

[Execution] The service call is actually executed. If it fails or throws an exception, the fallback logic is entered; if it succeeds, the result is returned normally.

[Timeout] A timer schedules a delayed task that checks whether the service call has timed out. If it has, the execution is cancelled and the fallback logic is entered; if not, the result is returned normally. Both thread pool isolation and semaphore isolation support a timeout setting, but the timeout in semaphore mode has limitations (see section 5).

[Fallback] In the fallback logic, if the business code implements HystrixCommand.getFallback(), the data returned by that method is used as the fallback result; if it does not, an exception is returned.

[Statistics] The success, failure, and timeout outcomes of each call are reported to the statistics module, and the circuit breaker opens or closes based on the resulting health statistics.
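The steps above can be condensed into a minimal, self-contained Java sketch that shows only their ordering. It is not Hystrix code: `MiniCommandPipeline` is a hypothetical class, the circuit breaker is reduced to a boolean flag, isolation is a `java.util.concurrent.Semaphore`, the timeout uses `Future.get` with a time limit, and the statistics are two counters.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical, simplified model of the execution flow (not the Hystrix API).
class MiniCommandPipeline<R> {
    private final Map<String, R> cache = new ConcurrentHashMap<>();   // [Cache]
    private volatile boolean circuitOpen = false;                      // [Circuit breaker], reduced to a flag
    private final Semaphore isolation;                                 // [Isolation]
    private final long timeoutMillis;                                  // [Timeout]
    private final ExecutorService taskThreads = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);   // daemon threads so the sketch never keeps the JVM alive
        return t;
    });
    final AtomicInteger successes = new AtomicInteger();               // [Statistics]
    final AtomicInteger failures = new AtomicInteger();

    MiniCommandPipeline(int maxConcurrent, long timeoutMillis) {
        this.isolation = new Semaphore(maxConcurrent);
        this.timeoutMillis = timeoutMillis;
    }

    void setCircuitOpen(boolean open) {
        circuitOpen = open;
    }

    R execute(String cacheKey, Callable<R> call, Supplier<R> fallback) {
        R cached = cache.get(cacheKey);
        if (cached != null) {
            return cached;                            // cache hit: return directly
        }
        if (circuitOpen) {
            return fallback.get();                    // breaker open: short-circuit
        }
        if (!isolation.tryAcquire()) {
            return fallback.get();                    // isolation full: reject
        }
        Future<R> future = taskThreads.submit(call);  // [Execution]
        try {
            R result = future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            successes.incrementAndGet();
            if (result != null) {
                cache.put(cacheKey, result);
            }
            return result;
        } catch (TimeoutException | ExecutionException e) {
            future.cancel(true);                      // timed out or failed: interrupt if still running
            failures.incrementAndGet();
            return fallback.get();                    // [Fallback]
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            failures.incrementAndGet();
            return fallback.get();
        } finally {
            isolation.release();
        }
    }
}
```

Each shortcut here (the flag, the single semaphore, the counters) corresponds to a real mechanism examined in the sections that follow.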

As the saying goes, there are no secrets in source code, so let's walk through the source of the core features to see how Hystrix works as a whole.

3. Circuit breaker

Household circuits contain fuses. When a circuit is faulty or abnormal, the current keeps rising, and the rising current may damage important or valuable components in the circuit, burn out the circuit, or even cause a fire.

With a fuse correctly placed in the circuit, once the current rises abnormally to a certain level for a certain time, the fuse blows and cuts off the current, protecting the circuit. Hystrix's circuit breaker provides a similar function. When the number of requests to a service provider within a time window reaches the configured threshold and the error rate in that window is too high, Hystrix trips the breaker: subsequent requests are short-circuited and go straight to the fallback logic, which applies the local fallback policy.

The circuit breaker is self-healing. After it has been open for a certain period (the sleep window), it lets a single request through and adjusts its state according to that request's result. The breaker therefore switches automatically among closed, open, and half-open.
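The state machine can be sketched in plain Java, loosely mirroring the attemptExecution, markSuccess, markNonSuccess and health-stream logic walked through below. `SimpleCircuitBreaker`, its constructor parameters and `onHealthCounts` are hypothetical names; the clock is injected so the transitions can be tested deterministically, and the health statistics are passed in directly instead of arriving from an RxJava stream.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

// Hypothetical three-state circuit breaker modeled on this section (not the Hystrix class).
class SimpleCircuitBreaker {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
    private final AtomicLong circuitOpened = new AtomicLong(-1);   // -1 means "not open"
    private final long requestVolumeThreshold;
    private final int errorThresholdPercentage;
    private final long sleepWindowMillis;
    private final LongSupplier clock;

    SimpleCircuitBreaker(long requestVolumeThreshold, int errorThresholdPercentage,
                         long sleepWindowMillis, LongSupplier clock) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMillis = sleepWindowMillis;
        this.clock = clock;
    }

    Status status() {
        return status.get();
    }

    // Before each execution: may this request go through?
    boolean attemptExecution() {
        if (circuitOpened.get() == -1) {
            return true;                              // closed
        }
        if (!isAfterSleepWindow()) {
            return false;                             // open, still inside the sleep window
        }
        // Only the thread that wins the CAS gets the single trial request.
        return status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
    }

    private boolean isAfterSleepWindow() {
        return clock.getAsLong() > circuitOpened.get() + sleepWindowMillis;
    }

    // The trial request succeeded: close the breaker.
    void markSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            circuitOpened.set(-1);
        }
    }

    // The trial request failed: reopen and restart the sleep window.
    void markNonSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
            circuitOpened.set(clock.getAsLong());
        }
    }

    // New window statistics: trip only with enough volume AND a high enough error rate.
    void onHealthCounts(long totalRequests, int errorPercentage) {
        if (totalRequests < requestVolumeThreshold || errorPercentage < errorThresholdPercentage) {
            return;
        }
        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
            circuitOpened.set(clock.getAsLong());
        }
    }
}
```

With a 5 s sleep window, a breaker tripped at t = 1000 ms rejects requests until t > 6000 ms, then lets exactly one trial request through.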

【HystrixCircuitBreaker】boolean attemptExecution() : called each time a HystrixCommand executes to decide whether execution may proceed. If the breaker is open and the sleep window has elapsed, its state is changed to half-open. The change is made atomically with CAS, so only one business request is actually sent to the provider, and the state is then adjusted based on that request's result.

public boolean attemptExecution() {
    // Check whether configuration forces the circuit breaker open
    if (properties.circuitBreakerForceOpen().get()) {
        return false;
    }
    // Check whether configuration forces the circuit breaker closed
    if (properties.circuitBreakerForceClosed().get()) {
        return true;
    }
    // -1 means the circuit breaker is closed
    if (circuitOpened.get() == -1) {
        return true;
    } else {
        // Check whether the sleep window has elapsed
        if (isAfterSleepWindow()) {
            // Move to half-open; only the thread that wins the CAS lets its request through
            if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                return true;
            } else {
                return false;
            }
        } else {
            // Still inside the sleep window: reject the request
            return false;
        }
    }
}

【HystrixCircuitBreaker】void markSuccess() : called after a HystrixCommand executes successfully. If the breaker is half-open, its state is updated to closed. In other words, the breaker was open, a single trial request was sent to the service provider and succeeded, so Hystrix automatically closes the breaker.

public void markSuccess() {
    // Move the circuit breaker from half-open to closed
    if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
        // Reset the health statistics and resubscribe to the stream
        metrics.resetStream();
        Subscription previousSubscription = activeSubscription.get();
        if (previousSubscription != null) {
            previousSubscription.unsubscribe();
        }
        Subscription newSubscription = subscribeToStream();
        activeSubscription.set(newSubscription);
        // Clear the open timestamp (-1 means closed)
        circuitOpened.set(-1L);
    }
}

【HystrixCircuitBreaker】void markNonSuccess() : called when a HystrixCommand execution does not succeed (for example, it fails or times out). If the breaker is half-open, its state is updated back to open. In other words, the breaker was open, a single trial request was sent to the service provider and failed, so Hystrix keeps the breaker open and uses the time of this failure as the start of a new sleep window.

public void markNonSuccess() {
    // Move the circuit breaker from half-open back to open
    if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
        // Record the failure time as the start of the new sleep window
        circuitOpened.set(System.currentTimeMillis());
    }
}

【HystrixCircuitBreaker】Subscription subscribeToStream() : subscribes to the health statistics stream. If the number of requests in the window has reached the volume threshold and the error percentage has reached the error threshold, the breaker state is automatically updated to open. Subsequent requests are then short-circuited: the service provider is not called, and execution goes straight to the fallback logic.

private Subscription subscribeToStream() {
    // Subscribe to the health statistics stream
    return metrics.getHealthCountsStream()
            .observe()
            .subscribe(new Subscriber<HealthCounts>() {
                @Override
                public void onCompleted() {
                }
                @Override
                public void onError(Throwable e) {
                }
                @Override
                public void onNext(HealthCounts hc) {
                    // If the total number of requests is below the volume threshold, leave the state unchanged
                    if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                        // Not enough traffic to judge
                    } else {
                        // If the error percentage is below the threshold, leave the state unchanged;
                        // otherwise the error rate is too high: open the breaker so subsequent requests are rejected
                        if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                            // Error rate acceptable
                        } else {
                            if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                circuitOpened.set(System.currentTimeMillis());
                            }
                        }
                    }
                }
            });
}

4. Resource isolation

On cargo ships, the hull is usually divided into separate compartments to keep leaks and fires from spreading, so that an accident in one compartment does not sink the whole ship. Similarly, Hystrix uses this bulkhead pattern to isolate the service providers in a system: latency or failure in one provider does not bring down the whole system, and the concurrency of calls to each provider is kept under control. As shown in the figure below, the order service calls the downstream points, inventory, and other services through separate thread pools. When the points service fails, only its thread pool fills up; calls to the other services are unaffected. Hystrix supports two isolation modes: thread pool and semaphore.
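A minimal Java sketch of the bulkhead idea, assuming one bounded ThreadPoolExecutor per downstream dependency. `Bulkheads`, `boundedPool` and `callWithBulkhead` are hypothetical helpers, not Hystrix APIs; the point is that when one pool and its queue are saturated, only calls to that dependency are rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical bulkhead helpers: each dependency gets its own bounded pool.
class Bulkheads {
    static ThreadPoolExecutor boundedPool(int threads, int queueCapacity) {
        // The default AbortPolicy throws RejectedExecutionException once threads and queue are full.
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                r -> {
                    Thread t = new Thread(r);
                    t.setDaemon(true);
                    return t;
                });
    }

    static <T> T callWithBulkhead(ThreadPoolExecutor pool, Callable<T> call, T fallback, long timeoutMillis) {
        Future<T> future;
        try {
            future = pool.submit(call);
        } catch (RejectedExecutionException e) {
            return fallback;                 // this dependency's compartment is full
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            future.cancel(true);
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            return fallback;
        }
    }
}
```

If the points pool is saturated by hanging calls, a call to the points service falls back immediately while a call through the inventory pool still succeeds.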

4.1 Semaphore mode

Semaphore mode limits the number of concurrent calls to a single service provider. Suppose N requests for a given CommandKey are currently executing: if N is less than maxConcurrentRequests, the new request proceeds; if N is greater than or equal to maxConcurrentRequests, the request is rejected and enters the fallback logic. In semaphore mode the call runs on the requesting thread itself, so there is no thread context switch and the overhead is low, but the timeout mechanism cannot cut a running call short.
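A sketch of semaphore isolation under stated assumptions: `SemaphoreIsolation` is a hypothetical class, `java.util.concurrent.Semaphore` stands in for Hystrix's TryableSemaphore, and `ConcurrentHashMap.computeIfAbsent` replaces the get/putIfAbsent caching shown in getExecutionSemaphore. The call runs on the caller's thread, and a request that cannot get a permit goes straight to the fallback.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical semaphore isolation: one semaphore per command key, no extra threads.
class SemaphoreIsolation {
    private final ConcurrentMap<String, Semaphore> semaphores = new ConcurrentHashMap<>();
    private final int maxConcurrentRequests;

    SemaphoreIsolation(int maxConcurrentRequests) {
        this.maxConcurrentRequests = maxConcurrentRequests;
    }

    <T> T execute(String commandKey, Supplier<T> call, Supplier<T> fallback) {
        Semaphore semaphore = semaphores.computeIfAbsent(commandKey,
                key -> new Semaphore(maxConcurrentRequests));
        if (!semaphore.tryAcquire()) {
            return fallback.get();        // N >= maxConcurrentRequests: reject
        }
        try {
            return call.get();            // runs on the calling thread: no context switch
        } finally {
            semaphore.release();
        }
    }
}
```

With one permit, nesting a call for the same key inside itself is a single-threaded way to make the limit visible; in practice the competing requests come from different threads.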

【AbstractCommand】Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) : after checking the circuit breaker, attempts to acquire the execution semaphore. If it succeeds, the call to the service provider proceeds; if not, the fallback logic is entered.

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    executionHook.onStart(_cmd);
    // Check whether the circuit breaker allows the request
    if (circuitBreaker.attemptExecution()) {
        // Get the semaphore
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        // Release the semaphore at most once
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };
        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };
        // Try to acquire the semaphore
        if (executionSemaphore.tryAcquire()) {
            try {
                // Record the start time of the service execution
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                // Continue executing the business logic
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            // The semaphore rejected the request: enter the fallback logic
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        // The circuit breaker rejected the request: short-circuit to the fallback logic
        return handleShortCircuitViaFallback();
    }
}

【AbstractCommand】TryableSemaphore getExecutionSemaphore() : returns the semaphore to use. If the isolation mode is semaphore, it returns the override semaphore if one was set, otherwise the semaphore cached for the current CommandKey, creating and caching it on first use. If the isolation mode is thread pool, it returns the default semaphore TryableSemaphoreNoOp.DEFAULT, which lets every request through.

protected TryableSemaphore getExecutionSemaphore() {
    // Check whether the isolation mode is semaphore
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
        if (executionSemaphoreOverride == null) {
            // Look up the semaphore cached for this CommandKey
            TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
            if (_s == null) {
                // Create the semaphore and cache it
                executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
                // Return whichever instance ended up in the cache (this thread's or another's)
                return executionSemaphorePerCircuit.get(commandKey.name());
            } else {
                return _s;
            }
        } else {
            return executionSemaphoreOverride;
        }
    } else {
        // Thread pool mode: return the no-op semaphore, which lets every request through
        return TryableSemaphoreNoOp.DEFAULT;
    }
}

4.2 Thread Pool Mode

Thread pool mode also limits concurrency per service provider. The code path still acquires a semaphore first, but in this mode it is the default no-op semaphore, so every request passes and the thread pool logic does the real limiting. Suppose N requests for a given CommandKey are in flight: if N is less than maximumPoolSize, a thread is taken from the Hystrix-managed thread pool and the actual call is made on that task thread. If there are more concurrent requests than threads, the extra tasks wait in a queue. The queue is bounded too, and once it is full, new requests go to the fallback logic. Thread pool mode supports asynchronous calls and timeouts, but it involves thread switching and therefore has higher overhead.
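For comparison, a sketch of thread pool isolation built on standard Java classes. `ThreadPoolIsolation` and `executeAsync` are hypothetical; the pool has a fixed number of threads and a bounded queue, the call runs asynchronously on a pool thread, and rejection, timeout or failure all produce the fallback. `CompletableFuture.orTimeout` (Java 9+) provides the timeout here; unlike Hystrix's default behavior, it does not interrupt the pool thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical thread pool isolation for one dependency (requires Java 9+ for orTimeout).
class ThreadPoolIsolation {
    private final ThreadPoolExecutor pool;

    ThreadPoolIsolation(int poolSize, int maxQueueSize) {
        pool = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(maxQueueSize),
                r -> {
                    Thread t = new Thread(r, "isolated-pool");
                    t.setDaemon(true);
                    return t;
                });
    }

    <T> CompletableFuture<T> executeAsync(Supplier<T> call, Supplier<T> fallback, long timeoutMillis) {
        try {
            return CompletableFuture.supplyAsync(call, pool)          // runs on a pool thread
                    .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)  // completes exceptionally on timeout
                    .exceptionally(ex -> fallback.get());             // failure or timeout: fallback
        } catch (RejectedExecutionException e) {
            // All threads are busy and the queue is full
            return CompletableFuture.completedFuture(fallback.get());
        }
    }
}
```

With one thread and a queue of one, a first hanging call occupies the thread, a second waits in the queue, and a third is rejected straight to its fallback.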

【AbstractCommand】Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) : obtains a thread from the thread pool and executes the command on it, recording the thread state along the way.

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    // Check whether the isolation mode is thread pool
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }
                // Statistics
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
                // If the command has already timed out, return an error
                if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                    return Observable.error(new RuntimeException("timed out before executing run()"));
                }
                // Update the thread state to started
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                    HystrixCounters.incrementGlobalConcurrentThreads();
                    threadPool.markThreadExecution();
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    executionResult = executionResult.setExecutedInThread();
                    // Run the hooks; if one throws, return the exception as an error
                    try {
                        executionHook.onThreadStart(_cmd);
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    // Already unsubscribed: return an empty Observable
                    return Observable.empty();
                }
            }
        }).doOnTerminate(new Action0() {
            @Override
            public void call() {
                // Termination logic, omitted
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                // Unsubscribe logic, omitted
            }
            // Run the business logic on a thread from the thread pool
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                // Interrupt the task thread only if interrupt-on-timeout is enabled and the command has timed out
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } else {
        // Semaphore mode
        // omitted
    }
}

【ThreadPoolWorker】Subscription schedule(final Action0 action) : HystrixContextScheduler is Hystrix's replacement for the RxJava Scheduler. Its main purposes are to skip executing the command when the Observable has been unsubscribed, and to support interrupting a command that is running. In RxJava, a Scheduler creates a Worker for the Observable to execute the command, and the Worker is responsible for scheduling the execution threads. ThreadPoolWorker is a Worker implemented by Hystrix itself, and this schedule method is the core of its execution scheduling.

public Subscription schedule(final Action0 action) {
    // If already unsubscribed, do not schedule; return immediately
    if (subscription.isUnsubscribed()) {
        return Subscriptions.unsubscribed();
    }
    ScheduledAction sa = new ScheduledAction(action);
    subscription.add(sa);
    sa.addParent(subscription);
    // Get the thread pool
    ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
    // Submit the task for execution
    FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
    // On unsubscribe, cancel the task; whether the thread is interrupted depends on shouldInterruptThread
    sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
    return sa;
}

5. Timeout detection

The Hystrix timeout mechanism limits the impact that a slow third-party dependency has on its callers by making such requests fail fast. It is built on delayed tasks and has two parts: registering the delayed task and executing it.

When the isolation strategy is thread pool, the main thread subscribes to the execution result while a task thread from the pool calls the provider. Meanwhile, a timer thread checks, once the timeout period has passed, whether the task has finished. If it has not, the task has timed out: a timeout exception is thrown, and the task thread's later result is skipped instead of being published. If it has, the task completed within the timeout and the timer check simply ends.

When the isolation strategy is semaphore, the main thread subscribes to the execution result and also calls the provider itself (there is no task thread). If the call exceeds the specified time, the main thread still has to finish the business call before the timeout exception is thrown. This is the drawback of timeouts in semaphore mode: a call in progress cannot be cancelled, so there is no bound on how long the main thread takes to return.
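The difference between the two modes can be reproduced without Hystrix. In the hypothetical `TimeoutModes` sketch below, `runOnTaskThread` waits for a pool thread with `Future.get(timeout)` and interrupts it on timeout, while `runOnCallerThread` runs the call on the caller's own thread, so the timeout can only be detected after the call has returned. This is a simplification: Hystrix detects timeouts with a shared timer thread rather than by blocking in `Future.get`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical comparison of timeout behavior in the two isolation modes.
class TimeoutModes {
    private static final ExecutorService TASK_POOL = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Thread pool style: the caller waits at most timeoutMillis, then interrupts the task thread.
    static <T> T runOnTaskThread(Callable<T> call, long timeoutMillis, T onTimeout) {
        Future<T> future = TASK_POOL.submit(call);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);   // the late result is never observed
            return onTimeout;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Semaphore style: the call runs on the caller's thread, so the timeout
    // can only be detected after the call has already returned.
    static <T> T runOnCallerThread(Callable<T> call, long timeoutMillis, T onTimeout) {
        long start = System.nanoTime();
        T result;
        try {
            result = call.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        return elapsedMillis > timeoutMillis ? onTimeout : result;
    }
}
```

With a 500 ms call and a 100 ms timeout, both methods report a timeout, but only the thread-pool variant returns to its caller after roughly 100 ms; the caller-thread variant returns only after the full 500 ms.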

【AbstractCommand】Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) : the entry point for timeout detection. lift(new HystrixObservableTimeoutOperator<R>(_cmd)) attaches the timeout detection task to the execution.

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    // omitted
    Observable<R> execution;
    // Check whether timeout detection is enabled
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                // Attach timeout detection
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        // Execute without timeout detection
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

【HystrixObservableTimeoutOperator】Subscriber<? super R> call(final Subscriber<? super R> child) : creates the detection task and registers it as a delayed task. If the command has not completed when the detection task runs, a timeout exception is emitted; if the command completes or fails first, the detection task is cleared.

public Subscriber<? super R> call(final Subscriber<? super R> child) {
    final CompositeSubscription s = new CompositeSubscription();
    child.add(s);
    final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();
    // Create the timer listener (the detection task)
    TimerListener listener = new TimerListener() {
        @Override
        public void tick() {
            // If the command has not completed, mark it as timed out
            if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                // Report the timeout
                originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                // Unsubscribe from the original execution
                s.unsubscribe();
                final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
                    @Override
                    public void run() {
                        child.onError(new HystrixTimeoutException());
                    }
                });
                // Emit the timeout exception
                timeoutRunnable.run();
            }
        }
        // The timeout period
        @Override
        public int getIntervalTimeInMilliseconds() {
            return originalCommand.properties.executionTimeoutInMilliseconds().get();
        }
    };
    // Register the listener, associating the detection task with this command
    final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
    originalCommand.timeoutTimer.set(tl);
    Subscriber<R> parent = new Subscriber<R>() {
        @Override
        public void onCompleted() {
            if (isNotTimedOut()) {
                // Completed without a timeout: clear the detection task
                tl.clear();
                child.onCompleted();
            }
        }
        @Override
        public void onError(Throwable e) {
            if (isNotTimedOut()) {
                // Failed without a timeout: clear the detection task
                tl.clear();
                child.onError(e);
            }
        }
        @Override
        public void onNext(R v) {
            // Publish the result only if there was no timeout; after a timeout it is skipped
            if (isNotTimedOut()) {
                child.onNext(v);
            }
        }
        // True if already completed, or if this call wins the race from NOT_EXECUTED to COMPLETED
        private boolean isNotTimedOut() {
            return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED
                    || originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
        }
    };
    s.add(parent);
    return parent;
}

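【HystrixTimer】Reference<TimerListener> addTimerListener(final TimerListener listener) : uses the scheduled executor's scheduleAtFixedRate to run the detection task once the timeout period has elapsed. Because the task is scheduled at a fixed rate, it keeps firing every interval until the returned reference is cleared; the CAS in tick() ensures that only the first firing can mark a timeout.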

public Reference<TimerListener> addTimerListener(final TimerListener listener) {
    // Start the timer thread pool if it is not running yet
    startThreadIfNeeded();
    // Build the detection task
    Runnable r = new Runnable() {
        @Override
        public void run() {
            try {
                listener.tick();
            } catch (Exception e) {
                logger.error("Failed while ticking TimerListener", e);
            }
        }
    };
    // Schedule the detection task: first run after one interval, then once per interval
    ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
    return new TimerReference(listener, f);
}
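The registration and the race between the timer and the command can be modeled with a ScheduledExecutorService and an AtomicReference. `TimeoutRace` is a hypothetical class: its TimedOutStatus values and the `isNotTimedOut` check follow the Hystrix code above, `scheduleAtFixedRate` mirrors addTimerListener, and cancelling the ScheduledFuture plays the role of `tl.clear()`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of timer registration and the timeout/completion race.
class TimeoutRace {
    enum TimedOutStatus { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    private static final ScheduledExecutorService TIMER = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "timer");
        t.setDaemon(true);
        return t;
    });

    final AtomicReference<TimedOutStatus> status = new AtomicReference<>(TimedOutStatus.NOT_EXECUTED);
    private final ScheduledFuture<?> timerTask;

    TimeoutRace(long timeoutMillis, Runnable onTimeout) {
        // Like addTimerListener: first tick after one interval, then every interval.
        timerTask = TIMER.scheduleAtFixedRate(() -> {
            // Like tick(): only a command still NOT_EXECUTED can be marked TIMED_OUT.
            if (status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                onTimeout.run();
            }
        }, timeoutMillis, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Like isNotTimedOut(): already COMPLETED, or this call wins the race to COMPLETED.
    private boolean isNotTimedOut() {
        return status.get() == TimedOutStatus.COMPLETED
                || status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
    }

    // Called when the command finishes; false means the result must be dropped.
    boolean complete() {
        boolean publish = isNotTimedOut();
        timerTask.cancel(false);   // like tl.clear(): stop the detection task
        return publish;
    }
}
```

Whichever side performs its CAS on NOT_EXECUTED first wins: a command that finishes early is marked COMPLETED and the timer's later ticks do nothing, while a command that finishes after the timer fired finds TIMED_OUT and its result is dropped.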

6. Fallback

Hystrix's fallback logic acts as a safety net. It is entered when the business call throws an exception, when the circuit breaker is open, when the thread pool or semaphore is full, or when execution times out. A fallback should return a generic result obtained from memory or from static logic, and should avoid depending on network calls. If no fallback method is implemented, or the fallback method itself throws, an exception is thrown to the business thread.

【AbstractCommand】Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) : first checks whether the exception is unrecoverable; if it is, the fallback is skipped and the exception is returned directly. Otherwise, if fallback is enabled, it tries to acquire the fallback semaphore and, on success, runs the fallback logic. An exception is returned if the fallback logic also fails, if no fallback method is implemented, if the fallback semaphore cannot be acquired, or if fallback is disabled.

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
    final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
    executionResult = executionResult.addEvent((int) latency, eventType);
    // Check whether the exception is unrecoverable, such as a stack overflow or OOM
    if (isUnrecoverable(originalException)) {
        logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);
        Exception e = wrapWithOnErrorHook(failureType, originalException);
        // Return the exception directly
        return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));
    } else {
        // A recoverable java.lang.Error: log it and still try the fallback
        if (isRecoverableError(originalException)) {
            logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);
        }
        // Check whether fallback is enabled in the configuration
        if (properties.fallbackEnabled().get()) {
            // omitted
            final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
                @Override
                public Observable<R> call(Throwable t) {
                    Exception e = wrapWithOnErrorHook(failureType, originalException);
                    Exception fe = getExceptionFromThrowable(t);
                    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                    Exception toEmit;
                    // UnsupportedOperationException is thrown when the business code does not override getFallback
                    if (fe instanceof UnsupportedOperationException) {
                        logger.debug("No fallback for HystrixCommand. ", fe);
                        eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
                        executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);
                        toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe);
                    } else {
                        // The fallback logic itself threw an exception
                        logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);
                        eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
                        executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);
                        toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe);
                    }
                    // Exceptions marked as not-to-be-wrapped are emitted as they are
                    if (shouldNotBeWrapped(originalException)) {
                        return Observable.error(e);
                    }
                    // Emit the wrapped exception
                    return Observable.error(toEmit);
                }
            };
            // Get the fallback semaphore
            final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        fallbackSemaphore.release();
                    }
                }
            };
            Observable<R> fallbackExecutionChain;
            // Try to acquire the fallback semaphore
            if (fallbackSemaphore.tryAcquire()) {
                try {
                    // Check whether a fallback method is user-defined
                    if (isFallbackUserDefined()) {
                        executionHook.onFallbackStart(this);
                        // Run the fallback logic
                        fallbackExecutionChain = getFallbackObservable();
                    } else {
                        // Same call, without the onFallbackStart hook
                        fallbackExecutionChain = getFallbackObservable();
                    }
                } catch (Throwable ex) {
                    fallbackExecutionChain = Observable.error(ex);
                }
                return fallbackExecutionChain
                        .doOnEach(setRequestContext)
                        .lift(new FallbackHookApplication(_cmd))
                        .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                        .doOnNext(markFallbackEmit)
                        .doOnCompleted(markFallbackCompleted)
                        .onErrorResumeNext(handleFallbackError)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } else {
                // The fallback semaphore rejected the request
                return handleFallbackRejectionByEmittingError();
            }
        } else {
            // Fallback is disabled in the configuration
            return handleFallbackDisabledByEmittingError(originalException, failureType, message);
        }
    }
}

【HystrixCommand】R getFallback(): By default, HystrixCommand's getFallback() throws an UnsupportedOperationException, so subclasses must override it to implement the fallback logic.

protected R getFallback() {
    throw new UnsupportedOperationException("No fallback available.");
}
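Throwing from the default getFallback() is a template-method hook: the base class says "no fallback", and a subclass that wants degradation overrides it. The plain-Java sketch below imitates that contract without depending on Hystrix; SketchCommand, execute(), and the two Points* subclasses are hypothetical, loosely modeled on the points (integral) service example from the start of the article. Hystrix similarly treats an UnsupportedOperationException from getFallback() as "no fallback available" and surfaces the original failure, which the sketch mimics with a plain RuntimeException.

```java
// Hypothetical imitation of the run()/getFallback() contract; not the Hystrix API.
abstract class SketchCommand<R> {
    protected abstract R run() throws Exception;

    // Default hook: no fallback, like HystrixCommand.getFallback().
    protected R getFallback() {
        throw new UnsupportedOperationException("No fallback available.");
    }

    public R execute() {
        try {
            return run();
        } catch (Exception runFailure) {
            try {
                return getFallback();
            } catch (UnsupportedOperationException noFallback) {
                // No fallback was defined: surface the original failure to the caller.
                throw new RuntimeException("command failed and no fallback available", runFailure);
            }
        }
    }
}

// The primary call to the (hypothetical) points service always fails in this demo.
class PointsWithoutFallback extends SketchCommand<Integer> {
    @Override
    protected Integer run() throws Exception {
        throw new java.io.IOException("points service timed out");
    }
}

// Same call, but degrades to zero points instead of failing the whole order page.
class PointsWithFallback extends PointsWithoutFallback {
    @Override
    protected Integer getFallback() {
        return 0;
    }
}

class FallbackDemo {
    public static void main(String[] args) {
        System.out.println(new PointsWithFallback().execute()); // 0
        try {
            new PointsWithoutFallback().execute();
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // command failed and no fallback available
        }
    }
}
```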

7. Health statistics

Hystrix computes a service's failure rate from statistics collected over a sliding window, and uses that rate to drive fast failure and fallback logic. The steps are as follows:

  • AbstractCommand publishes a HystrixCommandCompletion event to the event stream by calling the handleCommandEnd method.

  • The event stream is split into time slices with Observable.window(), and flatMap() aggregates each slice by event type (success, failure, etc.) into a bucket.

  • The bucket stream is then grouped again with Observable.window(), according to the number of buckets per window, to produce sliding-window data.

  • The sliding-window data is aggregated into data objects (such as the health-count stream, cumulative counts, etc.).

  • When the CircuitBreaker is initialized, it subscribes to the health data stream and opens or closes the circuit based on the health counts.
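The first two steps (cutting the event stream into time buckets and counting each bucket by event type) can be sketched without RxJava. This is a simplified, single-threaded assumption rather than Hystrix's implementation: TimeBuckets, EventType, Event, and toBuckets are hypothetical, and events carry explicit timestamps so the result is deterministic, whereas Hystrix windows the live stream by wall-clock time. Each bucket is a long[] indexed by EventType.ordinal(), the same shape that HealthCounts.plus(long[] eventTypeCounts) consumes later in this section.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, single-threaded sketch of "event stream -> time buckets"; not Hystrix code.
class TimeBuckets {
    // Hypothetical stand-in for HystrixEventType.
    enum EventType { SUCCESS, FAILURE, TIMEOUT, THREAD_POOL_REJECTED, SEMAPHORE_REJECTED }

    // Hypothetical stand-in for a completed-command event.
    static final class Event {
        final long timestampMs;
        final EventType type;

        Event(long timestampMs, EventType type) {
            this.timestampMs = timestampMs;
            this.type = type;
        }
    }

    // Bucket i covers [startMs + i * bucketSizeInMs, startMs + (i + 1) * bucketSizeInMs).
    // Each bucket counts events per type, indexed by EventType.ordinal().
    static List<long[]> toBuckets(List<Event> events, long startMs, int bucketSizeInMs, int bucketCount) {
        List<long[]> buckets = new ArrayList<>();
        for (int i = 0; i < bucketCount; i++) {
            buckets.add(new long[EventType.values().length]);
        }
        for (Event e : events) {
            if (e.timestampMs < startMs) {
                continue; // older than the first bucket
            }
            long index = (e.timestampMs - startMs) / bucketSizeInMs;
            if (index < bucketCount) { // events after the last bucket are ignored
                buckets.get((int) index)[e.type.ordinal()]++;
            }
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(10, EventType.SUCCESS),
                new Event(120, EventType.FAILURE),
                new Event(150, EventType.TIMEOUT),
                new Event(230, EventType.SUCCESS));
        for (long[] bucket : toBuckets(events, 0, 100, 3)) {
            System.out.println(Arrays.toString(bucket));
        }
    }
}
```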

【AbstractCommand】void handleCommandEnd(boolean commandExecutionStarted): After the business logic completes, handleCommandEnd is called. It reports the executionResult to the metrics, which makes this method the entry point for health statistics.

private void handleCommandEnd(boolean commandExecutionStarted) {
    Reference<TimerListener> tl = timeoutTimer.get();
    if (tl != null) {
        tl.clear();
    }

    long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
    executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
    // Report the execution result to health statistics
    if (executionResultAtTimeOfCancellation == null) {
        metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
    } else {
        metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
    }

    if (endCurrentThreadExecutingCommand != null) {
        endCurrentThreadExecutingCommand.call();
    }
}

【BucketedRollingCounterStream】BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs, final Func2<Bucket, Event, Bucket> appendRawEventToBucket, final Func2<Output, Bucket, Output> reduceBucket)

The sliding window of the health statistics class HealthCountsStream is implemented mainly in its parent class BucketedRollingCounterStream. First, BucketedRollingCounterStream's own parent class, BucketedCounterStream, turns the event stream into a stream of buckets; BucketedRollingCounterStream then turns the bucket stream into sliding windows; finally, the reduceBucket function passed in by HealthCountsStream reduces each window into health statistics.

protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                                       final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                       final Func2<Output, Bucket, Output> reduceBucket) {
    // Call the parent class, which turns the event stream into a bucket stream
    super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
    // Reduce each sliding window with the passed-in reduceBucket function
    Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
        @Override
        public Observable<Output> call(Observable<Bucket> window) {
            // scan emits the empty seed plus one running value per bucket; skip(numBuckets) keeps only the final one
            return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
        }
    };
    // Operate on the bucket stream produced by the parent class
    this.sourceStream = bucketedStream
            // Each window holds numBuckets buckets and slides forward one bucket at a time
            .window(numBuckets, 1)
            // Reduce the data inside each sliding window
            .flatMap(reduceWindowToSummary)
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(true);
                }
            })
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(false);
                }
            })
            // Multiple subscribers share the same data
            .share()
            // Drop data for slow consumers instead of buffering it
            .onBackpressureDrop();
}
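In the constructor above, window(numBuckets, 1) produces overlapping windows of numBuckets consecutive buckets, each starting one bucket after the previous one, and scan(...).skip(numBuckets) folds each window down to its final total. The plain-Java sketch below reproduces that behavior on a finite list; RollingWindows and rollingSummaries are hypothetical names, and a BiFunction plays the role of reduceBucket. It assumes, as with HealthCounts.plus, that the reducer returns a new value instead of mutating the shared empty seed. Like skip(numBuckets), it emits nothing for a window holding fewer than numBuckets buckets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java sketch of bucketedStream.window(numBuckets, 1)
//     .flatMap(w -> w.scan(empty, reduceBucket).skip(numBuckets)); not Hystrix code.
class RollingWindows {
    static <B, O> List<O> rollingSummaries(List<B> buckets, int numBuckets, O empty,
                                           BiFunction<O, B, O> reduceBucket) {
        List<O> summaries = new ArrayList<>();
        // Each window starts one bucket after the previous one (skip = 1).
        for (int start = 0; start + numBuckets <= buckets.size(); start++) {
            O acc = empty;
            // scan() would also emit the seed and each partial value; skip(numBuckets) keeps only this final one.
            for (int i = start; i < start + numBuckets; i++) {
                acc = reduceBucket.apply(acc, buckets.get(i));
            }
            summaries.add(acc);
        }
        return summaries;
    }

    public static void main(String[] args) {
        // Each bucket here is just a failure count; a 3-bucket window sums them.
        List<Integer> failuresPerBucket = List.of(1, 0, 2, 5, 0);
        System.out.println(rollingSummaries(failuresPerBucket, 3, 0, Integer::sum)); // [3, 7, 7]
    }
}
```

Because every summary is recomputed from its own numBuckets buckets, an old bucket simply drops out of the total once the window slides past it; no subtraction is needed.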

【HealthCounts】HealthCounts plus(long[] eventTypeCounts): Adds a bucket's per-event-type counts to the current totals and returns a new HealthCounts object with the updated statistics.

public HealthCounts plus(long[] eventTypeCounts) {
    long updatedTotalCount = totalCount;
    long updatedErrorCount = errorCount;

    long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
    long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
    long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
    long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
    long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];
    // Total number of requests
    updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
    // Number of failed requests (failures, timeouts and rejections)
    updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
    return new HealthCounts(updatedTotalCount, updatedErrorCount);
}
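HealthCounts itself only carries a total and an error count; the error percentage and the decision to open the circuit are derived from them. The sketch below shows that arithmetic under stated assumptions: HealthSnapshot and shouldOpen are hypothetical, and the two thresholds are plain parameters. In Hystrix they correspond to circuitBreaker.requestVolumeThreshold and circuitBreaker.errorThresholdPercentage, which default to 20 requests and 50%. The circuit opens only when the window has seen enough requests and the error percentage has reached the threshold.

```java
// Hypothetical sketch: error percentage and trip decision derived from aggregated counts.
final class HealthSnapshot {
    final long totalCount;
    final long errorCount;

    HealthSnapshot(long totalCount, long errorCount) {
        this.totalCount = totalCount;
        this.errorCount = errorCount;
    }

    // Integer percentage; 0 when the window has seen no requests.
    int errorPercentage() {
        return totalCount > 0 ? (int) ((double) errorCount / totalCount * 100) : 0;
    }

    // Open only if the window saw enough requests AND the error rate reached the threshold.
    boolean shouldOpen(int requestVolumeThreshold, int errorThresholdPercentage) {
        if (totalCount < requestVolumeThreshold) {
            return false; // too little traffic to judge the service's health
        }
        return errorPercentage() >= errorThresholdPercentage;
    }

    public static void main(String[] args) {
        HealthSnapshot quiet = new HealthSnapshot(10, 9);  // 90% errors, but only 10 requests
        HealthSnapshot busy = new HealthSnapshot(40, 22);  // 55% errors over 40 requests
        System.out.println(quiet.shouldOpen(20, 50)); // false
        System.out.println(busy.shouldOpen(20, 50));  // true
    }
}
```

The volume check keeps a handful of failures during quiet periods from tripping the breaker, since a high percentage over very few requests says little about the dependency.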

8. Summary

In a distributed environment, some of the many dependencies between services will inevitably fail. Hystrix is a library that helps control the interactions between distributed services by adding circuit-breaking, isolation, and fallback logic, improving the overall resilience of the system. Its main functions are as follows:

  • Protect the system by controlling latency and failures when accessing third-party dependencies, usually over the network

  • Prevent cascading failures in complex distributed systems

  • Fail fast and recover quickly

  • Degrade gracefully

  • Near-real-time monitoring, alerting, and operational control

There are a few things to watch out for when using Hystrix:

  • When overriding getFallback(), try to avoid network dependencies. If the fallback must depend on the network, implement multi-level fallback by instantiating and executing another HystrixCommand inside getFallback(). getFallback() should return quickly so that degradation stays fast.

  • For HystrixCommand, the thread isolation strategy is recommended.

  • hystrix.threadpool.default.maximumSize only takes effect when hystrix.threadpool.default.allowMaximumSizeToDivergeFromCoreSize is set to true. Choose the maximum number of threads based on the business itself and on performance-test results. Start as small as possible and adjust it dynamically, because the thread-pool size is the primary tool for shedding load and keeping resources from being blocked when latency occurs.

  • Under the semaphore isolation strategy, the business logic runs on the application's calling thread (such as a Tomcat container thread), so you must set a concurrency limit. This strategy is not recommended for calls with network overhead, because a slow call can easily leave container threads queued and blocked, affecting the entire application service.

  • In addition, Hystrix relies heavily on RxJava, a reactive functional programming framework. A basic understanding of how RxJava is used will help you follow the source code logic.
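The first tip, executing another command inside getFallback() when the fallback itself needs the network, produces a chain of fallback levels. The plain-Java sketch below models that chain without Hystrix and flattens the nesting into a list: trying level i + 1 only when level i fails is what nested getFallback() calls amount to. FallbackChain, execute, and the three lambdas are hypothetical, and each lambda stands in for one command's run(). The last level is a local constant, so the chain always ends quickly, in line with the advice that getFallback() should return fast.

```java
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical model of fallbacks that execute further commands; not Hystrix code.
class FallbackChain {
    // Tries each level in order; a failure at one level falls through to the next.
    static <R> R execute(List<Callable<R>> levels) {
        RuntimeException lastFailure = new IllegalArgumentException("no fallback levels given");
        for (Callable<R> level : levels) {
            try {
                return level.call();
            } catch (Exception e) {
                lastFailure = new RuntimeException("fallback level failed", e);
            }
        }
        throw lastFailure; // every level failed
    }

    public static void main(String[] args) {
        List<Callable<String>> levels = List.of(
                () -> { throw new java.io.IOException("points service down"); },      // primary command
                () -> { throw new java.io.IOException("remote cache unreachable"); }, // fallback command
                () -> "points temporarily unavailable");                          // local constant, no network
        System.out.println(execute(levels)); // points temporarily unavailable
    }
}
```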
