Preface

This chapter reads Hystrix's configuration alongside the source code of its core functions: caching, circuit breaking, resource isolation, and fallback (degradation).

1. Caching

Configuration

  • requestCache.enabled: whether request caching is enabled. The default value is true.

Core source code

Caching is the first step of the main flow that a HystrixCommand goes through in toObservable: it reads the cache and returns immediately on a hit. On a miss, the command is executed and its result is cached afterwards. The cache is managed by HystrixRequestCache; this chapter does not go into its details, because Hystrix request caching is rarely used in current services.

    // Cache singleton
    protected final HystrixRequestCache requestCache;

    // Constructor
    protected AbstractCommand(...) {
        this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
    }

    public Observable<R> toObservable() {
        final AbstractCommand<R> _cmd = this;
        // onTerminate handler
        final Action0 terminateCommandCleanup = ...;
        // onUnsubscribe handler
        final Action0 unsubscribeCommandCleanup = ...;
        // main execution logic
        final Func0<Observable<R>> applyHystrixSemantics = ...;
        // wraps the onComplete and onEmit hooks of HystrixCommandExecutionHook
        final Func1<R, R> wrapWithAllOnNextHooks = ...;
        // onCompleted hook
        final Action0 fireOnCompletedHook = ...;

        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                // CAS the command state
                if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                    throw new HystrixRuntimeException();
                }
                // requestLog logic omitted...

                // 1. Decide whether caching is enabled, based on configuration and cacheKey
                final boolean requestCacheEnabled = isRequestCachingEnabled();
                // 2. Get the cacheKey
                final String cacheKey = getCacheKey();
                if (requestCacheEnabled) {
                    // 3. Read the HystrixCommandResponseFromCache from the cache
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        // build the return value from the cache
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }
                // Observable for the actual execution
                Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics)
                        .map(wrapWithAllOnNextHooks);
                Observable<R> afterCache;
                // 4. Put the result into the cache
                if (requestCacheEnabled && cacheKey != null) {
                    // wrap the command and hystrixObservable
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    // put it into the cache
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                    if (fromCache != null) {
                        toCache.unsubscribe();
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    } else {
                        afterCache = toCache.toObservable();
                    }
                } else {
                    afterCache = hystrixObservable;
                }
                return afterCache
                        .doOnTerminate(terminateCommandCleanup)
                        .doOnUnsubscribe(unsubscribeCommandCleanup)
                        .doOnCompleted(fireOnCompletedHook);
            }
        });
    }

isRequestCachingEnabled determines whether caching is enabled. getCacheKey is meant to be overridden by subclasses and returns null by default, so a HystrixCommand that does not implement getCacheKey is never cached.

    protected boolean isRequestCachingEnabled() {
        return properties.requestCacheEnabled().get() && getCacheKey() != null;
    }
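
The read-then-putIfAbsent flow can be illustrated with a minimal sketch that uses only the JDK. RequestCacheSketch and its members are hypothetical names, not Hystrix classes. Unlike Hystrix, which caches the Observable before it runs, this sketch caches the plain result after execution, so it only mirrors the control flow.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration of the cache flow in toObservable; not Hystrix code.
class RequestCacheSketch<R> {
    private final ConcurrentHashMap<String, R> cache = new ConcurrentHashMap<>();
    private final boolean requestCacheEnabled;
    // Counts how often the command body actually ran.
    final AtomicInteger executions = new AtomicInteger();

    RequestCacheSketch(boolean requestCacheEnabled) {
        this.requestCacheEnabled = requestCacheEnabled;
    }

    // A null cacheKey disables caching, mirroring isRequestCachingEnabled().
    // The command must not return null (ConcurrentHashMap rejects null values).
    R execute(String cacheKey, Supplier<R> command) {
        boolean cachingEnabled = requestCacheEnabled && cacheKey != null;
        if (cachingEnabled) {
            R fromCache = cache.get(cacheKey);
            if (fromCache != null) {
                return fromCache; // cache hit: skip execution
            }
        }
        executions.incrementAndGet();
        R result = command.get(); // cache miss: run the command
        if (cachingEnabled) {
            R previous = cache.putIfAbsent(cacheKey, result);
            if (previous != null) {
                return previous; // a concurrent caller cached its result first
            }
        }
        return result;
    }
}
```

Calling execute twice with the same non-null key runs the command once; calling it with a null key runs the command every time.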

2. Circuit breaking

Configuration

  • circuitBreaker.enabled: whether the circuit breaker is enabled. The default value is true.
  • circuitBreaker.forceOpen: whether to force the circuit breaker open, rejecting all requests. The default value is false.
  • circuitBreaker.forceClosed: whether to force the circuit breaker closed, allowing all requests. The default value is false.
  • circuitBreaker.requestVolumeThreshold: the minimum number of requests within the sliding time window (metrics.rollingStats.timeInMilliseconds, 10 s by default) before the failure rate is evaluated and the breaker may trip. The default value is 20.
  • circuitBreaker.errorThresholdPercentage: the error percentage within the sliding time window (metrics.rollingStats.timeInMilliseconds, 10 s by default) at or above which the circuit breaker opens. The default value is 50.
  • circuitBreaker.sleepWindowInMilliseconds: requests are rejected for n milliseconds after the circuit breaker opens. After this time, a trial request may be attempted. The default value is 5000.

HystrixCircuitBreaker

HystrixCircuitBreaker is the interface for Hystrix circuit breakers. It declares three methods that implementations must provide:

  • boolean allowRequest(): called each time a HystrixCommand is executed to decide whether execution may continue.
  • boolean isOpen(): whether the circuit breaker is open. allowRequest and isOpen may be implemented differently.
  • void markSuccess(): called after a HystrixCommand executes successfully, to update the circuit breaker state.

HystrixCircuitBreakerImpl

The default implementation of HystrixCircuitBreaker is HystrixCircuitBreakerImpl.

  • Member variables and constructors:
    static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        // command configuration
        private final HystrixCommandProperties properties;
        // HystrixCommandMetrics manages the metrics of a HystrixCommand
        private final HystrixCommandMetrics metrics;
        // whether the circuit breaker is open
        private AtomicBoolean circuitOpen = new AtomicBoolean(false);
        // time the breaker first opened, or time of the last trial request while half-open
        private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();

        protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup,
                HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            this.properties = properties;
            this.metrics = metrics;
        }
    }
  • isOpen: determines whether the circuit breaker is open, based on the open flag and the health statistics
    @Override
    public boolean isOpen() {
        // the open flag is already set
        if (circuitOpen.get()) {
            return true;
        }
        // get the HealthCounts
        HealthCounts health = metrics.getHealthCounts();
        // total requests < circuitBreaker.requestVolumeThreshold: the breaker stays closed
        if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
            return false;
        }
        // error rate < circuitBreaker.errorThresholdPercentage: the breaker stays closed
        if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
            return false;
        } else {
            // open the circuit breaker
            if (circuitOpen.compareAndSet(false, true)) {
                // the CAS succeeded: record the timestamp at which the breaker first opened
                circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
                return true;
            } else {
                return true;
            }
        }
    }
  • allowSingleTest: while the circuit breaker is half-open, decides whether a single trial request may be attempted
    public boolean allowSingleTest() {
        // timestamp at which the breaker opened or was last tested
        long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
        // the breaker is open, and
        // current time - last opened/tested time > circuitBreaker.sleepWindowInMilliseconds
        if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
            // CAS the last-tested time
            if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
                // the CAS succeeded: allow one trial request
                return true;
            }
        }
        return false;
    }
  • allowRequest: whether the HystrixCommand is allowed to execute
    @Override
    public boolean allowRequest() {
        // circuitBreaker.forceOpen = true: execution is not allowed
        if (properties.circuitBreakerForceOpen().get()) {
            return false;
        }
        // circuitBreaker.forceClosed = true: execution is allowed
        if (properties.circuitBreakerForceClosed().get()) {
            // still call isOpen so that the open/closed state is updated from the statistics
            isOpen();
            return true;
        }
        // no special configuration:
        // allow the request if the breaker is closed, or if it is half-open and a trial request is due
        return !isOpen() || allowSingleTest();
    }
  • markSuccess: resets the circuit breaker flag and the HealthCountsStream
    public void markSuccess() {
        if (circuitOpen.get()) {
            if (circuitOpen.compareAndSet(true, false)) {
                // reset the HealthCountsStream
                metrics.resetStream();
            }
        }
    }
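
The interplay of these four methods can be followed in a simplified, JDK-only model. CircuitBreakerSketch is a hypothetical class: it keeps its own counters instead of reading HealthCounts from a metrics stream, and it takes an injectable clock so that the sleep window can be exercised without waiting.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical, simplified model of the closed/open/half-open transitions; not Hystrix code.
class CircuitBreakerSketch {
    private final int requestVolumeThreshold;
    private final int errorThresholdPercentage;
    private final long sleepWindowMillis;
    private final LongSupplier clock; // assumption: injectable time source in milliseconds
    private final AtomicBoolean circuitOpen = new AtomicBoolean(false);
    private final AtomicLong openedOrLastTestedTime = new AtomicLong();
    private final AtomicLong total = new AtomicLong();
    private final AtomicLong errors = new AtomicLong();

    CircuitBreakerSketch(int requestVolumeThreshold, int errorThresholdPercentage,
                         long sleepWindowMillis, LongSupplier clock) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMillis = sleepWindowMillis;
        this.clock = clock;
    }

    // Stand-in for the metrics stream: records one finished request.
    void record(boolean success) {
        total.incrementAndGet();
        if (!success) {
            errors.incrementAndGet();
        }
    }

    boolean isOpen() {
        if (circuitOpen.get()) {
            return true;
        }
        long t = total.get();
        if (t == 0 || t < requestVolumeThreshold) {
            return false; // not enough traffic to judge
        }
        int errorPercentage = (int) (errors.get() * 100 / t);
        if (errorPercentage < errorThresholdPercentage) {
            return false;
        }
        if (circuitOpen.compareAndSet(false, true)) {
            openedOrLastTestedTime.set(clock.getAsLong());
        }
        return true;
    }

    boolean allowSingleTest() {
        long last = openedOrLastTestedTime.get();
        long now = clock.getAsLong();
        // Only the caller that wins the CAS gets the trial request.
        return circuitOpen.get()
                && now > last + sleepWindowMillis
                && openedOrLastTestedTime.compareAndSet(last, now);
    }

    boolean allowRequest() {
        return !isOpen() || allowSingleTest();
    }

    void markSuccess() {
        if (circuitOpen.compareAndSet(true, false)) {
            total.set(0); // counterpart of metrics.resetStream()
            errors.set(0);
        }
    }
}
```

With a threshold of 4 requests and 50 percent errors, two failures out of four requests open the breaker; once the sleep window has passed, exactly one caller is let through, and markSuccess closes the breaker again.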

HealthCounts

HealthCounts holds the request statistics of a time window. The HealthCounts of successive time windows are managed by HealthCountsStream.

    public static class HealthCounts {
        // total number of requests
        private final long totalCount;
        // number of failed requests
        private final long errorCount;
        // error percentage
        private final int errorPercentage;
    }

The plus method updates the statistics of a time window. It takes an array of event counts: the index is the ordinal of the HystrixEventType enum constant, and the element is the number of times that event occurred.

    public HealthCounts plus(long[] eventTypeCounts) {
        long updatedTotalCount = totalCount;
        long updatedErrorCount = errorCount;

        long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
        long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
        long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
        long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
        long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];

        // total = sum of all events
        updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
        // errors = all events - successes
        updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
        return new HealthCounts(updatedTotalCount, updatedErrorCount);
    }
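
As a worked example of this arithmetic, the following JDK-only sketch accumulates two buckets of event counts. HealthCountsSketch and its nested EventType enum are hypothetical stand-ins that list only the five events used by plus; the way the error percentage is derived (integer division, 0 when there are no requests) is an assumption of the sketch.

```java
// Hypothetical stand-in for HealthCounts; not Hystrix code.
class HealthCountsSketch {
    // Only the event types that plus() reads; the ordinal is the array index.
    enum EventType { SUCCESS, FAILURE, TIMEOUT, THREAD_POOL_REJECTED, SEMAPHORE_REJECTED }

    final long totalCount;
    final long errorCount;
    final int errorPercentage;

    HealthCountsSketch(long totalCount, long errorCount) {
        this.totalCount = totalCount;
        this.errorCount = errorCount;
        // Assumption: truncating integer percentage, 0 for an empty window.
        this.errorPercentage = totalCount > 0 ? (int) (errorCount * 100 / totalCount) : 0;
    }

    // Returns a new immutable instance that includes the given bucket.
    HealthCountsSketch plus(long[] eventTypeCounts) {
        long success = eventTypeCounts[EventType.SUCCESS.ordinal()];
        long errors = eventTypeCounts[EventType.FAILURE.ordinal()]
                + eventTypeCounts[EventType.TIMEOUT.ordinal()]
                + eventTypeCounts[EventType.THREAD_POOL_REJECTED.ordinal()]
                + eventTypeCounts[EventType.SEMAPHORE_REJECTED.ordinal()];
        return new HealthCountsSketch(totalCount + success + errors, errorCount + errors);
    }
}
```

A bucket with 15 successes, 3 failures and 2 timeouts gives 20 requests with 5 errors, that is 25 percent. Adding a bucket with 5 semaphore rejections raises this to 10 errors out of 25 requests, or 40 percent.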

Core source code

The AbstractCommand constructor initializes the circuit breaker

    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey,
            HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
            HystrixCommandProperties.Setter commandPropertiesDefaults,
            HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
            HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
            HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
        // initialize the circuit breaker
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker,
                this.commandGroup, this.commandKey, this.properties, this.metrics);
    }

    private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
            HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
            HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        if (enabled) {
            // circuitBreaker.enabled = true
            if (fromConstructor == null) {
                // create a HystrixCircuitBreakerImpl, one instance per commandKey
                return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
            } else {
                return fromConstructor;
            }
        } else {
            // NoOpCircuitBreaker:
            // allowRequest always returns true
            // isOpen always returns false
            return new NoOpCircuitBreaker();
        }
    }

The first layer of toObservable is cache handling. The second layer is the circuit breaker check, whose entry point is applyHystrixSemantics.

    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // allowRequest, described above
        if (circuitBreaker.allowRequest()) {
            // ...
        } else {
            // otherwise build the short-circuit exception and run the fallback logic
            return handleShortCircuitViaFallback();
        }
    }

handleShortCircuitViaFallback wraps the short-circuit exception and runs the fallback logic.

    private Observable<R> handleShortCircuitViaFallback() {
        // HystrixEventNotifier has an empty default implementation
        eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);
        // the exception to report
        Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
        // record the execution exception
        executionResult = executionResult.setExecutionException(shortCircuitException);
        try {
            // fallback logic, covered later
            return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED,
                    FailureType.SHORTCIRCUIT, "short-circuited", shortCircuitException);
        } catch (Exception e) {
            return Observable.error(e);
        }
    }

3. Resource isolation

Configuration

Common configuration

  • execution.isolation.strategy: the isolation strategy, either THREAD or SEMAPHORE. The default is THREAD. The official recommendation is to use THREAD when the command makes network calls, and SEMAPHORE for purely in-memory operations or when concurrency is so high that creating thread resources becomes too expensive.
  • execution.timeout.enabled: whether HystrixCommand.run() is subject to timeout control. The default is true.
  • execution.isolation.thread.timeoutInMilliseconds: the timeout, 1 s by default. Do not be misled by the property name: the timeout set here currently applies under both the semaphore and the thread isolation strategy.

Thread pool configuration

  • coreSize: the number of core threads. Default: 10.
  • allowMaximumSizeToDivergeFromCoreSize: whether the maximumSize setting takes effect. The default is false, in which case the maximum number of threads equals the number of core threads.
  • maximumSize: the maximum number of threads. Default: 10.
  • maxQueueSize: the length of the thread pool's wait queue. The default value is -1, in which case the queue is a SynchronousQueue. With a value greater than 0, the queue is a LinkedBlockingQueue. This setting cannot be updated dynamically; the thread pool must be reinitialized for a change to take effect.
  • queueSizeRejectionThreshold: applies when maxQueueSize > 0. Because maxQueueSize cannot be updated dynamically, this setting provides a dynamically adjustable queue length at which requests are rejected. Default: 5.
  • keepAliveTimeMinutes: the idle time of non-core threads. Default: 1 minute.
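
How these settings might translate into a java.util.concurrent.ThreadPoolExecutor can be sketched as follows. ThreadPoolConfigSketch, queueFor and newPool are hypothetical helpers written for this illustration, not Hystrix APIs; the sketch only encodes the rules stated in the list above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper that applies the thread pool settings; not Hystrix code.
class ThreadPoolConfigSketch {

    // maxQueueSize <= 0 selects a direct hand-off queue, otherwise a bounded linked queue.
    static BlockingQueue<Runnable> queueFor(int maxQueueSize) {
        if (maxQueueSize <= 0) {
            return new SynchronousQueue<>();
        }
        return new LinkedBlockingQueue<>(maxQueueSize);
    }

    static ThreadPoolExecutor newPool(int coreSize, int maximumSize,
                                      boolean allowMaximumSizeToDivergeFromCoreSize,
                                      int keepAliveTimeMinutes, int maxQueueSize) {
        // maximumSize only counts when divergence is allowed, and never drops below coreSize.
        int actualMaximumSize = allowMaximumSizeToDivergeFromCoreSize
                ? Math.max(coreSize, maximumSize)
                : coreSize;
        return new ThreadPoolExecutor(coreSize, actualMaximumSize,
                keepAliveTimeMinutes, TimeUnit.MINUTES, queueFor(maxQueueSize));
    }
}
```

With the defaults (coreSize 10, divergence disabled, maxQueueSize -1) this yields a pool of exactly 10 threads with a SynchronousQueue, so an eleventh concurrent task is rejected rather than queued.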

Semaphore configuration

  • execution.isolation.semaphore.maxConcurrentRequests: the maximum number of concurrent HystrixCommand.run() executions. Default: 10.

Core source code

The third layer of toObservable's logic is resource isolation.

Semaphore isolation

The semaphore isolation strategy is handled first. Its entry point is AbstractCommand#applyHystrixSemantics.

    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // circuit breaker check
        if (circuitBreaker.allowRequest()) {
            // get the semaphore
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            // flag that records whether the semaphore has already been released
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            // releases the semaphore at most once
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };
            // marks the exception event
            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };
            // try to acquire the semaphore
            if (executionSemaphore.tryAcquire()) {
                try {
                    // record the start time of the execution
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                // failed to acquire the semaphore: fall back
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            // circuit breaker open: fall back
            return handleShortCircuitViaFallback();
        }
    }

The getExecutionSemaphore method gets the semaphore.

    // static member executionSemaphorePerCircuit
    // holds the commandKey -> semaphore mapping
    protected static final ConcurrentHashMap<String, TryableSemaphore> executionSemaphorePerCircuit = new ConcurrentHashMap<String, TryableSemaphore>();

    protected TryableSemaphore getExecutionSemaphore() {
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
            // the strategy is semaphore isolation
            if (executionSemaphoreOverride == null) {
                // first look the semaphore up in the static member executionSemaphorePerCircuit
                TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
                if (_s == null) {
                    // create Hystrix's own semaphore implementation, TryableSemaphoreActual
                    executionSemaphorePerCircuit.putIfAbsent(commandKey.name(),
                            new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
                    return executionSemaphorePerCircuit.get(commandKey.name());
                } else {
                    return _s;
                }
            } else {
                return executionSemaphoreOverride;
            }
        } else {
            // the strategy is thread isolation: return TryableSemaphoreNoOp.DEFAULT,
            // whose tryAcquire method always returns true
            return TryableSemaphoreNoOp.DEFAULT;
        }
    }

TryableSemaphoreActual is a simple semaphore implementation that does not rely on the AQS (AbstractQueuedSynchronizer) of java.util.concurrent. It is a counter backed by an AtomicInteger, and it never blocks.

    static class TryableSemaphoreActual implements TryableSemaphore {
        // dynamic configuration: execution.isolation.semaphore.maxConcurrentRequests
        protected final HystrixProperty<Integer> numberOfPermits;
        // number of permits currently in use
        private final AtomicInteger count = new AtomicInteger(0);

        public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
            this.numberOfPermits = numberOfPermits;
        }

        @Override
        public boolean tryAcquire() {
            int currentCount = count.incrementAndGet();
            if (currentCount > numberOfPermits.get()) {
                count.decrementAndGet();
                return false;
            } else {
                return true;
            }
        }

        @Override
        public void release() {
            count.decrementAndGet();
        }

        @Override
        public int getNumberOfPermitsUsed() {
            return count.get();
        }
    }
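
The behaviour of such a counter-based semaphore is easiest to see in isolation. The sketch below re-creates it with JDK types only; CountingSemaphoreSketch is a hypothetical name, and an IntSupplier stands in for the dynamic HystrixProperty<Integer> so that the permit limit can change between calls.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

// Hypothetical re-creation of the non-blocking counter semaphore; not Hystrix code.
class CountingSemaphoreSketch {
    private final IntSupplier numberOfPermits; // read on every acquire, so it may change at runtime
    private final AtomicInteger count = new AtomicInteger(0);

    CountingSemaphoreSketch(IntSupplier numberOfPermits) {
        this.numberOfPermits = numberOfPermits;
    }

    // Never blocks: increments optimistically and rolls back when over the limit.
    boolean tryAcquire() {
        int currentCount = count.incrementAndGet();
        if (currentCount > numberOfPermits.getAsInt()) {
            count.decrementAndGet();
            return false;
        }
        return true;
    }

    void release() {
        count.decrementAndGet();
    }

    int getNumberOfPermitsUsed() {
        return count.get();
    }
}
```

Because the limit is read on every tryAcquire, lowering it takes effect immediately: permits already held are not revoked, but new acquisitions fail until enough permits have been released.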

Thread isolation

The AbstractCommand constructor initializes the thread pool

    protected final HystrixThreadPool threadPool;

    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey,
            HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
            HystrixCommandProperties.Setter commandPropertiesDefaults,
            HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
            HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
            HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
        // get the HystrixThreadPoolKey,
        // which defaults to the HystrixCommandGroupKey
        this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup,
                this.properties.executionIsolationThreadPoolKeyOverride().get());
        // initialize the HystrixThreadPool; the implementation is HystrixThreadPoolDefault
        this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
    }

    private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey,
            HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
        if (fromConstructor == null) {
            return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
        } else {
            return fromConstructor;
        }
    }

executeCommandAndObserve initiates scheduling

AbstractCommand#executeCommandAndObserve is the entry point of the thread isolation code.

    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        // HystrixRequestContext of the current thread
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
        // onNext
        final Action1<R> markEmits = ...;
        // onCompleted
        final Action0 markOnCompleted = new Action0() {
            @Override
            public void call() {
                if (!commandIsScalar()) {
                    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                    executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                    // mark success on the circuit breaker
                    circuitBreaker.markSuccess();
                }
            }
        };
        // onError: fallback handling
        final Func1<Throwable, Observable<R>> handleFallback = ...;
        // restores the request context
        final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
            @Override
            public void call(Notification<? super R> rNotification) {
                setRequestContextIfNeeded(currentRequestContext);
            }
        };
        Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) {
            // execution.timeout.enabled = true
            // run the subsequent logic
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    // HystrixObservableTimeoutOperator holds the timeout logic, explained later
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            // run the subsequent logic
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }
        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }

AbstractCommand#executeCommandWithSpecifiedIsolation runs different logic depending on the isolation strategy. Under semaphore isolation it simply executes the user code.

    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    // CAS the command state
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }
                    // the command has already timed out
                    if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }
                    // CAS the thread state
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                        // run the HystrixCommand.run method
                        return getUserExecutionObservable(_cmd);
                    }
                    // (remaining branch omitted in this excerpt)
                }
            }).doOnTerminate(new Action0() {
                @Override
                public void call() {
                    // onTerminate...
                }
            }).doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    // onUnsubscribe...
                }
            }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                // whether to interrupt the executing thread
                @Override
                public Boolean call() {
                    return properties.executionIsolationThreadInterruptOnTimeout().get()
                            && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
        } else {
            // semaphore isolation
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    // CAS the command state
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }
                    // run the HystrixCommand.run method
                    return getUserExecutionObservable(_cmd);
                }
            });
        }
    }

HystrixThreadPoolDefault#getScheduler obtains the Scheduler. Note that the HystrixThreadPoolDefault instance is already the HystrixThreadPool that corresponds to the threadPoolKey.

    static class HystrixThreadPoolDefault implements HystrixThreadPool {
        private final HystrixThreadPoolProperties properties;
        private final ThreadPoolExecutor threadPool;

        @Override
        public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
            // 1. update the ThreadPoolExecutor settings from the dynamic configuration
            touchConfig();
            // 2. create the HystrixContextScheduler
            return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
        }

        // updates the ThreadPoolExecutor settings from the dynamic configuration
        private void touchConfig() {
            // number of core threads
            final int dynamicCoreSize = properties.coreSize().get();
            // configured maximum number of threads
            final int configuredMaximumSize = properties.maximumSize().get();
            // the actual maximum is determined jointly by
            // allowMaximumSizeToDivergeFromCoreSize, coreSize and maximumSize
            int dynamicMaximumSize = properties.actualMaximumSize();
            final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
            // whether the configured maximum is lower than the core size
            boolean maxTooLow = false;
            if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
                dynamicMaximumSize = dynamicCoreSize;
                maxTooLow = true;
            }
            if (threadPool.getCorePoolSize() != dynamicCoreSize
                    || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
                if (maxTooLow) {
                    logger.error(...);
                }
                // apply the sizes to the ThreadPoolExecutor
                threadPool.setCorePoolSize(dynamicCoreSize);
                threadPool.setMaximumPoolSize(dynamicMaximumSize);
            }
            // apply the keep-alive time to the ThreadPoolExecutor
            threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
        }
    }

HystrixObservableTimeoutOperator timeout detection

HystrixObservableTimeoutOperator#call holds the timeout detection logic, which combines a delayed scheduled task with a CAS.

    private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
        final AbstractCommand<R> originalCommand;

        public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
            this.originalCommand = originalCommand;
        }

        @Override
        public Subscriber<? super R> call(final Subscriber<? super R> child) {
            // irrelevant code omitted...

            // HystrixContextRunnable carries the HystrixRequestContext over to the timer thread
            final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {
                @Override
                public void run() {
                    // raise onError
                    child.onError(new HystrixTimeoutException());
                }
            });
            // the TimerListener is later submitted to a ScheduledThreadPoolExecutor for scheduled execution
            TimerListener listener = new TimerListener() {
                @Override
                public void tick() {
                    // a CAS decides whether the command can be marked as timed out
                    if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                        // s is the CompositeSubscription created in the omitted code
                        s.unsubscribe();
                        // run timeoutRunnable, which raises HystrixTimeoutException
                        timeoutRunnable.run();
                    }
                }

                // execution.isolation.thread.timeoutInMilliseconds
                @Override
                public int getIntervalTimeInMilliseconds() {
                    return originalCommand.properties.executionTimeoutInMilliseconds().get();
                }
            };
            // submit the TimerListener to the ScheduledThreadPoolExecutor for scheduled execution
            final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
            // irrelevant code omitted...
        }
    }
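
The core of this mechanism is a race between the timer and the command, settled by a single CAS on the status. The JDK-only sketch below isolates that idea. TimeoutRaceSketch and its methods are hypothetical names, and it uses a one-shot schedule call instead of Hystrix's TimerListener; the three status values match the ones referenced in the code above.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of "delayed task + CAS" timeout detection; not Hystrix code.
class TimeoutRaceSketch {
    enum TimedOutStatus { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    private final AtomicReference<TimedOutStatus> status =
            new AtomicReference<>(TimedOutStatus.NOT_EXECUTED);

    // Timer side: returns true only if the timeout wins the race.
    boolean tick() {
        return status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT);
    }

    // Command side: returns true only if the command finished before the timeout fired.
    boolean markCompleted() {
        return status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
    }

    TimedOutStatus status() {
        return status.get();
    }

    // Schedules the timeout check; onTimeout runs only when tick() wins the CAS.
    ScheduledFuture<?> scheduleTimeout(ScheduledExecutorService timer, long timeoutMillis, Runnable onTimeout) {
        return timer.schedule(() -> {
            if (tick()) {
                onTimeout.run();
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

Whichever side performs its CAS first fixes the outcome; the loser sees its compareAndSet fail and does nothing, so a command is never reported as both completed and timed out.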

HystrixTimer#addTimerListener submits the scheduled timeout detection task. Note that Hystrix opens one global thread pool for timeout detection. By default, the number of core threads is 8 and the maximum number of threads is Integer.MAX_VALUE.

    public class HystrixTimer {
        public Reference<TimerListener> addTimerListener(final TimerListener listener) {
            // initialize the thread pool via ScheduledExecutor.initialize
            startThreadIfNeeded();
            Runnable r = new Runnable() {
                @Override
                public void run() {
                    try {
                        // run the tick method
                        listener.tick();
                    } catch (Exception e) {
                        logger.error("Failed while ticking TimerListener", e);
                    }
                }
            };
            // both the initial delay and the period are execution.isolation.thread.timeoutInMilliseconds, 1 second by default
            ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r,
                    listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
            return new TimerReference(listener, f);
        }

        static class ScheduledExecutor {
            volatile ScheduledThreadPoolExecutor executor;
            private volatile boolean initialized;

            // initialize the thread pool
            public void initialize() {
                HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
                // hystrix.timer.threadpool.default.coreSize, 8 by default
                int coreSize = propertiesStrategy.getTimerThreadPoolProperties().getCorePoolSize().get();
                // thread factory; threads are named HystrixTimer-*
                ThreadFactory threadFactory = new ThreadFactory() {
                    final AtomicInteger counter = new AtomicInteger();

                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "HystrixTimer-" + counter.incrementAndGet());
                        thread.setDaemon(true);
                        return thread;
                    }
                };
                // ScheduledThreadPoolExecutor calls
                // super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory):
                // the maximum pool size is unbounded and the keep-alive time of non-core threads is 0
                executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory);
                initialized = true;
            }

            public ScheduledThreadPoolExecutor getThreadPool() {
                return executor;
            }

            public boolean isInitialized() {
                return initialized;
            }
        }
    }

HystrixContextScheduler executes the command

rx.Scheduler and rx.Scheduler.Worker are both abstract classes. A Scheduler creates Workers, and a Worker is responsible for actually scheduling the Action.

HystrixContextScheduler#createWorker creates a HystrixContextSchedulerWorker, which wraps the Worker created by the underlying ThreadPoolScheduler.

    public class HystrixContextScheduler extends Scheduler {
        private final HystrixConcurrencyStrategy concurrencyStrategy;
        private final Scheduler actualScheduler;
        private final HystrixThreadPool threadPool;

        public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool,
                Func0<Boolean> shouldInterruptThread) {
            this.concurrencyStrategy = concurrencyStrategy;
            this.threadPool = threadPool;
            this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
        }

        @Override
        public Worker createWorker() {
            return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
        }
    }

HystrixContextScheduler.HystrixContextSchedulerWorker#schedule(rx.functions.Action0) schedules the Action. The Worker that actually runs it is HystrixContextScheduler.ThreadPoolWorker.

    private class HystrixContextSchedulerWorker extends Worker {
        private final Worker worker;

        private HystrixContextSchedulerWorker(Worker actualWorker) {
            this.worker = actualWorker;
        }

        @Override
        public Subscription schedule(Action0 action) {
            if (threadPool != null) {
                // check whether the dynamic queue length still leaves room
                if (!threadPool.isQueueSpaceAvailable()) {
                    throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
                }
            }
            // this actually calls the schedule method of HystrixContextScheduler.ThreadPoolWorker
            return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
        }
    }

HystrixThreadPool.HystrixThreadPoolDefault#isQueueSpaceAvailable checks whether the dynamic queue length still leaves room.

    @Override
    public boolean isQueueSpaceAvailable() {
        if (queueSize <= 0) {
            return true;
        } else {
            // the ThreadPoolExecutor queue must be shorter than the dynamic limit queueSizeRejectionThreshold
            return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
        }
    }
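
The split between a fixed queue capacity and a dynamic rejection threshold can be tried out with a small JDK-only sketch. QueueRejectionSketch is a hypothetical class; an IntSupplier plays the role of the dynamic queueSizeRejectionThreshold property.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.IntSupplier;

// Hypothetical illustration of the dynamic queue length check; not Hystrix code.
class QueueRejectionSketch {
    private final int maxQueueSize;                       // fixed when the pool is created
    private final IntSupplier queueSizeRejectionThreshold; // may change at runtime
    private final BlockingQueue<Runnable> queue;

    QueueRejectionSketch(int maxQueueSize, IntSupplier queueSizeRejectionThreshold,
                         BlockingQueue<Runnable> queue) {
        this.maxQueueSize = maxQueueSize;
        this.queueSizeRejectionThreshold = queueSizeRejectionThreshold;
        this.queue = queue;
    }

    boolean isQueueSpaceAvailable() {
        if (maxQueueSize <= 0) {
            // no real queue (direct hand-off): the executor itself decides on rejection
            return true;
        }
        return queue.size() < queueSizeRejectionThreshold.getAsInt();
    }
}
```

With maxQueueSize 10 and a threshold of 2, the third queued task is refused even though the queue has room for 10; raising the threshold to 5 at runtime admits tasks again without rebuilding the pool.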

ThreadPoolWorker#schedule is where the task is actually submitted to the thread pool for execution.

    private static class ThreadPoolWorker extends Worker {
        private final HystrixThreadPool threadPool;
        private final CompositeSubscription subscription = new CompositeSubscription();
        private final Func0<Boolean> shouldInterruptThread;

        public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
            this.threadPool = threadPool;
            this.shouldInterruptThread = shouldInterruptThread;
        }

        @Override
        public Subscription schedule(final Action0 action) {
            if (subscription.isUnsubscribed()) {
                return Subscriptions.unsubscribed();
            }
            ScheduledAction sa = new ScheduledAction(action);
            subscription.add(sa);
            sa.addParent(subscription);
            ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
            // submit the task to the thread pool; may throw RejectedExecutionException
            FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
            sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
            return sa;
        }
    }

Four, the drop

The related configuration

  • fallback.enabled: whether degradation (fallback) is allowed. The default value is true.
  • fallback.isolation.semaphore.maxConcurrentRequests: the size of the fallback semaphore, that is, the maximum number of concurrent fallback executions. The default value is 10.
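These two items are normally set through property keys. The sketch below uses only java.util.Properties to illustrate the lookup order assumed here: a command-specific key (hystrix.command.<commandKey>.…) wins over the hystrix.command.default.… key, which in turn wins over the built-in default. The class `FallbackConfigSketch` and the command key `OrderQuery` are hypothetical, and values set in code are left out of the picture; in a real application Hystrix resolves these values itself.

```java
import java.util.Properties;

// Hypothetical helper that imitates the assumed lookup order; not a Hystrix class.
class FallbackConfigSketch {

    static String resolve(Properties props, String commandKey, String name, String builtInDefault) {
        // 1. Command-specific value, if present.
        String specific = props.getProperty("hystrix.command." + commandKey + "." + name);
        if (specific != null) {
            return specific;
        }
        // 2. Global default value, otherwise 3. the built-in default.
        return props.getProperty("hystrix.command.default." + name, builtInDefault);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("hystrix.command.default.fallback.isolation.semaphore.maxConcurrentRequests", "20");
        props.setProperty("hystrix.command.OrderQuery.fallback.enabled", "false");

        System.out.println(resolve(props, "OrderQuery", "fallback.enabled", "true"));
        System.out.println(resolve(props, "OrderQuery",
                "fallback.isolation.semaphore.maxConcurrentRequests", "10"));
    }
}
```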

Source code analysis

AbstractCommand#getFallbackOrThrowException is the entry point of the degradation logic. Almost every failure during command execution ends up in this method, which decides whether the exception should go through the fallback and then runs the concrete fallback logic.

  • The circuit breaker rejects the request: AbstractCommand#applyHystrixSemantics
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    if (circuitBreaker.allowRequest()) {
        // ... omitted
    } else {
        // Circuit breaker is open: go to the fallback
        return handleShortCircuitViaFallback();
    }
}

private Observable<R> handleShortCircuitViaFallback() {
    eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);
    Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
    // Record the execution exception
    executionResult = executionResult.setExecutionException(shortCircuitException);
    try {
        // Run getFallbackOrThrowException
        return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
                "short-circuited", shortCircuitException);
    } catch (Exception e) {
        return Observable.error(e);
    }
}
  • Failed to acquire the semaphore: AbstractCommand#applyHystrixSemantics
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    if (circuitBreaker.allowRequest()) {
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        if (executionSemaphore.tryAcquire()) {
            // ... omitted
        } else {
            // Failed to acquire the semaphore: run the fallback logic
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        return handleShortCircuitViaFallback();
    }
}

private Observable<R> handleSemaphoreRejectionViaFallback() {
    // The original exception
    Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");
    // Record the execution exception
    executionResult = executionResult.setExecutionException(semaphoreRejectionException);
    // Call getFallbackOrThrowException
    return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
            "could not acquire a semaphore for execution", semaphoreRejectionException);
}
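Both rejection paths share one shape: check the breaker, try to take a permit without blocking, and go to the fallback when either check fails. The sketch below models this with plain JDK types. `GuardedCallSketch`, its `breakerOpen` flag and the `Supplier` parameters are hypothetical simplifications, and the counter-based permit check merely imitates the idea of a non-blocking semaphore; failures inside the protected call itself are deliberately not handled here.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical simplification of "breaker check, then semaphore check, else fallback".
class GuardedCallSketch {
    private final int maxConcurrent;
    private final AtomicInteger inFlight = new AtomicInteger();
    volatile boolean breakerOpen;

    GuardedCallSketch(int maxConcurrent) {
        this.maxConcurrent = maxConcurrent;
    }

    // Non-blocking: either a permit is free right now or the caller is rejected.
    boolean tryAcquire() {
        if (inFlight.incrementAndGet() > maxConcurrent) {
            inFlight.decrementAndGet();
            return false;
        }
        return true;
    }

    void release() {
        inFlight.decrementAndGet();
    }

    <R> R execute(Supplier<R> run, Supplier<R> fallback) {
        if (breakerOpen) {
            return fallback.get();   // short-circuited
        }
        if (!tryAcquire()) {
            return fallback.get();   // semaphore rejected
        }
        try {
            return run.get();
        } finally {
            release();
        }
    }

    public static void main(String[] args) {
        GuardedCallSketch guard = new GuardedCallSketch(1);
        System.out.println(guard.execute(() -> "result", () -> "fallback")); // result
        guard.breakerOpen = true;
        System.out.println(guard.execute(() -> "result", () -> "fallback")); // fallback
    }
}
```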

Which exceptions skip the degradation logic

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType,
        final FailureType failureType, final String message, final Exception originalException) {
    if (shouldNotBeWrapped(originalException)) {
        // Exceptions marked with ExceptionNotWrappedByHystrix do not go through the fallback
        Exception e = wrapWithOnErrorHook(failureType, originalException);
        return Observable.error(e);
    } else if (isUnrecoverable(originalException)) {
        // Some unrecoverable errors do not go through the fallback either
        logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);
        Exception e = wrapWithOnErrorHook(failureType, originalException);
        return Observable.error(new HystrixRuntimeException(failureType, this.getClass(),
                getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));
    } else {
        // ...
    }
}

The shouldNotBeWrapped method can be overridden by subclasses to define which exceptions skip the degradation logic. In AbstractCommand's default implementation, any exception class that implements the marker interface ExceptionNotWrappedByHystrix skips it.

protected boolean shouldNotBeWrapped(Throwable underlying) {
    return underlying instanceof ExceptionNotWrappedByHystrix;
}

// Marker interface
public interface ExceptionNotWrappedByHystrix {
}
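As an illustration of the marker-interface check, here is a self-contained sketch. To stay runnable without Hystrix on the classpath it declares its own stand-in marker `NotWrappedMarker`; in real code the exception would implement Hystrix's ExceptionNotWrappedByHystrix instead. `InvalidOrderException` is a hypothetical business exception.

```java
// Hypothetical stand-ins; only the instanceof-based check mirrors the excerpt above.
class MarkerCheckSketch {

    // Stand-in for the marker interface: no methods, it only tags a type.
    interface NotWrappedMarker {
    }

    // A business exception that should reach the caller unchanged, without any fallback.
    static class InvalidOrderException extends RuntimeException implements NotWrappedMarker {
        InvalidOrderException(String message) {
            super(message);
        }
    }

    static boolean shouldNotBeWrapped(Throwable underlying) {
        return underlying instanceof NotWrappedMarker;
    }

    public static void main(String[] args) {
        System.out.println(shouldNotBeWrapped(new InvalidOrderException("bad order id"))); // true
        System.out.println(shouldNotBeWrapped(new IllegalStateException("timeout")));      // false
    }
}
```

A typical candidate for such a marker is a validation error: masking it with a fallback value would hide a caller mistake rather than a dependency failure.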

isUnrecoverable determines whether the cause of an exception is unrecoverable, for example a stack overflow, a virtual machine error, thread death, or a linkage error.

private boolean isUnrecoverable(Throwable t) {
    if (t != null && t.getCause() != null) {
        Throwable cause = t.getCause();
        if (cause instanceof StackOverflowError) {
            return true;
        } else if (cause instanceof VirtualMachineError) {
            return true;
        } else if (cause instanceof ThreadDeath) {
            return true;
        } else if (cause instanceof LinkageError) {
            return true;
        }
    }
    return false;
}
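One detail of this check is easy to miss: it inspects only the direct cause, not the throwable itself. The JDK-only sketch below reproduces that logic in a hypothetical class `UnrecoverableCheckSketch`; ThreadDeath is left out of the sketch because it is deprecated in recent JDKs.

```java
// Hypothetical reproduction of the cause-based check, for illustration only.
class UnrecoverableCheckSketch {

    static boolean isUnrecoverable(Throwable t) {
        if (t == null || t.getCause() == null) {
            return false;
        }
        Throwable cause = t.getCause();
        // OutOfMemoryError and StackOverflowError are both VirtualMachineErrors.
        return cause instanceof StackOverflowError
                || cause instanceof VirtualMachineError
                || cause instanceof LinkageError;
    }

    public static void main(String[] args) {
        // Wrapped error: the cause is inspected, so this is unrecoverable.
        System.out.println(isUnrecoverable(new RuntimeException(new OutOfMemoryError()))); // true
        // Bare error without a cause: the check does not match.
        System.out.println(isUnrecoverable(new OutOfMemoryError()));                       // false
    }
}
```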

Degradation logic

The Javadoc of the getFallbackOrThrowException method is worth reading first, because it offers guidance on how fallbacks should be written:

Execute getFallback() within protection of a semaphore that limits number of concurrent executions.

Fallback implementations shouldn’t perform anything that can be blocking, but we protect against it anyways in case someone doesn’t abide by the contract. If something in the getFallback() implementation is latent (such as a network call) then the semaphore will cause us to start rejecting requests rather than allowing potentially all threads to pile up and block.

In other words, the getFallback method executes under semaphore protection. Fallback implementations should not block, but the Hystrix authors guard the method with a semaphore in case developers do not follow that contract. If a fallback implementation contains something latent such as a network call, the semaphore causes further requests to be rejected instead of letting threads pile up and block.

The degradation logic in getFallbackOrThrowException:

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType,
        final FailureType failureType, final String message, final Exception originalException) {
    // ... irrelevant logic omitted (requestContext is obtained there)
    // fallback.enabled = true
    if (properties.fallbackEnabled().get()) {
        // Set the request context
        final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
            @Override
            public void call(Notification<? super R> rNotification) {
                setRequestContextIfNeeded(requestContext);
            }
        };
        // onNext
        final Action1<R> markFallbackEmit = ... ;
        // onCompleted
        final Action0 markFallbackCompleted = ... ;
        // Handles an exception thrown by the fallback method itself
        final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                Exception e = originalException;
                Exception fe = getExceptionFromThrowable(t);
                if (fe instanceof UnsupportedOperationException) {
                    // The subclass did not override getFallback, so UnsupportedOperationException
                    // was thrown: no fallback available
                    return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(),
                            getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe));
                } else {
                    // The fallback itself failed
                    return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(),
                            getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe));
                }
            }
        };
        // The fallback method is guarded by a semaphore
        // (fallback.isolation.semaphore.maxConcurrentRequests), size 10 by default
        final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        // Releases the semaphore at most once
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    fallbackSemaphore.release();
                }
            }
        };
        Observable<R> fallbackExecutionChain;
        // Acquire the fallback semaphore
        if (fallbackSemaphore.tryAcquire()) {
            try {
                fallbackExecutionChain = getFallbackObservable();
            } catch (Throwable ex) {
                fallbackExecutionChain = Observable.error(ex);
            }
            return fallbackExecutionChain
                    .doOnEach(setRequestContext)
                    .lift(new FallbackHookApplication(_cmd))
                    .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                    .doOnNext(markFallbackEmit)
                    .doOnCompleted(markFallbackCompleted)
                    .onErrorResumeNext(handleFallbackError)
                    .doOnTerminate(singleSemaphoreRelease)
                    .doOnUnsubscribe(singleSemaphoreRelease);
        } else {
            // Failed to acquire the fallback semaphore:
            // wrapped as Observable.error(new HystrixRuntimeException(...))
            return handleFallbackRejectionByEmittingError();
        }
    } else {
        // Fallback disabled: wrapped as Observable.error(new HystrixRuntimeException(...))
        return handleFallbackDisabledByEmittingError(originalException, failureType, message);
    }
}
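The two semaphore-related ideas in this method, non-blocking acquisition and release-exactly-once, can be shown in isolation. The sketch below uses java.util.concurrent.Semaphore rather than Hystrix's own semaphore type, and `FallbackGuardSketch` is a hypothetical name; it assumes a synchronous fallback, whereas the real chain is an Observable whose terminate and unsubscribe callbacks may both fire.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical, synchronous imitation of the semaphore-guarded fallback.
class FallbackGuardSketch {
    private final Semaphore fallbackSemaphore;

    FallbackGuardSketch(int maxConcurrentFallbacks) {
        this.fallbackSemaphore = new Semaphore(maxConcurrentFallbacks);
    }

    <R> R runFallback(Supplier<R> fallback) {
        // Never wait for a permit: reject immediately when none is free.
        if (!fallbackSemaphore.tryAcquire()) {
            throw new IllegalStateException("fallback execution rejected");
        }
        AtomicBoolean released = new AtomicBoolean(false);
        Runnable releaseOnce = () -> {
            if (released.compareAndSet(false, true)) {
                fallbackSemaphore.release();
            }
        };
        try {
            return fallback.get();
        } finally {
            releaseOnce.run(); // first signal releases the permit
            releaseOnce.run(); // a second signal is a no-op
        }
    }

    int availablePermits() {
        return fallbackSemaphore.availablePermits();
    }

    public static void main(String[] args) {
        FallbackGuardSketch guard = new FallbackGuardSketch(1);
        System.out.println(guard.runFallback(() -> "default value")); // default value
        System.out.println(guard.availablePermits());                 // 1
    }
}
```

Without the compareAndSet guard, the two release signals would hand back two permits for one acquisition and the limit would silently grow.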

AbstractCommand#getFallbackObservable obtains the fallback Observable. The HystrixCommand implementation below wraps getFallback() in a deferred Observable.

@Override
final protected Observable<R> getFallbackObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(getFallback());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    });
}

HystrixCommand#getFallback throws UnsupportedOperationException by default, so users must override it to supply their own degradation logic.

protected R getFallback() {
    throw new UnsupportedOperationException("No fallback available.");
}
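To make the difference between "no fallback available" and "fallback failed" concrete, here is a small sketch that does not depend on Hystrix. `FallbackOutcomeSketch`, its nested `Command` type and the two example commands are hypothetical, and IllegalStateException stands in for HystrixRuntimeException.

```java
// Hypothetical miniature of the default-getFallback contract; not Hystrix code.
class FallbackOutcomeSketch {

    static abstract class Command<R> {
        abstract R run() throws Exception;

        // Same default as in the excerpt above: no fallback unless overridden.
        R getFallback() {
            throw new UnsupportedOperationException("No fallback available.");
        }

        R execute() {
            try {
                return run();
            } catch (Exception original) {
                try {
                    return getFallback();
                } catch (UnsupportedOperationException missing) {
                    // getFallback was not overridden
                    throw new IllegalStateException("failed and no fallback available.", original);
                } catch (RuntimeException fallbackError) {
                    // getFallback was overridden but threw
                    throw new IllegalStateException("failed and fallback failed.", original);
                }
            }
        }
    }

    static class WithoutFallback extends Command<String> {
        @Override
        String run() throws Exception {
            throw new Exception("remote call failed");
        }
    }

    static class WithFallback extends WithoutFallback {
        @Override
        String getFallback() {
            return "default value";
        }
    }

    public static void main(String[] args) {
        System.out.println(new WithFallback().execute()); // default value
        try {
            new WithoutFallback().execute();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // failed and no fallback available.
        }
    }
}
```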

conclusion

This article analyzed several of Hystrix's core functional modules, starting from its common configuration items:

  • Cache: rarely used, so a general understanding is enough.
  • Circuit breaking: controlled per CommandKey by HystrixCircuitBreaker. Within the sliding window (metrics.rollingStats.timeInMilliseconds, 10 s by default), once the number of requests reaches circuitBreaker.requestVolumeThreshold (20 by default) and the error rate reaches or exceeds circuitBreaker.errorThresholdPercentage (50% by default), the circuit breaker opens. After circuitBreaker.sleepWindowInMilliseconds (5 seconds by default), one trial request is let through (the breaker is half-open). If that request succeeds, the breaker closes; otherwise another trial request is allowed only after a further circuitBreaker.sleepWindowInMilliseconds has passed.
  • Resource isolation: controlled per HystrixThreadPoolKey/HystrixCommandGroupKey. Thread isolation is used by default. Timeout control is supported: execution.timeout.enabled=true by default, with a timeout of 1 second (execution.isolation.thread.timeoutInMilliseconds=1000). It is advisable to keep execution.isolation.thread.timeoutInMilliseconds > ribbon.ReadTimeout + ribbon.ConnectTimeout.
  • Degradation: enabled by default (fallback.enabled=true); HystrixCommand's getFallback method needs to be overridden. By default, exceptions marked with ExceptionNotWrappedByHystrix do not enter the degradation logic, and neither do some unrecoverable errors such as OutOfMemoryError and StackOverflowError. Do not make network calls or perform other blocking operations in a fallback method, even though fallback execution is also limited by a semaphore (fallback.isolation.semaphore.maxConcurrentRequests=10).
  • Hystrix dynamic configuration: implemented by HystrixDynamicPropertiesArchaius; underneath, Spring and Netflix Archaius are integrated through spring-cloud-starter-netflix-archaius. This was mentioned earlier in the Feign source-code reading article (juejin.cn/post/687856…). Note that the thread-pool item maxQueueSize used for resource isolation does not support dynamic configuration; dynamic adjustment is achieved through the queueSizeRejectionThreshold item instead.
private final String configKey = "hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds";
@Test
public void testDynamicProperty() throws InterruptedException {
    DynamicProperty instance = DynamicProperty.getInstance(configKey);
    // hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds:1000
    System.out.println(configKey + ":" + instance.getString());
}
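The circuit-breaking behaviour summarized in the list above can be sketched as a small state machine. This is a deliberately simplified illustration, not Hystrix's implementation: `BreakerSketch` is a hypothetical class, the counters are not a rolling window, the code is not thread-safe, and the current time is passed in explicitly so that the behaviour is easy to follow.

```java
// Hypothetical, single-threaded illustration of closed / open / half-open.
class BreakerSketch {
    private final int requestVolumeThreshold;
    private final int errorThresholdPercentage;
    private final long sleepWindowMillis;

    private int total;                 // requests counted while closed (no rolling window here)
    private int errors;
    private boolean open;
    private long openedOrLastTrialAt;  // start of the current sleep window

    BreakerSketch(int requestVolumeThreshold, int errorThresholdPercentage, long sleepWindowMillis) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMillis = sleepWindowMillis;
    }

    boolean allowRequest(long now) {
        if (!open) {
            return true;
        }
        if (now - openedOrLastTrialAt >= sleepWindowMillis) {
            // Half-open: let a single trial request through and restart the sleep window.
            openedOrLastTrialAt = now;
            return true;
        }
        return false;
    }

    void markSuccess() {
        if (open) {
            // The trial request succeeded: close the breaker and reset the counters.
            open = false;
            total = 0;
            errors = 0;
        } else {
            total++;
        }
    }

    void markFailure(long now) {
        if (open) {
            return; // failed trial: stay open until the next sleep window elapses
        }
        total++;
        errors++;
        if (total >= requestVolumeThreshold && errors * 100 / total >= errorThresholdPercentage) {
            open = true;
            openedOrLastTrialAt = now;
        }
    }

    public static void main(String[] args) {
        BreakerSketch breaker = new BreakerSketch(4, 50, 5000);
        breaker.markSuccess();
        breaker.markSuccess();
        breaker.markFailure(0);
        breaker.markFailure(0);                          // 4 requests, 50% errors: opens
        System.out.println(breaker.allowRequest(1000));  // false
        System.out.println(breaker.allowRequest(5000));  // true (trial request)
    }
}
```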