Column series: SpringCloud column series

Series of articles:

SpringCloud source series (1) – registry initialization for Eureka

SpringCloud source code series (2) – Registry Eureka service registration, renewal

SpringCloud source code series (3) – Registry Eureka crawl registry

SpringCloud source code series (4) – Registry Eureka service offline, failure, self-protection mechanism

SpringCloud source series (5) – Registry EurekaServer cluster for Eureka

SpringCloud source Code Series (6) – Summary of the Registry Eureka

SpringCloud source series (7) – load balancing Ribbon RestTemplate

SpringCloud source series (8) – load balancing Ribbon core principles

SpringCloud source series (9) – load balancing Ribbon core components and configuration

SpringCloud source Series (10) – HTTP client component of load balancing Ribbon

SpringCloud Source Series (11) – Retries and summaries of the Load Balancing Ribbon

SpringCloud source Code Series (12) – Basic usage of Service invocation Feign

SpringCloud source Code Series (13) – Service invocation of Feign’s scanning @FeignClient annotation interface

SpringCloud source code series (14) – Service calls to Feign build @FeignClient interface dynamic proxy

SpringCloud source Series (15) – Service calls Feign with the Ribbon for load balancing requests

SpringCloud source code series (16) – Fuse Hystrix basic introduction

In this chapter, we will start with HystrixCommand construction and execute() execution, and take a step-by-step look at how Hystrix encapsulates the core principles of business logic, thread pool isolation, and fuse degradation.

It should be noted that The source code of Hystrix makes extensive use of RXJava responsive programming. The source code is filled with a large number of callbacks and Observable layer nesting. The source code operation process is not linear, so I will only show some core source code in the process of source analysis. So we can tease out the Hystrix design.

HystrixCommand components

HystrixCommand is an abstract class that inherits from AbstractCommand. Its core logic is all in AbstractCommand. HystrixCommand is relatively simple, mainly overloading several methods. So let’s first look at the structure of the HystrixCommand component.

HystrixCommand structure

Look at the HystrixCommand class structure diagram, notice the logo on the left, you can get the following information:

  • HystrixCommand provides multiple constructors, but they are protected and require subclasses to implement a constructor
  • run()Method is an abstract method that needs to be subclassed, meaning that our business logic is wrapped in the run() method
  • execute(),queue()Is a public method used to execute commands
  • getExecutionObservable(),getFallbackObservable()GetExecutionObservable () is the parent class that gets the Observable executing the command, GetFallbackObservable () is the parent class that gets the callback method Observable.
  • Other methods are as followsgetFallback(),getFallbackMethodName()They are protected and can be overridden by subclasses.

Gets the execution subscription object

AbstractCommand getExecutionObservable() is an abstract method that implements its parent class AbstractCommand. This method gets the run() method subscription object Observable, as you can see from its name or source code.

@Override
final protected Observable<R> getExecutionObservable(a) {
    // defer: The call() method is triggered only when the object is subscribed
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call(a) {
            try {
                // Subscribing to this Observable triggers the run() method
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }).doOnSubscribe(new Action0() {
        @Override
        public void call(a) {
            // Save the current thread when subscribingexecutionThread.set(Thread.currentThread()); }}); }Copy the code

Likewise, getFallbackObservable() is the subscription object Observable that gets the callback method.

@Override
final protected Observable<R> getFallbackObservable(a) {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call(a) {
            try {
                // Returns a subscription object that performs the callback method
                return Observable.just(getFallback());
            } catch (Throwable ex) {
                returnObservable.error(ex); }}}); }Copy the code

Command Execution entry

HystrixCommand has four methods that can be called to execute commands: Execute (), queue(), observe(), toObservable(), and toObservable() are all implemented by calling toObservable(). Observe () and toObservable() both return an Observable. Calling.toblocking () triggers execution of the subscribed object, while toFuture() returns a Future that executes asynchronously. Calling the Future’s get() method can block synchronously and wait for execution results, thus ultimately implementing the different features of the four methods.

public R execute(a) {
    return queue().get();
}

public Future<R> queue(a) {
    // All calls end up in toObservable()
    final Future<R> delegate = toObservable().toBlocking().toFuture();
    // delegate does some handling after the delegate executes the exception
    final Future<R> f = new Future<R>() {
        / /...
        @Override
        public boolean isDone(a) {
            return delegate.isDone();
        }

        @Override
        public R get(a) throws InterruptedException, ExecutionException {
            return delegate.get();
        }
        / /...
    };

    /* special handling of error states that throw immediately */
    if (f.isDone()) {
        try {
            f.get();
            return f;
        } catch (Exception e) {
            //....}}return f;
}
Copy the code

HystrixCommand initialization

AbstractCommand constructor

HystrixCommandGroupKey HystrixCommandGroupKey HystrixCommandGroupKey HystrixCommandGroupKey HystrixCommandGroupKey HystrixCommandGroupKey HystrixCommandGroupKey HystrixCommandGroupKey HystrixCommandGroupKey The default values are used if the remaining parameters are null.

The initialization process of HystrixCommand is relatively simple. It mainly initializes some command configurations and components.

  • CommandGroup: specifies the commandGroup. The parameter is mandatory.
  • CommandKey: Specifies the command name. The default is the class name. To configure the command, run the commandKey command.
  • Properties: Command property configuration. The default configuration can be viewedHystrixCommandPropertiesThis class.
  • ThreadPoolKey: specifies the name of the thread pool. The default value is the same as the group namehystrix-{groupKey}-{number}.
  • Metrics: Measures HystrixCommandMetrics, which measures the success, failure, and timeout of hystrix Command execution.
  • CircuitBreaker: HystrixCircuitBreaker, which determines whether the circuitBreaker is on when running a command.
  • ThreadPool: threadPool component HystrixThreadPool. When threadPool is isolated, tasks are thrown into the threadPool to execute, isolating resources.
  • EventNotifier: Time notification component HystrixEventNotifier
  • HystrixConcurrencyStrategy concurrencyStrategy: concurrent strategy
  • ExecutionHook: HystrixCommandExecutionHook
  • RequestCache: HystrixRequestCache
  • CurrentRequestLog: HystrixRequestLog. The default value is null.
// All parameters are non-mandatory except group parameters
protected AbstractCommandHystrixCommandKey group, // Group name HystrixCommandKey key, // command name HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, . / / Hystrix thread pool HystrixCommandProperties Setter commandPropertiesDefaults, / / configuration HystrixThreadPoolProperties Setter threadPoolPropertiesDefaults, / / the thread pool configuration HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore, / / semaphore current limiter HystrixPropertiesStrategy propertiesStrategy, / / Hystrix allocation strategy component HystrixCommandExecutionHook executionHook) { // Hook trace Hystrix command execution
    // Command grouping
    this.commandGroup = initGroupKey(group);
    GetClass ().getSimplename ()
    this.commandKey = initCommandKey(key, getClass());
    / / the Command configuration, the default value is commandPropertiesDefaults
    this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
    // poolKeyOverride is used first, otherwise groupKey is used if threadPoolKey is empty
    this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
    // Measure statistics component: HystrixCommandMetrics
    this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
    // Circuit breaker: HystrixCircuitBreaker
    this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
    Hystrix thread pool: HystrixThreadPool
    this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

    //Strategies from plugins
    // Time notification: HystrixEventNotifier
    this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
    / / concurrent strategy: HystrixConcurrencyStrategy
    this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
    / / Hook: HystrixCommandExecutionHook
    this.executionHook = initExecutionHook(executionHook);
    HystrixRequestCache
    this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
    HystrixRequestLog, null by default
    this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

    // Semaphore callback override
    this.fallbackSemaphoreOverride = fallbackSemaphore;
    // Semaphore override
    this.executionSemaphoreOverride = executionSemaphore;
}
Copy the code

Component initialization

Take the HystrixCircuitBreaker initialization as an example to see how these components are initialized.

The initialization steps are basically similar, with the default component initialized if null is passed in as an AbstractCommand constructor argument. Each component has an inner class Factory, which provides a getInstance method to get the component. Factory uses a ConcurrentHashMap to cache components corresponding to different commands to avoid repeated creation. GetInstance () obtains components from the local cache first, and creates default components if none exist and places them in the local cache.

private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
                                                        HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
                                                        HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    if (enabled) {
        if (fromConstructor == null) {
            // Get the default circuit breaker from Factory
            return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
        } else {
            returnfromConstructor; }}else {
        // Disable circuit breakers and return implementation classes that do nothing
        return newNoOpCircuitBreaker(); }}Copy the code

HystrixCircuitBreaker. Factory:

public static class Factory {
    private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

    public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        // Check the local cache to see if a circuit breaker component with the same name has been created
        HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
        if(previouslyCached ! =null) {
            return previouslyCached;
        }
        
        // Create a default that has not yet been created
        HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
        if (cbForCommand == null) {
            return circuitBreakersByCommand.get(key.name());
        } else {
            returncbForCommand; }}}Copy the code

Gets the execution subscription object

ToObservable () gets the subscription object

AbstractCommand’s toObservable() method returns the final subscription object for command execution, but its internal encapsulation of the run() method is very complex, so let’s focus on the big and the small. Let’s take a look at the overall flow of toObservable returning the Observable from the subscription, until we find out where the run() method is executed.

The general process is as follows:

  • Action0 => terminateCommandCleanup

  • Define command that cancels the execution of the after back to mobilize a Action0 = > unsubscribeCommandCleanup.

  • Func0 => applyHystrixSemantics is the core semantics that defines the application of hystrix. The implementation of this callback is shown because it encapsulates the run() method.

  • Define a transformation Hook Func1 => wrapWithAllOnNextHooks.

  • Define the callback after Hook completion => fireOnCompletedHook.

  • The last step is to create an Observable subscription. Let’s see what it does:

    • Check the status of the command first. Non-not_started raises an exception. Otherwise, change the status of the command to OBSERVABLE_CHAIN_CREATED
    • Set the time when the command is executed
    • Determines whether request logging is enabled. The currentRequestLog is null by default
    • To enable request caching, you need to reset the cached Key returned by the getCacheKey() method
    • If request caching is enabled, it is first fetched from the HystrixRequestCache cache component, and if it already exists, the subscription to the cache object is returned directly
    • If the cache is not enabled or not in the cache, the subscription object is createdapplyHystrixSemanticsThe subscription object returned
    • If caching is enabled, encapsulate the subscription object and cache the data after the request ends. Otherwise, use the Observable you created earlier
    • The last step is to return the subscription object created earlier and set the callback action defined

ToObservable () To summarize, there are actually two core places:

  • One is theapplyHystrixSemanticsIs where the core business logic is encapsulated;
  • The other is if the request cache is enabled, the request is fetched from the cache first, the command is not executed, and the result is cached after the request is completed.
public Observable<R> toObservable(a) {
    // _cmd => Current command object
    final AbstractCommand<R> _cmd = this;

    // Some actions after the command is executed
    final Action0 terminateCommandCleanup = newAction0() {... };// Command to cancel some actions
    final Action0 unsubscribeCommandCleanup = newAction0() {... };// Apply Hystrix's core semantics section, Hystrix execution entry
    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call(a) {
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            // Execute the hystrix request
            returnapplyHystrixSemantics(_cmd); }};// Do some conversions to the original command
    final Func1<R, R> wrapWithAllOnNextHooks = newFunc1<R, R>() {... };// Execute the successful action
    final Action0 fireOnCompletedHook = newAction0() {... };// defer: Does not execute immediately, calling toBlocking() before executing the call() method
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call(a) {
            // Set the state to OBSERVABLE_CHAIN_CREATED. If the initial state is not NOT_STARTED, an exception will be thrown
            if(! commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {throw newHystrixRuntimeException(...) ; }// Set the command start time (note that this is set only when you actually subscribe)
            commandStartTimestamp = System.currentTimeMillis();

            CurrentRequestLog is null by default
            if(properties.requestLogEnabled().get() && currentRequestLog ! =null) {
                currentRequestLog.addExecutedCommand(_cmd);
            }

            // Whether request caching is enabled and cacheKey is not empty
            final boolean requestCacheEnabled = isRequestCachingEnabled();
            To enable request caching, override the getCacheKey() method
            final String cacheKey = getCacheKey();

            // First fetch from the cache
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if(fromCache ! =null) {
                    isResponseFromCache = true;
                    // Return the cached data directly
                    returnhandleRequestCacheHitAndEmitValues(fromCache, _cmd); }}// Subscribe applyHystrixSemantics returns the subscribe object
            Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);

            Observable<R> afterCache;

            // With request caching enabled, subscribe to hystrixObservable again and cache the results after execution
            if(requestCacheEnabled && cacheKey ! =null) {
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                // The subscription object is placed in the cache
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                // Unsubscribe if the cache already exists and return the contents of the cache
                if(fromCache ! =null) {
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else{ afterCache = toCache.toObservable(); }}// No caching is the original hystrixObservable
            else {
                afterCache = hystrixObservable;
            }

            // Returns the subscription object
            return afterCache
                    .doOnTerminate(terminateCommandCleanup) // The command is executed
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // Execute after the command is cancelled
                    .doOnCompleted(fireOnCompletedHook); // Execute the command}}); }Copy the code

Apply Hystrix circuit breakers or semaphores

ApplyHystrixSemantics (_cmd) is the semantics that encapsulates Hystrix, but we haven’t seen run() yet, so let’s look at what it does.

  • First issue a Hook to tell the command to execute.

  • Then use the breaker to determine whether to allow the request. If the breaker rejects it, for example, the breaker is in the open state, it directly degrades.

  • If the breaker allows the request, get a TryableSemaphore, if the semaphore mode is returned is TryableSemaphoreActual; Thread pool mode returns TryableSemaphoreNoOp, which does nothing but let go.

  • The action Action0 => singleSemaphoreRelease that defines whether or not a semaphore license occurs after the request ends.

  • Action1 => markExceptionThrown defines the action Action1 => markExceptionThrown to notify the exception.

  • After obtaining the semaphore license, failure to obtain the semaphore will enter the semaphore refuse degradation

  • Once the semaphore license is obtained, the time to start execution is set

  • Finally, subscribe again through the executeCommandAndObserve(_cmd) method and set the error callback, end callback, and cancel callback

To summarize, applyHystrixSemantics(_cmd) is all about applying circuit breakers or semaphores to limit the flow of a request.

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // The command is executed
    executionHook.onStart(_cmd);

    // Whether the breaker allows requests
    if (circuitBreaker.allowRequest()) {
        // Get Semaphore, Semaphore mode returns TryableSemaphoreActual, thread pool mode returns TryableSemaphoreNoOp
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();

        // Release the semaphore license when the request ends
        final Action0 singleSemaphoreRelease = newAction0() {... };// Notify when an exception is thrown
        final Action1<Throwable> markExceptionThrown = newAction1<Throwable>() {... };Thread pool mode always returns true for the semaphore license
        if (executionSemaphore.tryAcquire()) {
            try {
                // Set the start time of execution. This should be the time when the command is actually executed
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                
                // Execute the command and subscribe
        return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown) // Execute after error
                        .doOnTerminate(singleSemaphoreRelease) // Execute the command
                        .doOnUnsubscribe(singleSemaphoreRelease); // Execute after canceling execution
            } catch (RuntimeException e) {
                returnObservable.error(e); }}else {
            // Semaphore rejected => Degraded
            returnhandleSemaphoreRejectionViaFallback(); }}else {
        // Short circuit => Degraded
        returnhandleShortCircuitViaFallback(); }}Copy the code

Subscription object timeout processing

ApplyHystrixSemantics (_cmd) calls executeCommandAndObserve(_cmd) to get the subscription object.

  • First, I also created a few callback objects

  • The core is in the last few steps, invoke executeCommandWithSpecifiedIsolation (_cmd) to get a subscription object, the method name should be used Hystrix isolation strategy.

  • If you enable timeouts, subscribe to the object will also increase a timeout processor HystrixObservableTimeoutOperator, in can be found that the processor creates a TimerListener to change the status to isCommandTimedOut timeout, So this corresponds to this. We’ll talk about timeouts later.

To sum up, the core of executeCommandAndObserve(_cmd) is to add a timeout handler to the subscription object in response to a timeout if it is enabled.

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

        // Mark that the command has been executed
        final Action1<R> markEmits = newAction1<R>() {... };// Indicates that the command execution is complete
        final Action0 markOnCompleted = newAction0() {... };// Handle the callback
        final Func1<Throwable, Observable<R>> handleFallback = newFunc1<Throwable, Observable<R>>() {... };// Set the current thread
        final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? superR>>() {... }; Observable<R> execution;Observable that creates the run() method continues encapsulation according to the configured isolation policy. Thread pool isolation mode is put into the thread pool for scheduling, and semaphore mode returns directly
        if (properties.executionTimeoutEnabled().get()) {
            / / overtime will be handled by HystrixObservableTimeoutOperator, throw HystrixTimeoutException timeout exception
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        // Set the subscription callback
        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }
Copy the code

Execute the command under the configured isolation policy

Then look at executeCommandWithSpecifiedIsolation (_cmd) method, the method name will know it’s the command to configure isolation strategy execution. You can see that the first if… else… Split into thread pool isolation and semaphore isolation.

Thread pool isolation mode:

  • Change the status of the command from OBSERVABLE_CHAIN_CREATED to USER_CODE_EXECUTED to indicate that the user code is executed.

  • Run the isCommandTimedOut command to check whether the command times out. If the command times out, an exception is thrown and the command times out before the command is executed. This can happen if the thread pool is full, the command is waiting in a queue, and times out while waiting. Remember the isCommandTimedOut thing, which is set somewhere else.

  • Hystrix. StartCurrentThreadExecutingCommand (getCommandKey ()) seems to be began to execute the command, but its interior is just put HystrixCommandKey in a stack of stack, The returned endCurrentThreadExecutingCommand is at the end of the command execution Will HystrixCommandKey pop up from the stack. It’s not clear what it does.

  • Through getUserExecutionObservable (_cmd) for the user to perform subscription object, this method is the final packaging the run () method returns the subscription.

  • The biggest difference between thread pool isolation and semaphore isolation is that in the last step, thread pool isolation has a subscribeOn Scheduler, This scheduler is an Rx.Scheduler object obtained by calling ThreadPool.getScheduler (Func0 func). The actual type is HystrixContextScheduler. The Observable returned by call() is thrown into the Scheduler for asynchronous scheduling, so this will be the entry point for thread pool isolation.

Signal isolation is relatively easier, final step by same getUserExecutionObservable (_cmd) to obtain the run () method of subscription object.

To summarize, the core of this method is to return a subscription object to run() and isolate resources according to the command’s isolation policy. Thread pool isolation subscribing to a Scheculer for scheduled execution, and you can assume that the interior will eventually be thrown into a thread pool for execution.

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    Thread pool isolation
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call(a) {
                // The status changed from OBSERVABLE_CHAIN_CREATED to USER_CODE_EXECUTED, indicating that the code for the run() method is executed
                if(! commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {return Observable.error(newIllegalStateException(...) ); }// The command is executed
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
                // To check whether the command timed out, isCommandTimedOut is updated in TimerListener
                if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                    return Observable.error(new RuntimeException("timed out before executing run()"));
                }

                // Increase the number of execution threads
                HystrixCounters.incrementGlobalConcurrentThreads();
                // The statistics thread starts executing
                threadPool.markThreadExecution();
                // Push the command Key to the top of a stack
                endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());

                try {
                    // issue some Hook notifications...
                    // Get the Observable of Run ()
                    return getUserExecutionObservable(_cmd);
                } catch (Throwable ex) {
                    return Observable.error(ex);
                }
            }
        }).doOnTerminate(new Action0() {
            // Update the thread status after the command is executed...
        }).doOnUnsubscribe(new Action0() {
            // Update thread status after the command is canceled...
        })
        // Place the Observable returned by defer into a scheduler to execute asynchronously. The scheduler ==> HystrixContextScheduler
        .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call(a) {
                // Determine whether to interrupt thread execution
                returnproperties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; }})); }// Semaphore isolation
    else {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call(a) {
                // The status changed from OBSERVABLE_CHAIN_CREATED to USER_CODE_EXECUTED, indicating that the code for the run() method is executed
                if(! commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {return Observable.error(newIllegalStateException(...) ); }// Statistics: the semaphore default command starts to execute
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                // Push the command Key to the top of a stack
                endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                try {
                    // issue some Hook notifications...
                    // Get the Observable of Run ()
                    return getUserExecutionObservable(_cmd);
                } catch (Throwable ex) {
                    returnObservable.error(ex); }}}); }}Copy the code

Gets the subscription object for business code execution

In the end, finally find enclosed the run () method, getUserExecutionObservable (_cmd) method is simpler, Call HystrixCommand’s getExecutionObservable() to get the subscription object that executes Run (), which is our custom business code.

private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    Observable<R> userObservable;
    try {
        // Encapsulates the overloaded run() method
        userObservable = getExecutionObservable();
    } catch (Throwable ex) {
        userObservable = Observable.error(ex);
    }

    return userObservable
            // Hystrix performs callback processing
            .lift(new ExecutionHookApplication(_cmd))
            .lift(new DeprecatedOnRunHookApplication(_cmd));
}
Copy the code

GetExecutionObservable () in HystrixCommand:

final protected Observable<R> getExecutionObservable(a) {
    // defer: The call() method is triggered only when the object is subscribed
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call(a) {
            try {
                // Subscribing to this Observable triggers the run() method
                return Observable.just(run());
            } catch (Throwable ex) {
                returnObservable.error(ex); }}})}Copy the code

A diagram summarizes the process of getting a subscription object

ToObservable () is the entry point for retrieving the run() business logic subscription object. To add hystrix features, layers are embedded to create the final Observable. Basically, each submethod adds a feature to the subscription object.

  • toObservable(): Entry to get a subscription object.
  • applyHystrixSemantics(_cmd): Apply a Hystrix circuit breaker or semaphore to reject downgrade when the circuit breaker is on or unable to acquire the semaphore.
  • executeCommandAndObserve(_cmd)If timeout is enabled, add a timeout handler to the subscribed object,
  • executeCommandWithSpecifiedIsolation: Returns different subscription objects according to the configured isolation policy. Thread pool isolation throws the subscription object into oneHystrixContextSchedulerTo schedule execution.
  • getUserExecutionObservable(_cmd): returns the subscription object that truly encapsulates the run() business logic.