1. Basic process

  • Create commands, two Command types
  • Execute Command in four ways
  • Check whether request cache is enabled and whether request cache exists. If yes, read the request cache directly

2. Basic components

2.1 the Request Cache

First, there is the concept of a request context. Each request will be in the filter, and each request will be given a request context. In the context of a request, if there are multiple commands, their parameters will be the same, and the interface will be the same. In fact, it can be thought of the same way, in that case, we can have the result of the first Command executed, cached in memory, and then in the context of the request, all subsequent calls to that dependency are cached out of memory.

HystrixObservableCommand and HystrixObservableCommand can both specify a cache key, and hystrix will cache it automatically, and then when the same request context is accessed, the cache will be fetched directly. Avoid repeating network requests.

For Request Cache, Request consolidation, Request Log, etc., you must manage the declaration cycle of HystrixRequestContext yourself

2.2 Fallback degradation mechanism

Hystrix calls various interfaces, or accesses external dependencies, and an exception occurs

Each external dependency can only be accessed with a certain amount of resources, reject, thread pool/semaphore, and if the resource pool is full

Accessing an external dependency exposes a TimeOutException if a timeout occurs

When the proportion of abnormal events found by the short-circuit device reaches a certain proportion, the short-circuit is directly started

In all cases, fallback is implemented. For example, we can use the local cache as a memory cache based on lRU automatic cleaning. If this happens, we can try to fetch data from the cache, or return a default value

@Override
protected User getFallback(a) { // fallback
   return new User("fallBack"."back"."back");
}
Copy the code

2.3 short circuiter

  • If after a short circuit flow rate reached a certain threshold, HystrixCommandProperties. CircuitBreakerRequestVolumeThreshold ();
  • If the breaker statistics to abnormal call ratio reached a certain threshold, the HystrixCommandProperties. CircuitBreakerErrorThresholdPercentage ();
  • The short-circuiter switches from close to Open

When the short-circuiter is turned on, all the requests that pass through the short-circuiter are short-circuited, and the fallback mechanism is directly used instead of calling the back-end service

After a period of time, * * HystrixCommandProperties circuitBreakerSleepWindowInMilliseconds (), * * will be half – open, make a request through the short, can see the normal call, If the call succeeds, it automatically resumes and goes to the close state

3. Feign and Hystrix integration parameters

hystrix:
  command:
    default:
      execution:
        isolation:
          strategy: THREAD
          thread:
            timeoutInMilliseconds: 1000
            interruptOnTimeout: true
        semaphore:
          maxConcurrentRequests: 10
        timeout: 
          enabled: true
      circuitBreaker:
        enabled: true
        requestVolumeThreshold: 20
        sleepWindowInMilliseconds: 5000
        errorThresholdPercentage: 50
Copy the code

3.1 Thread pool-related properties

Hystrix. Threadpool. Default. CoreSize: the thread pool size, 10 by default

Hystrix. Threadpool. Default. MaximumSize: thread pool maximum size, 10 by default

Hystrix. Threadpool. Default. AllowMaximumSizeToDivergeFromCoreSize: whether to allow dynamic adjust the number of threads, the default false, only the set to true, the maximumSize above is valid

Hystrix. Threadpool. Default. KeepAliveTimeMinutes: the default is 1, beyond the thread of coreSize, free release after 1 minute

Hystrix. Threadpool. Default. MaxQueueSize default – 1, cannot be modified dynamically

Hystrix. Threadpool. Default. QueueSizeRejectionThreshold can dynamically modify, default is five, enter the request queue, and then carried out by the thread pool

3.2 Executing related attributes

Hystrix.com mand. Default. Execution. The isolation. The strategy: isolation strategy, the default Thread, can choose a Semaphore Semaphore

Hystrix.com mand. Default. Execution. The isolation. Thread. TimeoutInMilliseconds: timeout, the default 1000 ms

Hystrix.com mand. Default. Execution. A timeout. Enabled: whether to enable the timeout, the default is true

Hystrix.com mand. Default. Execution. The isolation. Thread. InterruptOnTimeout: whether the timeout time interrupt execution, the default is true

Hystrix.com mand. Default. Execution. The isolation. Semaphore. MaxConcurrentRequests: signal isolation strategy, allow maximum number of concurrent requests, 10 by default

3.3 Attributes related to degradation

Hystrix.com mand. Default. Fallback. Enabled by default is true

3.4 Fuse-related attributes

Hystrix.com mand. Default. CircuitBreaker. Enabled: whether to enable fuse true by default

Hystrix.com mand. Default. CircuitBreaker. RequestVolumeThreshold: 10 seconds, what is the number of requests to try to trigger fusing, 20 by default

Hystrix.com mand. Default. CircuitBreaker. ErrorThresholdPercentage: 10 seconds, the number of requests up to 20, abnormal ratio of 50% at the same time, will trigger the fusing, 50 by default

Hystrix.com mand. Default. CircuitBreaker. SleepWindowInMilliseconds: trigger after fusing, directly to refuse the request within 5 s, go down logic, 5 s after trying to lose half – open a small amount of traffic trying to restore, the default of 5000

Hystrix.com mand. Default. CircuitBreaker. ForceOpen: forced open the fuse

Hystrix.com mand. Default. CircuitBreaker. ForceClosed: forced to close the fuse

3.5 Metric attributes

Hystrix. Threadpool. Default. The metrics. RollingStats. TimeInMillisecond: thread pool statistical indicators of the time, the default, 10000 is 10 s

Hystrix. Threadpool. Default. The metrics. RollingStats. NumBuckets: the rolling window is divided into n buckets, 10 by default

Hystrix.com mand. Default. The metrics. RollingStats. TimeInMilliseconds: the command of the statistical time, whether the fuse will open, according to a rolling window statistics to calculate. If Rolling Window is set to 10000 ms, rolling Window will be divided into N buckets, and each bucket contains the statistics of The Times of success, failure, timeout and rejection. The default is 10000

Hystrix.com mand. Default. The metrics. RollingStats. NumBuckets set up a rolling window is divided into the number of, if numBuckets = 10, rolling window = 10000, The time of a bucket is then 1 second. Must match rolling Window % numberBuckets == 0. The default 10

Hystrix.com mand. Default. The metrics. RollingPercentile. Enabled if executes the enable index calculation and tracking, the default is true

Hystrix.com mand. Default. The metrics. RollingPercentile. TimeInMilliseconds set rolling the percentile window of time, the default of 60000

Hystrix.com mand. Default. The metrics. RollingPercentile. NumBuckets set rolling the percentile numberBuckets window. Same logic. The default 6

Hystrix.com mand. Default. The metrics. RollingPercentile. BucketSize if the bucket size = 100, window = 10 s, if are there 500 times in the 10 s, Only the last 100 executions are counted in the bucket. Increasing this value increases memory overhead as well as sorting overhead. The default is 100

Hystrix.com mand. Default. The metrics. HealthSnapshot. IntervalInMilliseconds record health snapshot (for success and error statistics green) intervals, the default 500 ms

3.6 Advanced Features

Hystrix.com mand. Default. RequestCache. Enabled is true

Hystrix.com mand. Default. RequestLog. Enabled logging to HystrixRequestLog, true by default

Hystrix. Collapser. Default. MaxRequestsInBatch single batch the maximum number of requests, to reach the trigger the batch number, the default Integer. MAX_VALUE

Hystrix. Collapser. Default. TimerDelayInMilliseconds trigger delay of the batch, can also create a batch of time + the value, the default 10

Hystrix. Collapser. Default. RequestCache. If enabled for HystrixCollapser. The execute () and HystrixCollapser. Queue () cache, true by default

4. Integration principle of Feign and Hystrix

4.1 Hystrix Generates dynamic proxies

Mainly through HystrixTaget instance methods of the target, first of all judge previously built Feign. Builder is Feign. Hystrix. HystrixFeign. Builder.

We then use the targetWithFallback method to get a separate FallbackFactory from the service-independent Spring container to get the instance of our defined Fallback object. It verifies that the fallback is ok and that the Fallback factory has returned a fall object that cannot be null

After through builder build method, according to the service information, parameters, such as building HystrixInvocationHandler instance, and set up a contract for HystrixDelegatingContract, This example is used to parse Hystrix annotations.

Then build ReflectiveFeign from its parent class Feign.Builder. In this case, the logic is the same as Feign used to build dynamic proxies.

4.2 Execution Process

A HystrixInvocationHadnler object is initialized when a dynamic proxy object is built. The constructor of this instance uses a toSetters method, which assemes the method with the corresponding key information.

static Map<Method, Setter> toSetters(SetterFactory setterFactory, Target
        target, Set
       
         methods)
        {
    Map<Method, Setter> result = new LinkedHashMap<Method, Setter>();
    for (Method method : methods) {
      method.setAccessible(true);
      result.put(method, setterFactory.create(target, method));
    }
    return result;
  }

public HystrixCommand.Setter create(Target
        target, Method method) {
      String groupKey = target.name();
      String commandKey = Feign.configKey(target.type(), method);
      return HystrixCommand.Setter
          .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
          .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
}
Copy the code

The main core key is that the create method, the target object is the information about the encapsulated service, and we can see from the source that a service name, which is the service name that we set in the @FeignClient annotation, As a groupKey, and the method name and method parameters in this interface as commandKey

A groupkey represents a thread pool, that is, the interface invocation requests contained in the service are completed by the thread pool. An interface in a service represents a commandKey, which belongs to the groupkey of the service

We then build an instance of HystrixCommand using the setterMethodMap, which is responsible for executing the logic, the general logic, through the run() Method, through the handler object bound to the Method object. There is also a method called getFallback, in which the logic is basically the same as above: get the corresponding Fallback object from the factory, get the corresponding handler from the handler factory, and execute the degraded method.

Execute () via HystrixCommand, queue(), toObservable().toblocking ().toFuture() To put it simply, it obtained the corresponding Future object from the thread pool corresponding to the service, and then wrapped the Future object, mainly because the original Future object could not cancel. Finally, it returned the wrapped object F, and judged whether the execution was completed by F. The command result is displayed

 public Future<R> queue(a) {
        // Get the Future of the service
        final Future<R> delegate = toObservable().toBlocking().toFuture();
        // Wrap the native Future
        final Future<R> f = new Future<R>() {
            
            ...
        }
     if (f.isDone()) { // Check whether the execution is complete
            try {
                f.get();
                return f; / / return
            } catch (Exception e) {
                Throwable t = decomposeException(e);
                if (t instanceof HystrixBadRequestException) {
                    return f;
                } else if (t instanceof HystrixRuntimeException) {
                    HystrixRuntimeException hre = (HystrixRuntimeException) t;
                    switch (hre.getFailureType()) {
                    case COMMAND_EXCEPTION:
                    case TIMEOUT:
                        // we don't throw these types from queue() only from queue().get() as they are execution errors
                        return f;
                    default:
                        // these are errors we throw from queue() as they as rejection type errors
                        throwhre; }}else {
                    throwExceptions.sneakyThrow(t); }}}return f;
 }
Copy the code

In the toObservable method, an Observable is created, and inside it, Created terminateCommandCleanup, unsubscribeCommandCleanup, applyHystrixSemantics, wrapWithAllOnNextHooks, fireOnCompletedHook These objects, which are used for callbacks, are wrapped in Obserable objects, and the call method of those objects is invoked when the toBlocking() method is invoked.

After the toBlocking() method is invoked, the call() method of Func0 is actually called, which is the entry method to execute, and its internal implementation is relatively simple: The command state is changed from NOT_STARTED to OBSERVABLE_CHAIN_CREATE, and then the call logging is enabled. By default, the call logging is enabled, but not processed, and caching is disabled. In the end, a hystrixObservable is built, and the object created before is given to him. After creating an afterCache, the hystrixObservable is assigned to him, which is the returned Observable instance.

When it calls its callback, it calls the applyHystrixSemantics callback method, which is appliHystirxSemantics, and the short-circuiting semantics method says whether or not it can execute, if it can execute, The executeCommandAndObserve method is then called, where many more objects are built: markEmits, markOnCompleted, handleFallback, setRequestContext, and then determine whether timeout detection is enabled. Through executeCommandWithSpecifiedIsolation method of Func0. Call () method, modify some status value, Hystrix. StartCurrentThreadExecutingCommand (getCommandKey ()), in this way will commandKey YaRu stack, finally through getUserExecutionObservable method, Set the callback method, which executes the run() method, returns the final userObservable, and subscribes to the userObservable via the callback. The run() Method is called by the func0.call () callback in getExecutionObservable, and the invoke Method is invoked by the corresponding handler of the Method object for final execution.

Grass, the above is too messy, I will change it slowly, some of it is really hard to tidy up, then look at the picture. Follow up with more source code assistance

4.3 Thread Pool Related

In fact, the related thread pool constructor logic in HystrixInvocationHandler, whose invoke method builds HystrixCommand instances, calls the constructor of the parent class AbstractCommand

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults, HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore, HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
        // 
        this.commandGroup = initGroupKey(group);
        this.commandKey = initCommandKey(key, getClass());
        this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
        this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
        this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
        // This method is used for thread pool initialization
        this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults); . . }Copy the code

The initThreadPool method above calls a getInstance method

private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
        if (fromConstructor == null) {
            // get the default implementation of HystrixThreadPool
            return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
        } else {
            returnfromConstructor; }}Copy the code

HystrixThreadPoolDefault is used to initialize the thread pool from this class if the threadPoolKey does not have a corresponding thread pool. The threadPools attribute is a concurrentHashMap

static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
            // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
            String key = threadPoolKey.name();

            // this should find it for all but the first time
            HystrixThreadPool previouslyCached = threadPools.get(key);
            if(previouslyCached ! =null) {
                return previouslyCached;
            }

            // if we get here this is the first time so we need to initialize
            synchronized (HystrixThreadPool.class) {
                if(! threadPools.containsKey(key)) {// This is the core
                    threadPools.put(key, newHystrixThreadPoolDefault(threadPoolKey, propertiesBuilder)); }}return threadPools.get(key);
        }
Copy the code

Then go down, to see how to build HystrixThreadPoolDefault, or access to the relevant attribute information, first by concurrencyStrategy. GetThreadPool () method, the implementation of a thread pool to create

public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
            this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
            HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
            this.queueSize = properties.maxQueueSize().get();

            this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
                    concurrencyStrategy.getThreadPool(threadPoolKey, properties),
                    properties);
            this.threadPool = this.metrics.getThreadPool();
            this.queue = this.threadPool.getQueue();

            /* strategy: HystrixMetricsPublisherThreadPool */
            HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
        }
Copy the code

Now it’s pretty clear what some of the parameters mean, and I’m going to draw a table in a second, and I’m going to build queues and thread pools based on those parameters, and by default, CoreSize = 10, maxQueueSize = -1 this indirectly causes BlockingQueue to be of type SynchronousQueue, which means it is not queued by default

public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
        final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

        final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
        final int dynamicCoreSize = threadPoolProperties.coreSize().get();
        final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
        final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
        final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);

        if (allowMaximumSizeToDivergeFromCoreSize) {
            final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
            if (dynamicCoreSize > dynamicMaximumSize) {
                logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                        dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
                        dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            } else {
                return newThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); }}else {
            return newThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); }}Copy the code
allowMaximumSizeToDivergeFromCoreSize True /false indicates whether automatic expansion is allowed
keepAliveTime When this parameter is enabled, new threads are automatically created, and new threads are reclaimed when they are idle for longer than this time
dynamicMaximumSize If the preceding parameter is enabled, the maximum number of threads can be expanded
maxQueueSize How many requests can a queued queue hold? The default value is -1
BlockingQueue Queue, because that parameter defaults to -1, this queue defaults to SynchronousQueue, which is not queued; LinkedBlockingQueue(maxQueueSize) if the value > 0
dynamicCoreSize The default is 10, and there are ten threads in the thread pool

Finally, to a small map, deepen the understanding, the map is not big, live to see, the final optimization in the late version

Actually executeCommandWithSpecifiedIsolation the call () method, carries on the execution of the observables, ordered a subscribeOn () method, the core of this code is no longer in the call, Instead, it’s in getScheduler()

subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call(a) {
                    returnproperties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; }}Copy the code

In this case, a new HystrixContextScheduler is created, which simply assigns some properties to it, and then a new instance of ThreadPoolScheduler is created

@Override
        public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
            touchConfig(); // Set some thread pool parameters
            return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
        }

public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
        this.concurrencyStrategy = concurrencyStrategy;
        this.threadPool = threadPool;
        this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
    }
Copy the code

HystrixContextScheduler invokes the createWorker inside (), created HystrixContextSchedulerWorker, through its scheduler (), this method can judge whether the queue of the thread pool is full

@Override
        public Subscription schedule(Action0 action) {
            if(threadPool ! =null) {
                if(! threadPool.isQueueSpaceAvailable()) {throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold."); }}return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
        }
Copy the code

Mainly through! ThreadPool. IsQueueSpaceAvailable () to judge, logic is relatively simple, maxQueueSize < 0, direct return is full, if not, still have to determine: The amount of the current queue < queueSizeRejectionThreshold attribute values

public boolean isQueueSpaceAvailable(a) {
            if (queueSize <= 0) {
                // we don't have a queue so we won't look for space but instead
                // let the thread-pool reject or not
                return true;
            } else {
                returnthreadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get(); }}Copy the code

Judgment after, full of error directly, not full began by ThreadPoolWorker schedule method, submitted the task, before calling the method is to create a HystrixContexSchedulerAction object, This is the ScheduledAction below.

@Override
        public Subscription schedule(final Action0 action) {
            if (subscription.isUnsubscribed()) {
                // don't schedule, we are unsubscribed
                return Subscriptions.unsubscribed();
            }

            // This is internal RxJava API but it is too useful.
            ScheduledAction sa = newScheduledAction(action); subscription.add(sa); sa.addParent(subscription); ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor(); FutureTask<? > f = (FutureTask<? >) executor.submit(sa); sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

            return sa;
        }
Copy the code

ToObservable ().toblocking ().toFuture() = toObservable().toblocking ().toFuture()

Then there is the execution, and continue to go, will find that the last or executeCommandWithSpecifiedIsolation Func0. In the () call () method, the call () method in the userObservable, Execute command’s run method, and execute this logic, which I wrote earlier, although it’s messy.

* * * *

4.4 the timeout

The main logic of this timeout, after creating observables, give the observables added a HystrixObservableTimeoutOperator instance, it’s the call () method is the key, the main, If a TimeListener is executed, the tick() method will handle the timeout logic. If the state is NOT_EXECUTED, change it to TIME_OUT and throw an exception

TimerListener listener = new TimerListener() {

                @Override
                public void tick(a) {
                    // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
                    // otherwise it means we lost a race and the run() execution completed or did not start
                    if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                        // report timeout failure
                        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);

                        // shut down the original request
                        s.unsubscribe();

                        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

                            @Override
                            public void run(a) {
                                child.onError(newHystrixTimeoutException()); }}); timeoutRunnable.run();//if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout}}@Override
                public int getIntervalTimeInMilliseconds(a) {
                    returnoriginalCommand.properties.executionTimeoutInMilliseconds().get(); }};Copy the code

The TimeListener is then added to the thread pool. If it times out, it has its own thread pool (coreSize = 4 by default) that has the Runnable thread class to call the tick() method above. And every time we set the timeout time to execute the task, and finally after the task is executed, if there is no timeout, the scheduled task will be cleared.

public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        startThreadIfNeeded();
        // add the listener

        Runnable r = new Runnable() {

            @Override
            public void run(a) {
                try {
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e); }}}; ScheduledFuture<? > f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);return new TimerReference(listener, f);
    }
Copy the code

4.5 a fuse

A HystrixCircuitBreaker will initialize a HystrixCircuitBreaker when HystrixCommand is created. The circuit breaker will initialize a HystrixCircuitBreaker when HystrixCommand is created. The circuit breaker will initialize a HystrixCircuitBreaker when HystrixCommand is created. Ensure that each interface in each service has its own fuse

Go through HystrixCircuitBreakerImpl build fuse, Subscription s = subscribeToStream (); Is the core of judgment logic.

protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            this.properties = properties;
            this.metrics = metrics;

            //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
            Subscription s = subscribeToStream();
            activeSubscription.set(s);
        }
Copy the code

The main idea here is to use the subscriber, subscription metrics statistics, and the onNext() callback to determine whether to turn on the fuse.

First of all to ensure that within a time window (the default 10 s), the total number of requests is greater than the circuitBreakerRequestVolumeThreshold 20 (the default), then to judge, Request is greater than percentage of abnormal circuitBreakerErrorThresholdPercentage 50% (the default), if all meet, Set status.compareAndSet(status.closed, status.open) to OPEN

private Subscription subscribeToStream(a) {
            /* * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream */
            return metrics.getHealthCountsStream()
                    .observe()
                    .subscribe(newSubscriber<HealthCounts>() { ... .@Override
                        public void onNext(HealthCounts hc) {
                            // check if we are past the statisticalWindowVolumeThreshold
                            if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                                // we are not past the minimum volume threshold for the stat window,
                                // so no change to circuit status.
                                // if it was CLOSED, it stays CLOSED
                                // if it was half-open, we need to wait for a successful command execution
                                // if it was open, we need to wait for sleep window to elapse
                            } else {
                                if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                                    //we are not past the minimum error threshold for the stat window,
                                    // so no change to circuit status.
                                    // if it was CLOSED, it stays CLOSED
                                    // if it was half-open, we need to wait for a successful command execution
                                    // if it was open, we need to wait for sleep window to elapse
                                } else {
                                    // our failure rate is too high, we need to set the state to OPEN
                                    if(status.compareAndSet(Status.CLOSED, Status.OPEN)) { circuitOpened.set(System.currentTimeMillis()); }}}}});Copy the code

When the fuse is turned on, it checks whether or not to use the fuse in the applyHystrixSemantics callback of the Observable

 private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // mark that we're starting execution on the ExecutionHook
        // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
        executionHook.onStart(_cmd);

        /* determine if we're allowed to execute */
        if (circuitBreaker.attemptExecution()) {
            / / todo
        } else {
            // Todo is demoted}}Copy the code

The default is -1, but the fuse is set to the current time when it is on. There is also an isAfterSleepWindow () method that checks if the fuse is on. 5 s is greater than the circuitBreakerSleepWindowInMilliseconds (the default), facilitate subsequent set to HAFL_OPEN, retry to try again, restore close the circuit breaker.

@Override
        public boolean attemptExecution(a) {
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            if (properties.circuitBreakerForceClosed().get()) {
                return true;
            }
            if (circuitOpened.get() == -1) {
                return true;
            } else {
                if (isAfterSleepWindow()) {
                    if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                        //only the first request after sleep window should execute
                        return true;
                    } else {
                        return false; }}else {
                    return false; }}}private boolean isAfterSleepWindow(a) {
            final long circuitOpenTime = circuitOpened.get();
            final long currentTime = System.currentTimeMillis();
            final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
            return currentTime > circuitOpenTime + sleepWindowTime;
}
Copy the code

In terms of the above judgment, if the current time exceeds the opening time of the fuse + the interval we set, the fuse will be set to HASL_OPEN. At this time, the fuse will not be broken, and a normal process will be tried. If the normal process fails again, Can be carried through the call () callback handleFallback circuitBreaker, markNonSuccess (); , the logic is very simple, is to set the fuse to OPEN, update the current timestamp

@Override
        public void markNonSuccess(a) {
            if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
                //This thread wins the race to re-open the circuit - it resets the start time for the sleep windowcircuitOpened.set(System.currentTimeMillis()); }}Copy the code

If the implementation is successful, will pass the callback markEmits circuitBreaker, markSuccess (); Turn off the fuse

@Override
        public void markSuccess(a) {
            if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
                //This thread wins the race to close the circuit - it resets the stream to start it over from 0
                metrics.resetStream();
                Subscription previousSubscription = activeSubscription.get();
                if(previousSubscription ! =null) {
                    previousSubscription.unsubscribe();
                }
                Subscription newSubscription = subscribeToStream();
                activeSubscription.set(newSubscription);
                circuitOpened.set(-1L); }}Copy the code