1. What problem does Hystrix solve?

A complex distributed application has many dependencies, and each of them is bound to fail at some point. If the application does not isolate its dependencies and limit this external risk, the failure of a single dependency can easily bring down the entire application.

For example, the order service calls the inventory service, commodity service, credit service and payment service. When the system is normal, the order module runs normally.

However, when the credit service misbehaves and blocks for 30s, part of the order service fails, because its worker threads are blocked on the call to the credit service.

The problem gets worse during peak traffic. All requests to the order service end up blocked on the call to the credit service and every worker thread is suspended, so the machine runs out of resources and the order service becomes unavailable. The failure then cascades until the entire cluster goes down, which is known as the avalanche effect.

So you need a mechanism that keeps the cluster as a whole available when a single service fails. Hystrix is a framework that implements this mechanism. Let's look at how Hystrix works as a whole.

2. The overall mechanism

[Entry] The execution entry point of Hystrix is a HystrixCommand or HystrixObservableCommand object. In Spring applications it is usually constructed through annotations and AOP, which keeps Hystrix from intruding into business code.

[Cache] When a HystrixCommand executes, it first checks whether the request cache is enabled. If it is enabled and there is a hit, the cached result is returned directly.

[Circuit breaker] If the circuit breaker (the "fuse") is open, the request is short-circuited and goes straight to the fallback logic. If the breaker is closed, execution proceeds to the isolation step. The state of the breaker depends mainly on the failure rate within the statistics window: if the failure rate is too high, the breaker opens automatically.

[Isolation] Depending on whether thread pool isolation or semaphore isolation is configured, Hystrix checks whether the thread pool's task queue (or the semaphore) is full. If it is, the fallback logic is entered. Otherwise execution proceeds to the next step; in thread pool mode the business call is performed by a thread-pool task thread.

[Execution] The actual business call begins. If it fails or throws an exception, the fallback logic is entered. If it succeeds, the result is returned normally.

[Timeout] A delayed timer task detects whether the business call has timed out. If it has, the thread running the business call is cancelled and the fallback logic is entered. If it has not, the result is returned normally. Both the thread pool and the semaphore isolation strategies support a timeout configuration, although the semaphore strategy has limitations.

[Fallback] Once the fallback logic is entered, if the business code implements HystrixCommand.getFallback(), the data produced by that method is returned. If it is not implemented, an exception is returned.

[Statistics] The outcome of each business call (success, failure, timeout) is fed into the statistics module, and the circuit breaker is opened or closed according to the resulting health statistics.
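The order of the steps above can be sketched in a few lines of plain Java. This is only an illustration of the control flow under simplifying assumptions: the class GuardedCall and its methods are hypothetical names, the breaker is a plain flag, and the timeout and statistics steps are left out. It is not how Hystrix implements the pipeline.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical class: mirrors the order of the steps, not Hystrix's code.
class GuardedCall<T> {
    private final Map<String, T> cache = new ConcurrentHashMap<>();
    private final Semaphore permits;        // stands in for the isolation step
    private volatile boolean breakerOpen;   // stands in for the circuit breaker

    GuardedCall(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    void setBreakerOpen(boolean open) {
        this.breakerOpen = open;
    }

    T execute(String cacheKey, Supplier<T> run, Supplier<T> fallback) {
        T cached = cache.get(cacheKey);                    // [Cache] hit: return directly
        if (cached != null) return cached;
        if (breakerOpen) return fallback.get();            // [Circuit breaker] short-circuit
        if (!permits.tryAcquire()) return fallback.get();  // [Isolation] no capacity left
        try {
            T result = run.get();                          // [Execution]
            if (result != null) cache.put(cacheKey, result);
            return result;
        } catch (RuntimeException e) {
            return fallback.get();                         // [Fallback] on failure
        } finally {
            permits.release();
        }
    }
}
```

A cached key is still served while the breaker is open, because the cache check comes first; an uncached key goes straight to the fallback.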

As the saying goes, there are no secrets in the source code. Let's analyze the source of the core functions to see how Hystrix implements this overall mechanism.

3. Circuit breaking

Household circuits contain fuses. When a circuit fails or behaves abnormally, the current keeps rising, and the rising current may damage important or valuable devices in the circuit, burn out the circuit, or even cause a fire.

If a fuse is placed correctly in the circuit, it melts when the current rises abnormally to a certain level for a certain time, cutting off the current and protecting the circuit. The circuit breaker provided by Hystrix plays a similar role. When an application calls a service provider and, within the statistics window, the total number of requests exceeds the configured threshold and the error rate is too high, Hystrix trips the breaker for that call. Subsequent requests are short-circuited directly into the fallback logic, and the local fallback policy is executed.

Hystrix is also self-regulating. After the breaker has been open for a certain period, it lets a single request through and adjusts the breaker state according to the result. The breaker switches automatically between the closed, open, and half-open states.
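The three states and their transitions can be illustrated with a small state machine. The sketch below is an assumption-laden simplification: SketchBreaker and its method names are hypothetical, the current time is passed in explicitly so that the behavior is easy to follow, and the decision to trip is left to the caller. It illustrates the closed, open, and half-open cycle, not the Hystrix class itself.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical class illustrating the closed -> open -> half-open cycle.
class SketchBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
    private final AtomicLong openedAt = new AtomicLong(-1L);  // -1 means "not open"
    private final long sleepWindowMs;

    SketchBreaker(long sleepWindowMs) {
        this.sleepWindowMs = sleepWindowMs;
    }

    /** Opens the breaker; the caller decides when the error rate is too high. */
    void trip(long nowMs) {
        if (state.compareAndSet(State.CLOSED, State.OPEN)) openedAt.set(nowMs);
    }

    /** Returns true if a request may be sent to the provider. */
    boolean attempt(long nowMs) {
        if (openedAt.get() == -1L) return true;                     // closed
        if (nowMs <= openedAt.get() + sleepWindowMs) return false;  // still in the sleep window
        // Only the thread that wins the CAS sends the single trial request.
        return state.compareAndSet(State.OPEN, State.HALF_OPEN);
    }

    /** Trial request succeeded: close the breaker. */
    void onSuccess() {
        if (state.compareAndSet(State.HALF_OPEN, State.CLOSED)) openedAt.set(-1L);
    }

    /** Trial request failed: reopen and restart the sleep window. */
    void onFailure(long nowMs) {
        if (state.compareAndSet(State.HALF_OPEN, State.OPEN)) openedAt.set(nowMs);
    }

    State state() {
        return state.get();
    }
}
```

Using compareAndSet for every transition means that concurrent callers cannot both become the trial request, which is the property the half-open state relies on.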

[HystrixCircuitBreaker] boolean attemptExecution(): Called each time a HystrixCommand executes, to decide whether execution may proceed. If the circuit breaker is open and the sleep window has elapsed, the state is updated to half-open. The state is changed with an atomic CAS, which ensures that only one business request is actually sent to the provider; the state is then adjusted based on the result of that request.

    public boolean attemptExecution() {
        // Is the circuit breaker configured as forced open?
        if (properties.circuitBreakerForceOpen().get()) {
            return false;
        }
        // Is the circuit breaker configured as forced closed?
        if (properties.circuitBreakerForceClosed().get()) {
            return true;
        }
        // Is the circuit breaker closed (never opened)?
        if (circuitOpened.get() == -1) {
            return true;
        } else {
            // Has the sleep window elapsed?
            if (isAfterSleepWindow()) {
                // Switch to half-open; only the CAS winner lets its request through
                if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                    return true;
                } else {
                    return false;
                }
            } else {
                // Reject the request
                return false;
            }
        }
    }

[HystrixCircuitBreaker] void markSuccess(): Invoked after a HystrixCommand executes successfully. If the breaker is half-open, its state is updated to closed. This is the case where the breaker was open, a single trial request was let through to the service provider, and that request succeeded, so Hystrix automatically closes the breaker.

    public void markSuccess() {
        // Switch the state from half-open to closed
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            // Reset the health statistics stream and re-subscribe to it
            metrics.resetStream();
            Subscription previousSubscription = activeSubscription.get();
            if (previousSubscription != null) {
                previousSubscription.unsubscribe();
            }
            Subscription newSubscription = subscribeToStream();
            activeSubscription.set(newSubscription);
            // Mark the circuit breaker as closed
            circuitOpened.set(-1L);
        }
    }

[HystrixCircuitBreaker] void markNonSuccess(): Invoked after a HystrixCommand fails to execute successfully. If the breaker is half-open, its state is set back to open. This is the case where the breaker was open, a single trial request was let through to the service provider, and that request failed. Hystrix keeps the breaker open and uses the time of this request as the start of a new sleep window.

    public void markNonSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
            // Record the failure time as the start of the new sleep window
            circuitOpened.set(System.currentTimeMillis());
        }
    }

[HystrixCircuitBreaker] Subscription subscribeToStream(): If the number of requests in the current window has reached the configured volume threshold and the error rate has reached the error threshold, the breaker state is automatically updated to open. Subsequent requests are short-circuited: the service provider is no longer called and the fallback logic is entered directly.

    private Subscription subscribeToStream() {
        // Subscribe to the health statistics stream
        return metrics.getHealthCountsStream()
                .observe()
                .subscribe(new Subscriber<HealthCounts>() {
                    @Override
                    public void onCompleted() {}

                    @Override
                    public void onError(Throwable e) {}

                    @Override
                    public void onNext(HealthCounts hc) {
                        // Below the request volume threshold: leave the state unchanged
                        if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                        } else {
                            // Below the error rate threshold: leave the state unchanged
                            if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                            } else {
                                // Error rate too high: open the breaker and reject subsequent requests
                                if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                    circuitOpened.set(System.currentTimeMillis());
                                }
                            }
                        }
                    }
                });
    }
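The two nested checks in onNext amount to a single predicate: trip only when there is enough traffic and the error percentage has reached the threshold. The helper below restates that rule as a standalone function. TripRule is a hypothetical name, and the way the percentage is computed from raw counts is an assumption made for the sketch.

```java
// Hypothetical helper restating the two threshold checks as one predicate.
class TripRule {
    static boolean shouldOpen(long totalRequests, long errorCount,
                              int requestVolumeThreshold, int errorThresholdPercentage) {
        if (totalRequests <= 0 || totalRequests < requestVolumeThreshold) {
            return false;  // not enough traffic in the window to judge
        }
        // Integer percentage, truncated (an assumption of this sketch)
        int errorPercentage = (int) ((double) errorCount / totalRequests * 100);
        return errorPercentage >= errorThresholdPercentage;
    }
}
```

For example, with a volume threshold of 20 and an error threshold of 50 (values chosen for illustration), 19 failures out of 19 requests do not trip the breaker, while 10 failures out of 20 requests do.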

4. Resource isolation

Cargo ships are commonly divided into separate compartments to contain leaks and fires, so that an accident in one compartment does not sink the whole ship. Hystrix uses this bulkhead pattern to isolate the service providers in a system: increased latency or failure of one provider does not bring down the whole system, and the concurrency of calls to each provider can be controlled. As shown in the figure below, the order service calls the downstream credit service, inventory service, and other services using a different thread pool for each. When the credit service fails, only its own thread pool fills up, and calls to the other services are unaffected. Hystrix supports two isolation modes: thread pool and semaphore.

4.1 Semaphore Mode

The semaphore mode controls the execution concurrency of a single service provider. Suppose the number of requests in progress for a single CommandKey is N. If N is less than maxConcurrentRequests, execution continues. If it is greater than or equal to maxConcurrentRequests, the request is rejected immediately and the fallback logic is entered. In semaphore mode the call runs on the requesting thread itself, so there is no thread context switch and the overhead is low, but the timeout mechanism cannot interrupt the call.
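A non-blocking permit counter of this kind can be sketched with an AtomicInteger. The class below is a hypothetical illustration (CountingGate is not a Hystrix class): a caller that does not get a permit is rejected immediately instead of waiting.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical non-blocking permit counter: reject instead of wait.
class CountingGate {
    private final AtomicInteger inFlight = new AtomicInteger(0);
    private final int maxConcurrent;

    CountingGate(int maxConcurrent) {
        this.maxConcurrent = maxConcurrent;
    }

    /** Takes a permit if one is free; never blocks. */
    boolean tryAcquire() {
        if (inFlight.incrementAndGet() > maxConcurrent) {
            inFlight.decrementAndGet();  // undo the optimistic increment
            return false;
        }
        return true;
    }

    /** Must be called exactly once per successful tryAcquire(). */
    void release() {
        inFlight.decrementAndGet();
    }

    int inFlight() {
        return inFlight.get();
    }
}
```

Because nothing blocks, a saturated dependency costs the caller only a counter update before the fallback runs, which is why this mode has low overhead.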

[AbstractCommand] Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd): Tries to acquire the semaphore. If it is acquired, the call to the service provider continues; if not, the fallback logic is entered.

    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        executionHook.onStart(_cmd);
        // Ask the circuit breaker whether execution is allowed
        if (circuitBreaker.attemptExecution()) {
            // Get the semaphore
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };
            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };
            // Try to acquire the semaphore
            if (executionSemaphore.tryAcquire()) {
                try {
                    // Record the start time of the business execution
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    // Continue with the business call
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                // Semaphore rejected: enter the fallback logic
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            // Circuit breaker rejected: short-circuit directly into the fallback logic
            return handleShortCircuitViaFallback();
        }
    }

[AbstractCommand] TryableSemaphore getExecutionSemaphore(): Obtains the semaphore instance. If the isolation mode is semaphore, the semaphore is looked up by CommandKey, and it is initialized and cached if it does not exist yet. If the isolation mode is thread pool, the default semaphore TryableSemaphoreNoOp.DEFAULT is used, which lets every request pass.

    protected TryableSemaphore getExecutionSemaphore() {
        // Is the isolation mode semaphore?
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
            if (executionSemaphoreOverride == null) {
                // Look up the semaphore for this command key
                TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
                if (_s == null) {
                    // Initialize the semaphore and cache it
                    executionSemaphorePerCircuit.putIfAbsent(commandKey.name(),
                            new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
                    // Return the cached semaphore
                    return executionSemaphorePerCircuit.get(commandKey.name());
                } else {
                    return _s;
                }
            } else {
                return executionSemaphoreOverride;
            }
        } else {
            // Return the default semaphore, which lets every request pass
            return TryableSemaphoreNoOp.DEFAULT;
        }
    }

4.2 Thread pool mode

The thread pool mode also controls the execution concurrency of a single service provider. The code first acquires the semaphore, but in this mode it is the default semaphore, which lets every request pass, and then invokes the thread pool logic. If the number of requests in progress N is less than maximumPoolSize, a thread is obtained from the Hystrix-managed thread pool and the call is handed to that task thread to perform. If there are more concurrent requests than threads in the pool, tasks have to be queued, but the queue also has an upper limit; if the queue is full, the fallback logic is entered. Thread pool mode supports asynchronous calls and timeouts, at the cost of thread switching overhead.
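The "bounded pool plus bounded queue, otherwise fallback" behavior can be reproduced with the JDK's ThreadPoolExecutor. The sketch below is a plain-JDK illustration under assumed names (PoolIsolation, newPool, and submitOrFallback are hypothetical) and is not the Hystrix thread pool implementation.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: one bounded pool per dependency, fallback on rejection.
class PoolIsolation {
    /** A fixed-size pool with a bounded queue that rejects when both are full. */
    static ThreadPoolExecutor newPool(int threads, int queueSize) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize), new ThreadPoolExecutor.AbortPolicy());
    }

    /** Submits the task, or returns an already-completed fallback if the pool is saturated. */
    static <T> Future<T> submitOrFallback(ExecutorService pool, Callable<T> task, T fallback) {
        try {
            return pool.submit(task);
        } catch (RejectedExecutionException e) {
            return CompletableFuture.completedFuture(fallback);
        }
    }
}
```

With one thread and a queue of one, a third concurrent submission is rejected and receives the fallback value immediately, while the first two still complete normally. Giving each downstream service its own pool keeps one saturated dependency from consuming the threads of the others.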

[AbstractCommand] Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd): Obtains a thread from the thread pool and executes the command on it, recording the thread state along the way.

    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        // Is the isolation mode thread pool?
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }
                    // Report to the statistics module
                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
                    // If the command has already timed out, return an error right away
                    if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }
                    // Update the thread state to started
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                        HystrixCounters.incrementGlobalConcurrentThreads();
                        threadPool.markThreadExecution();
                        endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                        executionResult = executionResult.setExecutedInThread();
                        // Run the hooks; if one throws, return the exception directly
                        try {
                            executionHook.onThreadStart(_cmd);
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                            return getUserExecutionObservable(_cmd);
                        } catch (Throwable ex) {
                            return Observable.error(ex);
                        }
                    } else {
                        // Return an empty Observable
                        return Observable.empty();
                    }
                }
            }).doOnTerminate(new Action0() {
                @Override
                public void call() {
                    // Termination logic omitted
                }
            }).doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    // Unsubscribe logic omitted
                }
            }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    // Interrupt the thread only if configured to and the command has timed out
                    return properties.executionIsolationThreadInterruptOnTimeout().get()
                            && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
        } else {
            // Semaphore mode omitted
        }
    }

[HystrixContextScheduler.ThreadPoolWorker] Subscription schedule(final Action0 action): HystrixContextScheduler is Hystrix's own implementation of the Rx Scheduler. Its main purpose is to avoid executing a command when the Observable has no subscriber and to allow a command to be interrupted while it is executing. In Rx, a Scheduler creates a Worker for the Observable to execute commands on, and the Worker is responsible for scheduling the execution threads. ThreadPoolWorker is the Worker implemented by Hystrix, and schedule is its core scheduling method.

    public Subscription schedule(final Action0 action) {
        // If there is no subscriber, return immediately without executing
        if (subscription.isUnsubscribed()) {
            return Subscriptions.unsubscribed();
        }
        ScheduledAction sa = new ScheduledAction(action);
        subscription.add(sa);
        sa.addParent(subscription);
        // Get the thread pool
        ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
        // Submit the task for execution
        FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
        sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
        return sa;
    }

5. Timeout detection

The Hystrix timeout mechanism limits the impact of slow third-party dependencies on callers by making such requests fail fast. It is built on delayed tasks and has two parts: registering the delayed task and executing it.

When the isolation strategy is thread pool, the main thread subscribes to the execution result while a thread-pool task thread calls the provider. At the same time, a timer thread checks after the configured time whether the task has completed. If it has not, the task has timed out: a timeout exception is thrown, and any result the task thread produces afterwards is discarded rather than emitted. If it has, the task finished within the timeout and the timer's detection task ends.

When the isolation strategy is semaphore, the main thread both subscribes to the execution result and calls the provider itself (there is no task thread). When the configured time expires, the main thread still finishes the business call and only then throws the timeout exception. Timeouts in semaphore mode therefore have drawbacks: a call in progress cannot be cancelled, and the time at which the main thread returns cannot be bounded.
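The race between the task thread and the timer thread in thread pool mode can be sketched with a scheduled task and a compare-and-set on a shared status, so that exactly one side decides the outcome. This is a simplified sketch under assumptions: TimeoutRace and its call method are hypothetical, a one-shot schedule stands in for the timer, and a fallback value is returned instead of an exception being raised.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: whichever side wins the CAS decides the result.
class TimeoutRace {
    enum Status { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    static <T> T call(Callable<T> task, long timeoutMs, T fallback,
                      ExecutorService worker, ScheduledExecutorService timer) throws Exception {
        AtomicReference<Status> status = new AtomicReference<>(Status.NOT_EXECUTED);
        CompletableFuture<T> result = new CompletableFuture<>();

        // Task thread: publish the outcome only if the timer has not fired yet.
        Future<?> running = worker.submit(() -> {
            try {
                T value = task.call();
                if (status.compareAndSet(Status.NOT_EXECUTED, Status.COMPLETED)) {
                    result.complete(value);
                }
            } catch (Exception e) {
                if (status.compareAndSet(Status.NOT_EXECUTED, Status.COMPLETED)) {
                    result.completeExceptionally(e);
                }
            }
        });

        // Timer thread: if the task has not completed, mark it timed out and interrupt it.
        ScheduledFuture<?> tick = timer.schedule(() -> {
            if (status.compareAndSet(Status.NOT_EXECUTED, Status.TIMED_OUT)) {
                running.cancel(true);
                result.complete(fallback);
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);

        try {
            return result.get();
        } finally {
            tick.cancel(false);  // clear the detection task once an outcome exists
        }
    }
}
```

A late result from the task thread loses the compare-and-set and is dropped, which matches the "result is discarded rather than emitted" behavior described above. The interrupt only works because the task runs on a separate thread; in semaphore mode there is no such thread to interrupt.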

[AbstractCommand] Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd): The entry point of timeout detection. It attaches the timeout detection task by calling lift(new HystrixObservableTimeoutOperator<R>(_cmd)).

    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        // Omitted
        Observable<R> execution;
        // Is timeout detection enabled?
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    // Add the timeout detection operator
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            // Normal execution
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }
        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }

[HystrixObservableTimeoutOperator] Subscriber<? super R> call(final Subscriber<? super R> child): Creates the detection task and registers it as a delayed task. If the command has not completed when the detection task runs, a timeout exception is thrown. If the command has completed or failed, the detection task is cleared.

    public Subscriber<? super R> call(final Subscriber<? super R> child) {
        final CompositeSubscription s = new CompositeSubscription();
        child.add(s);
        final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();
        // Create the timeout detection task
        TimerListener listener = new TimerListener() {
            @Override
            public void tick() {
                // If the command has not finished, update its status to timed out
                if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    // Report the timeout
                    originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                    // Unsubscribe
                    s.unsubscribe();
                    final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
                        @Override
                        public void run() {
                            child.onError(new HystrixTimeoutException());
                        }
                    });
                    // Throw the timeout exception
                    timeoutRunnable.run();
                }
            }

            // Timeout configuration
            @Override
            public int getIntervalTimeInMilliseconds() {
                return originalCommand.properties.executionTimeoutInMilliseconds().get();
            }
        };
        // Register the detection task with the timer
        final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
        originalCommand.timeoutTimer.set(tl);
        Subscriber<R> parent = new Subscriber<R>() {
            @Override
            public void onCompleted() {
                if (isNotTimedOut()) {
                    // Task completed: clear the timeout detection task
                    tl.clear();
                    child.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                if (isNotTimedOut()) {
                    // Task failed: clear the timeout detection task
                    tl.clear();
                    child.onError(e);
                }
            }

            @Override
            public void onNext(R v) {
                // Emit the value only if the command has not timed out
                if (isNotTimedOut()) {
                    child.onNext(v);
                }
            }

            // Has the command not timed out?
            private boolean isNotTimedOut() {
                return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED
                        || originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
            }
        };
        s.add(parent);
        return parent;
    }

[HystrixTimer] Reference<TimerListener> addTimerListener(final TimerListener listener): Schedules the listener with Java's scheduleAtFixedRate, so that its tick() method runs once the timeout delay has elapsed.

    public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        // Initialize the timer thread pool if needed
        startThreadIfNeeded();
        // Build the task that runs the listener
        Runnable r = new Runnable() {
            @Override
            public void run() {
                try {
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };
        // Schedule the task; both the initial delay and the period equal the timeout
        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r,
                listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
        return new TimerReference(listener, f);
    }

6. Fallback

The Hystrix fallback logic is a safety net. It is entered when the business call throws an exception, when the thread pool or semaphore is full, when execution times out, and so on. The fallback should return a generic response built from memory or static logic, with as little dependence on network calls as possible. If no fallback method is implemented, or if the fallback method itself throws an exception, an exception is thrown in the business thread.

[AbstractCommand] Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException): First checks whether the exception is unrecoverable; if so, the exception is returned directly without running the fallback. Otherwise it checks whether the fallback semaphore can be acquired and then runs the fallback logic. An exception is returned when the fallback logic itself throws or when no fallback method is implemented.

    private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
        final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
        executionResult = executionResult.addEvent((int) latency, eventType);
        // Is the exception unrecoverable? If so, throw without applying the fallback
        if (isUnrecoverable(originalException)) {
            logger.error("Unrecoverable error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);
            Exception e = wrapWithOnErrorHook(failureType, originalException);
            return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));
        } else {
            // Is it a recoverable java.lang.Error?
            if (isRecoverableError(originalException)) {
                logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);
            }
            // Is the fallback enabled in the configuration?
            if (properties.fallbackEnabled().get()) {
                /* omitted */
                final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
                    @Override
                    public Observable<R> call(Throwable t) {
                        Exception e = wrapWithOnErrorHook(failureType, originalException);
                        Exception fe = getExceptionFromThrowable(t);
                        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                        Exception toEmit;
                        // When getFallback is not overridden, an UnsupportedOperationException is thrown
                        if (fe instanceof UnsupportedOperationException) {
                            logger.debug("No fallback for HystrixCommand. ", fe);
                            eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
                            executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);
                            toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe);
                        } else {
                            // The fallback itself failed
                            logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);
                            eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
                            executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);
                            toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe);
                        }
                        if (shouldNotBeWrapped(originalException)) {
                            return Observable.error(e);
                        }
                        return Observable.error(toEmit);
                    }
                };
                final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
                final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
                final Action0 singleSemaphoreRelease = new Action0() {
                    @Override
                    public void call() {
                        if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                            fallbackSemaphore.release();
                        }
                    }
                };
                Observable<R> fallbackExecutionChain;
                // Try to acquire the fallback semaphore
                if (fallbackSemaphore.tryAcquire()) {
                    try {
                        // Has the user defined a fallback method?
                        if (isFallbackUserDefined()) {
                            executionHook.onFallbackStart(this);
                            fallbackExecutionChain = getFallbackObservable();
                        } else {
                            // Execute the default fallback
                            fallbackExecutionChain = getFallbackObservable();
                        }
                    } catch (Throwable ex) {
                        fallbackExecutionChain = Observable.error(ex);
                    }
                    return fallbackExecutionChain
                            .doOnEach(setRequestContext)
                            .lift(new FallbackHookApplication(_cmd))
                            .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                            .doOnNext(markFallbackEmit)
                            .doOnCompleted(markFallbackCompleted)
                            .onErrorResumeNext(handleFallbackError)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } else {
                    // Fallback semaphore rejected: emit an error
                    return handleFallbackRejectionByEmittingError();
                }
            } else {
                // Fallback disabled in the configuration: emit an error
                return handleFallbackDisabledByEmittingError(originalException, failureType, message);
            }
        }
    }

[HystrixCommand] R getFallback(): By default, HystrixCommand throws an UnsupportedOperationException. Subclasses must override getFallback() to implement the fallback logic.

protected R getFallback() {
    throw new UnsupportedOperationException("No fallback available.");
}
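The effect of this default can be shown with a stripped-down command class. In the sketch below, SketchCommand is a hypothetical stand-in written for illustration. It keeps only the behavior discussed here: a failing run() is answered by getFallback(), and a missing or failing fallback turns into an exception for the caller.

```java
// Hypothetical stand-in for a command with an optional fallback.
abstract class SketchCommand<R> {
    protected abstract R run() throws Exception;

    // Default: no fallback, same contract as the method shown above.
    protected R getFallback() {
        throw new UnsupportedOperationException("No fallback available.");
    }

    final R execute() {
        try {
            return run();
        } catch (Exception primary) {
            try {
                return getFallback();
            } catch (UnsupportedOperationException missing) {
                // Fallback not overridden: surface the original failure
                throw new IllegalStateException("failed and no fallback available", primary);
            } catch (RuntimeException fallbackFailure) {
                // Fallback overridden but failed as well
                throw new IllegalStateException("failed and fallback failed", primary);
            }
        }
    }
}
```

A subclass that overrides getFallback() with a static or in-memory value returns that value when run() fails; a subclass that does not override it surfaces the failure to the caller.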

7. Health statistics

Hystrix decides whether to trip the circuit breaker based on the percentage of failed calls, which it computes from statistics over a sliding window. This is what enables fast failure and fallback. The steps are:

  • After an AbstractCommand finishes executing, the handleCommandEnd method is called and publishes the execution result as a HystrixCommandCompletion event into the event stream.
  • The event stream is grouped by time with Observable.window(), and each group is aggregated into a bucket by event type (success, failure, and so on) with flatMap().
  • The buckets are then combined into sliding-window data with Observable.window(), according to the number of buckets per window.
  • The sliding-window data is aggregated into data objects (such as the health data stream, cumulative data, and so on).
  • When the circuit breaker is initialized, it subscribes to the health stream and changes its state based on the health data.
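The bucket-and-window idea in these steps can be illustrated without RxJava. The sketch below swaps the stream operators for a fixed ring of counters: each slot holds the counts for one bucket, and a query sums only the slots that fall inside the current window. RollingErrorWindow is a hypothetical, single-threaded class that assumes non-negative timestamps; it shows the data layout only, not how Hystrix builds the window.

```java
import java.util.Arrays;

// Hypothetical single-threaded ring of buckets; not a Hystrix class.
class RollingErrorWindow {
    private final long[] total;
    private final long[] errors;
    private final long[] bucketStart;  // start time of the data held in each slot
    private final long bucketMs;

    RollingErrorWindow(int numBuckets, long bucketMs) {
        this.total = new long[numBuckets];
        this.errors = new long[numBuckets];
        this.bucketStart = new long[numBuckets];
        this.bucketMs = bucketMs;
        Arrays.fill(bucketStart, Long.MIN_VALUE);
    }

    void record(long nowMs, boolean success) {
        long start = nowMs - (nowMs % bucketMs);
        int i = (int) ((nowMs / bucketMs) % total.length);
        if (bucketStart[i] != start) {  // slot still holds an older lap: reset it
            bucketStart[i] = start;
            total[i] = 0;
            errors[i] = 0;
        }
        total[i]++;
        if (!success) errors[i]++;
    }

    long totalRequests(long nowMs) {
        return sum(total, nowMs);
    }

    int errorPercentage(long nowMs) {
        long t = sum(total, nowMs);
        return t == 0 ? 0 : (int) (100.0 * sum(errors, nowMs) / t);
    }

    // Sums only the buckets that started within the last numBuckets * bucketMs.
    private long sum(long[] counts, long nowMs) {
        long oldest = nowMs - bucketMs * total.length;
        long s = 0;
        for (int i = 0; i < counts.length; i++) {
            if (bucketStart[i] > oldest && bucketStart[i] <= nowMs) s += counts[i];
        }
        return s;
    }
}
```

With ten buckets of one second each, three failures recorded in the first second stop counting once the window has moved past that bucket, so old errors age out without any explicit cleanup.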

[AbstractCommand] void handleCommandEnd(boolean commandExecutionStarted): Called after the business call completes. It reports the executionResult, and it is also the entry point for health statistics.

    private void handleCommandEnd(boolean commandExecutionStarted) {
        Reference<TimerListener> tl = timeoutTimer.get();
        if (tl != null) {
            tl.clear();
        }
        long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
        executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
        // Report the execution result to the health statistics
        if (executionResultAtTimeOfCancellation == null) {
            metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
        } else {
            metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
        }
        if (endCurrentThreadExecutingCommand != null) {
            endCurrentThreadExecutingCommand.call();
        }
    }

[BucketedRollingCounterStream] BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs, final Func2<Bucket, Event, Bucket> appendRawEventToBucket, final Func2<Output, Bucket, Output> reduceBucket)

The sliding window behind the health statistics class HealthCountsStream is implemented mainly in its parent class BucketedRollingCounterStream. First, the superclass BucketedCounterStream turns the event stream into a stream of buckets. BucketedRollingCounterStream then turns the buckets into sliding windows. Finally, the reduceBucket function passed in by HealthCountsStream reduces each window into health statistics.

    protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                                           final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                           final Func2<Output, Bucket, Output> reduceBucket) {
        // Turn the event stream into a stream of buckets
        super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
        // Reduce the buckets of one window into a summary, using the reduceBucket function passed in
        Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
            @Override
            public Observable<Output> call(Observable<Bucket> window) {
                return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
            }
        };
        this.sourceStream = bucketedStream
                // Each window holds numBuckets buckets and moves forward 1 bucket at a time
                .window(numBuckets, 1)
                // Convert each window of buckets into a single summary
                .flatMap(reduceWindowToSummary)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()
                .onBackpressureDrop();
    }

[HealthCounts] HealthCounts plus(long[] eventTypeCounts): Accumulates the data in a bucket by event type to produce the HealthCounts statistics.

    public HealthCounts plus(long[] eventTypeCounts) {
        long updatedTotalCount = totalCount;
        long updatedErrorCount = errorCount;
        long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
        long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
        long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
        long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
        long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];
        // Total: successes plus every kind of failure
        updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
        // Errors: failures, timeouts, and rejections
        updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
        return new HealthCounts(updatedTotalCount, updatedErrorCount);
    }

8. Summary

In a distributed environment, some of the many services an application depends on will inevitably fail. Hystrix is a library that helps control the interactions between distributed services by adding circuit breaking, isolation, fallback, and related logic, which improves the overall resilience of the system. Its main functions are:

  • Protect the system and control latency and failures when accessing third-party dependencies, usually over the network
  • Prevent cascading failures in complex distributed systems
  • Fail fast and recover quickly
  • Degrade gracefully through fallbacks
  • Provide near-real-time monitoring, alerting, and control

There are a few points to note when using Hystrix:

  • Override getFallback() with as little network dependency as possible. If a network call is unavoidable, chain fallbacks by instantiating another HystrixCommand inside getFallback() and executing it. getFallback() should return quickly so that degradation is fast.
  • Thread isolation is the recommended strategy for HystrixCommand.
  • hystrix.threadpool.default.maximumSize only takes effect when hystrix.threadpool.default.allowMaximumSizeToDivergeFromCoreSize is set to true. Choose the maximum number of threads based on the business situation and performance test results. Start with as small a value as possible and support dynamic resizing, because the thread pool is the primary tool for shedding load and keeping resources from being blocked when latency occurs.
  • Under the semaphore isolation strategy, the business logic runs on the application's calling thread (for example, a Tomcat container thread), so the concurrency limit must be chosen carefully. This strategy is not recommended for calls with network overhead, which can easily cause container threads to queue up and block, affecting the entire application.
  • Hystrix relies heavily on RxJava, a reactive functional programming framework. A basic understanding of how RxJava is used makes the source logic easier to follow.

Reference documentation

Hystrix GitHub repository: https://github.com/Netflix/Hystrix