Abstract: the original source http://www.iocoder.cn/Hystrix/command-collapser-execute/ “taro source” welcome to reprint, retain the, thank you!

This article is based on Hystrix version 1.5.X

  • 1. An overview of the
  • 2. HystrixCollapser
    • 2.1 Construction method
    • 2.2 Running commands
    • 2.3 Core Methods
  • 3. RequestCollapserFactory
  • 4. RequestCollapser
    • 4.1 Construction method
    • 4.2 RequestBatch
    • 4.3 # submitRequest (arg)
    • 4.4 # createNewBatchAndExecutePreviousIfNeeded (previousBatch)
  • 5. CollapserTimer
    • 5.1 RealCollapserTimer
    • 5.2 CollapsedTask
  • 666. The eggs

🙂🙂🙂 follow ** wechat official number: ** Have welfare:

  1. RocketMQ/MyCAT/Sharding-JDBC all source code analysis article list
  2. RocketMQ/MyCAT/Sharding-JDBC 中文 解 决 source GitHub address
  3. Any questions you may have about the source code will be answered carefully. Even do not know how to read the source can also ask oh.
  4. New source code parsing articles are notified in real time. It’s updated about once a week.
  5. Serious source communication wechat group.

1. An overview of the

This article focuses on the Hystrix command merge execution.

In the Hystrix Document – Implementation Principles “Request Merge”, the concepts, principles, usage scenarios, advantages and disadvantages of Hystrix command merge are thoroughly shared, so you can read it carefully first.

The overall process of command combination execution is as follows:

Hystrix Document – Implementation Principles

  • Step 1: Submit a single command request to the RequestQueue
  • In the second part, a timed task obtains multiple commands from the request queue periodically and merges them.

In the official offer sample, we through the use of CommandCollapserGetValueForKey familiar with the command executed concurrently.


Recommended Spring Cloud books:

  • Please support the legal version. Download piracy, is equal to the initiative to write low-level bugs.
  • DD — Spring Cloud Micro Services
  • Zhou Li — “Spring Cloud and Docker Micro-service Architecture Combat”
  • Buy two books together, jingdong free delivery.

2. HystrixCollapser

Com.net flix. Hystrix. HystrixCollapser, command combiner abstract parent class.

NOTE: com.net flix. Hystrix. HystrixObservableCollapser, another command combiner abstract parent class, this article will not enter the parsing.

2.1 Construction method

HystrixCollapser constructor ()

public abstract class HystrixCollapser<BatchReturnType.ResponseType.RequestArgumentType>
        implements HystrixExecutable<ResponseType>, HystrixObservable<ResponseType> {

    private final RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType> collapserFactory;
    private final HystrixRequestCache requestCache;
    private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> collapserInstanceWrapper;
    private final HystrixCollapserMetrics metrics;
    
    /* package for tests */ HystrixCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder, HystrixCollapserMetrics metrics) {
        if (collapserKey == null || collapserKey.name().trim().equals("")) {
            String defaultKeyName = getDefaultNameFromClass(getClass());
            collapserKey = HystrixCollapserKey.Factory.asKey(defaultKeyName);
        }

        HystrixCollapserProperties properties = HystrixPropertiesFactory.getCollapserProperties(collapserKey, propertiesBuilder);
        this.collapserFactory = new RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType>(collapserKey, scope, timer, properties);
        this.requestCache = HystrixRequestCache.getInstance(collapserKey, HystrixPlugins.getInstance().getConcurrencyStrategy());

        if (metrics == null) {
            this.metrics = HystrixCollapserMetrics.getInstance(collapserKey, properties);
        } else {
            this.metrics = metrics;
        }

        final HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType> self = this;

         /* strategy: HystrixMetricsPublisherCollapser */
        HystrixMetricsPublisherFactory.createOrRetrievePublisherForCollapser(collapserKey, this.metrics, properties);

        /** * Used to pass public method invocation to the underlying implementation in a separate package while leaving the methods 'protected' in this class. */
        collapserInstanceWrapper = new HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType>() {

            @Override
            public Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shardRequests(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
                Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = self.shardRequests(requests);
                self.metrics.markShards(shards.size());
                return shards;
            }

            @Override
            public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
                final HystrixCommand<BatchReturnType> command = self.createCommand(requests);

                command.markAsCollapsedCommand(this.getCollapserKey(), requests.size());
                self.metrics.markBatch(requests.size());

                return command.toObservable();
            }

            @Override
            public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
                return batchResponse.single().doOnNext(new Action1<BatchReturnType>() {
                    @Override
                    public void call(BatchReturnType batchReturnType) {
                        // this is a blocking call in HystrixCollapser
                        self.mapResponseToRequests(batchReturnType, requests);
                    }
                }).ignoreElements().cast(Void.class);
            }

            @Override
            public HystrixCollapserKey getCollapserKey(a) {
                returnself.getCollapserKey(); }}; }}Copy the code
  • BatchReturnType Generic type. The result type returned when multiple commands are executed together.
  • ResponseType Generic, the result type returned by a single command execution.
  • RequestArgumentType A generic, single command argument type.
  • collapserFactoryProperties, RequestCollapserThe factoryIn the”3. RequestCollapserFactory”Detailed analysis.
  • requestCacheAttribute, TODO [2012] [Request context]
  • collapserInstanceWrapperProperties,The commandThe combinator wrapper.
    • com.netflix.hystrix.collapser.HystrixCollapserBridge interface, click on thelinkLook at the code.
    • HystrixCollapserBridge for RequestBatch transparent call HystrixCollapser or HystrixObservableCollapser method different implementations. See Bridge Mode.
  • metricsProperties, TODO [2002] [metrics]

2.2 Running commands

In Hystrix Source Code Parsing — How to Execute Commands, we looked at the four ways HystrixCommand provides to execute commands.

HystrixCollapser is similar to HystrixCommand and collapser also provides four similar command execution methods. In the following three methods, the code is basically similar, so we will not repeat the following:

  • #observe()Methods:portal 。
  • #queue()Methods:portal 。
  • #execute()Methods:portal 。

The #toObservable() method is implemented as follows:

  1: public Observable<ResponseType> toObservable(a) {
  2:     // when we callback with the data we want to do the work
  3:     // on a separate thread than the one giving us the callback
  4:     return toObservable(Schedulers.computation());
  5:}6: 
  7: public Observable<ResponseType> toObservable(Scheduler observeOn) {
  8:     return Observable.defer(new Func0<Observable<ResponseType>>() {
  9:         @Override
 10:         public Observable<ResponseType> call(a) {
 11:             // // Cache switch and cache KEY
 12:             final boolean isRequestCacheEnabled = getProperties().requestCacheEnabled().get();
 13:             final String cacheKey = getCacheKey();
 14: 
 15:             // Get it from the cache first
 16:             /* try from cache first */
 17:             if (isRequestCacheEnabled) {
 18:                 HystrixCachedObservable<ResponseType> fromCache = requestCache.get(cacheKey);
 19:                 if(fromCache ! =null) {
 20:                     metrics.markResponseFromCache();
 21:                     return fromCache.toObservable();
 22:}23:}24: 
 25:             / / get RequestCollapser
 26:             RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper);
 27: 
 28:             // Submit the command request
 29:             Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());
 30: 
 31:             // Get the cache Observable
 32:             if(isRequestCacheEnabled && cacheKey ! =null) {
 33:                 HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response);
 34:                 HystrixCachedObservable<ResponseType> fromCache = requestCache.putIfAbsent(cacheKey, toCache);
 35:                 if (fromCache == null) {
 36:                     return toCache.toObservable();
 37:}else {
 38:                     toCache.unsubscribe(); // Unsubscribe
 39:                     return fromCache.toObservable();
 40:}41:}42: 
 43:             // Get an cached Observable
 44:             return response;
 45:}46:});47:}Copy the code
  • observeOnMethod parameter, the actual method is not used, skip ignore.
  • Lines 11 to 13: Cache switch, KEY.
  • reverseLines 32 through 41: Get the cache Observable. This piece of code andAbstractCommand#toObservavle(...)Similarly, in”4. AbstractCommand#toObservavle(…)” Hystrix source parsing — Execution Result CacheHave detailed analysis.
  • reverseLine 44: Get an cacheless Observable.
  • Note that the returned Observable probably hasn’t actually executed or hasn’t finished executing#queue() / #execute()Method via BlockingObservableblockingWait until the execution is complete. BlockingObservable inRxJava source Code parsing – BlockingObservableHave detailed analysis.
  • Line 26: CallRequestCollapserFactory#getRequestCollapser()Get RequestCollapser. in”3. RequestCollapserFactory”Detailed analysis.
  • Line 29: Submit a single command request to the RequestQueue, which is the first step in the command merge process. In 4. RequestCollapser

2.3 Core Methods

  • #getRequestArgument(…) Abstract method to obtain a single command parameter. The code is as follows:

    public abstract RequestArgumentType getRequestArgument(a);
    Copy the code

  • #createCommand(…) Abstract method that merges multiple command requests to create a HystrixCommand. The code is as follows:

    protected abstract HystrixCommand<BatchReturnType> createCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
    Copy the code

  • #mapResponseToRequests(…) Abstract method that maps the execution results of a HystrixCommand back to the corresponding command requests.

    protected abstract void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
    Copy the code

  • #shardRequests(…) Method to fragment multiple command requests into N. By default, sharding is not performed. The code is as follows:

    protected Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shardRequests(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
        return Collections.singletonList(requests);
    }
    Copy the code

  • #shardRequests(…) In the case of, the overall method flow is as follows:

  • In rewriting # shardRequests (…). In the case of, the overall method flow is as follows:

    • The command request sharding in this figure is just an example, and the actual situation varies depending on the logic overridden.

3. RequestCollapserFactory

Com.net flix. Hystrix. Collapser RequestCollapserFactory, RequestCollapser factory.

public class RequestCollapserFactory<BatchReturnType.ResponseType.RequestArgumentType> {
    
    private final CollapserTimer timer;
    private final HystrixCollapserKey collapserKey;
    private final HystrixCollapserProperties properties;
    private final HystrixConcurrencyStrategy concurrencyStrategy;
    private final Scope scope;
    
    public RequestCollapserFactory(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties properties) {
         /* strategy: ConcurrencyStrategy */
        this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        this.timer = timer;
        this.scope = scope;
        this.collapserKey = collapserKey;
        this.properties = properties;
    }
Copy the code
  • timerProperty, command aggregator timer, in”5. CollapserTimer”Detailed analysis.
  • collapserKeyProperty, the command aggregator identifier, implemented like HystrixThreadPoolKey.
    • HystrixCollapserKey, click the link to see the code.
    • HystrixThreadPoolKey is explained in detail in Hystrix Source Code Parsing — Command Execution (2) execution Isolation Policy 3. HystrixThreadPoolKey.
  • propertiesProperty, command aggregator property configuration.
  • concurrencyStrategyProperty, concurrency policy, inThe Hystrix source code parsing – command execution (2) perform isolation strategy of “4. HystrixConcurrencyStrategy”Have detailed analysis.
  • scopeProperty, command request scope. There are currently two scopes:
    • REQUEST: HystrixRequestContext.

      Typically this means that requests within a single user-request (ie. HTTP request) are collapsed.

      No interaction with other user requests.

      1 queue per user request.

    • GLOBAL: indicates the GLOBAL situation.

      Requests from any thread (ie. all HTTP requests) within the JVM will be collapsed.

      1 queue for entire app.


Call the #getRequestCollapser() method to get the RequestCollapser. The code is as follows:

public RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> getRequestCollapser(HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser) {
   if (Scopes.REQUEST == Scopes.valueOf(getScope().name())) {
       return getCollapserForUserRequest(commandCollapser);
   } else if (Scopes.GLOBAL == Scopes.valueOf(getScope().name())) {
       return getCollapserForGlobalScope(commandCollapser);
   } else {
       logger.warn("Invalid Scope: {} Defaulting to REQUEST scope.", getScope());
       returngetCollapserForUserRequest(commandCollapser); }}Copy the code
  • According to thescopeInstead, call two different methods to get the RequestCollapser. The two methods have the same logic and are preferredThe cacheIn the RequestCollapser that meets the criteria; If not, create a RequestCollapser that meets the criteria and add it toThe cacheAnd return.
    • REQUEST: call#getCollapserForUserRequest()Method, TODO [2012] [request context].
    • GLOBAL: call#getCollapserForGlobalScope()Method, clicklinkTo viewChinese annotationThe code.

4. RequestCollapser

Com.net flix. Hystrix. Collapser RequestCollapser, combiner command request. Mainly used for:

  • Submit a single command request to the RequestQueue.
  • Receives multiple commands submitted from scheduled tasks and executes them together.

4.1 Construction method

The RequestCollapser constructor looks like this:

public class RequestCollapser<BatchReturnType.ResponseType.RequestArgumentType> {

    private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser;
    // batch can be null once shutdown
    private final AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>> batch = new AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>>();
    private final AtomicReference<Reference<TimerListener>> timerListenerReference = new AtomicReference<Reference<TimerListener>>();
    private final AtomicBoolean timerListenerRegistered = new AtomicBoolean();
    private final CollapserTimer timer;
    private final HystrixCollapserProperties properties;
    private final HystrixConcurrencyStrategy concurrencyStrategy;
    
    RequestCollapser(HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser, HystrixCollapserProperties properties, CollapserTimer timer, HystrixConcurrencyStrategy concurrencyStrategy) {
        this.commandCollapser = commandCollapser; // the command with implementation of abstract methods we need 
        this.concurrencyStrategy = concurrencyStrategy;
        this.properties = properties;
        this.timer = timer;
        batch.set(newRequestBatch<BatchReturnType, ResponseType, RequestArgumentType>(properties, commandCollapser, properties.maxRequestsInBatch().get())); }}Copy the code
  • commandCollapserProperties,The commandThe combinator wrapper.
  • batchAttribute, RequestBatch,This is the request queue that I’ve been talking about throughout this article. in”4.2 RequestBatch”It will also be dissected in detail.
  • timerListenerReferenceProperties,registeredListener of timer in command aggregator. Each RequestCollapserA uniqueThe listener. This listener (which is actually used to create scheduled tasks)Fixed cycleFrom the request queuemultipleCommand execution, submit RequestCollapser merge execution. in”5. CollapserTimer”It will also be dissected in detail.
  • timerListenerRegisteredProperties,timerListenerReferenceWhether you have registered.
  • timerProperty for the command aggregator timer.
  • propertiesProperty, command aggregator property configuration.
  • concurrencyStrategyProperty, concurrency policy.

4.2 RequestBatch

Com.net flix. Hystrix. Collapser RequestBatch, ordered the request queue. Provides the following functions:

  • Command requests are added
  • Command request removal
  • Command requestedBatch execution. RequestBatch is a command request queue.
    • Why not implement RequestBatch as a pure queue in RequestCollapser? “In” 4.4 # createNewBatchAndExecutePreviousIfNeeded (previousBatch) parsing.

The RequestBatch constructor looks like this:

public class RequestBatch<BatchReturnType.ResponseType.RequestArgumentType> {

    private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser;
    private final int maxBatchSize;
    private final AtomicBoolean batchStarted = new AtomicBoolean();

    private final ConcurrentMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>> argumentMap =
            new ConcurrentHashMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>>();
    private final HystrixCollapserProperties properties;

    private ReentrantReadWriteLock batchLock = new ReentrantReadWriteLock();

    public RequestBatch(HystrixCollapserProperties properties, HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser, int maxBatchSize) {
        this.properties = properties;
        this.commandCollapser = commandCollapser;
        this.maxBatchSize = maxBatchSize; }}Copy the code
  • commandCollapserProperties,The commandThe combinator wrapper.
  • maxBatchSizeProperty, the maximum length of the queue.
  • batchStartedProperty, whether execution starts.
  • argumentMapProperty, command request parameter mapping (The queue).
  • propertiesProperty, command aggregator property configuration.
  • batchLockProperties,argumentMapOperation of theRead-write lock.

RequestBatch implements queue specific operations. In the “4.3 # submitRequest (arg)”/” 4.4 # createNewBatchAndExecutePreviousIfNeeded (previousBatch) “together.

4.3 # submitRequest (arg)

In the #toObservable() method, the #submitRequest(ARG) method is called to submit a single command request to RequestBatch. The code is as follows:

  1: public Observable<ResponseType> submitRequest(final RequestArgumentType arg) {
  2:     / * 3: * We only want the timer ticking if there are actually things to do so we register it the first time something is added. 4: * /
  5:     if(! timerListenerRegistered.get() && timerListenerRegistered.compareAndSet(false.true)) {
  6:         /* schedule the collapsing task to be executed every x milliseconds (x defined inside CollapsedTask) */
  7:         timerListenerReference.set(timer.addListener(new CollapsedTask()));
  8:}9: 
 10:     // loop until succeed (compare-and-set spin-loop)
 11:     while (true) {
 12:         / / get RequestBatch
 13:         final RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> b = batch.get();
 14:         if (b == null) {
 15:             return Observable.error(new IllegalStateException("Submitting requests after collapser is shutdown"));
 16:}17: 
 18:         // Add to RequestBatch
 19:         final Observable<ResponseType> response;
 20:         if(arg ! =null) {
 21:             response = b.offer(arg);
 22:}else {
 23:             response = b.offer( (RequestArgumentType) NULL_SENTINEL);
 24:}25: 
 26:         // Add successfully, returns Observable
 27:         // it will always get an Observable unless we hit the max batch size
 28:         if(response ! =null) {
 29:             return response;
 30:}else {
 31:             // Add failed, execute RequestBatch and create a new RequestBatch
 32:             // this batch can't accept requests so create a new one and set it if another thread doesn't beat us
 33:             createNewBatchAndExecutePreviousIfNeeded(b);
 34:}35:}36:}Copy the code
  • Lines 5 through 8: Initialize the RequestCollapser listener when the Collapse task has not yet been created.
  • Lines 11 to 35:Infinite loopUntil submittedA singleThe command is requested to RequestBatchsuccessful.
    • Lines 13 to 16: Get the RequestBatch. From current code, unless RequestCollapser is#shutdown()After will appear asnullIn the case.
    • Lines 19 to 24: RedeploymentRequestBatch#offer(...)Method, submitA singleThe command is requested to the RequestBatch and gets an Observable. Here toarg == nullI did a special treatment becauseRequestBatch.argumentMapIs ConcurrentHashMap and is not allowed to benull. In addition,RequestBatch#offer(...)Method implementation code, at the end of the current method, detailed parsing.
    • Lines 28 through 29: Added successfully, returns Observable.
    • Lines 30 to 34: Add failed, execute multiple command merge of the current RequestBatch and create a new RequestBatch. “In” 4.4 # createNewBatchAndExecutePreviousIfNeeded (previousBatch) parsing.

RequestBatch#offer(…) Method with the following code:

  1: public Observable<ResponseType>  offer(RequestArgumentType arg) {
  2:     // Execution started, add failed
  3:     /* short-cut - if the batch is started we reject the offer */
  4:     if (batchStarted.get()) {
  5:         return null;
  6:}7: 
  8:     /* 9: * The 'read' just means non-exclusive even though we are writing. 10: */
 11:     if (batchLock.readLock().tryLock()) {
 12:         try {
 13:             // Execution started, add failed
 14:             /* double-check now that we have the lock - if the batch is started we reject the offer */
 15:             if (batchStarted.get()) {
 16:                 return null;
 17:}18: 
 19:             // Failed to add the queue when the maximum queue length was exceeded
 20:             if (argumentMap.size() >= maxBatchSize) {
 21:                 return null;
 22:}else {
 23:                 // Create CollapsedRequestSubject and add it to the queue
 24:                 CollapsedRequestSubject<ResponseType, RequestArgumentType> collapsedRequest = new CollapsedRequestSubject<ResponseType, RequestArgumentType>(arg, this);
 25:                 final CollapsedRequestSubject<ResponseType, RequestArgumentType> existing = (CollapsedRequestSubject<ResponseType, RequestArgumentType>) argumentMap.putIfAbsent(arg, collapsedRequest);
 26:                 /** 27: * If the argument already exists in the batch, then there are 2 options: 28: * A) If request caching is ON (the default): only keep 1 argument in the batch and let all responses 29: * be hooked up to that argument 30: * B) If request caching is OFF: return an error to all duplicate argument requests 31: * 32: * This maintains the invariant that each batch has no duplicate arguments. This prevents the impossible 33: * logic (in a user-provided mapResponseToRequests for HystrixCollapser and the internals of HystrixObservableCollapser) 34: * of trying to figure out which argument of a set of duplicates should get attached to a response. 35: * 36: * See https://github.com/Netflix/Hystrix/pull/1176 for further discussion. 37: */
 38:                 if(existing ! =null) {
 39:                     boolean requestCachingEnabled = properties.requestCacheEnabled().get();
 40:                     if (requestCachingEnabled) {
 41:                         return existing.toObservable();
 42:}else {
 43:                         return Observable.error(new IllegalArgumentException("Duplicate argument in collapser batch : [" + arg + "] This is not supported. Please turn request-caching on for HystrixCollapser:" + commandCollapser.getCollapserKey().name() + " or prevent duplicates from making it into the batch!"));
 44:}45:}else {
 46:                     return collapsedRequest.toObservable();
 47:}48: 
 49:}50:}finally {
 51:             batchLock.readLock().unlock();
 52:}53:}else {
 54:         return null;
 55:}56:}Copy the code
  • Lines 4 through 6: Execution has started, add failed. In RequestBatch# executeBatchIfNotAlreadyStarted (…). At the beginning of the method, precedence CAS sets batchStarted = true.

  • Line 11: Get the read lock. The ‘read’ just means non-exclusive even though we are writing. The ‘read’ just means non-exclusive even though we are writing.

  • Lines 15 to 17: double-check, execution has started, add failed. In RequestBatch# executeBatchIfNotAlreadyStarted (…). BatchStarted = true before acquiring the write lock.

  • Line 20 to 21: Queue maximum length exceeded, add failed.

  • Line 24 to 25: create com.net flix. Hystrix. Collapser. CollapsedRequestSubject, and add it to the queue (argumentMap).

    • CollapsedRequestSubject com.net flix. Hystrix. HystrixCollapser. CollapsedRequest interface, defines the batch command execution requests, Not only to get the request parameters (#getArgument() method), but also to set the result of each request (#setResponse(…)) after the batch command is executed. /#emitResponse(…) /#setException(…) /#setComplete() method), click the link to see the code for this interface.

    • CollapsedRequestSubject constructor looks as follows:

      /* package */class CollapsedRequestSubject<T.R> implements CollapsedRequest<T.R> {
      
          /** * parameter */
          private final R argument;
      
          /** * result (response) Whether to set */
          private AtomicBoolean valueSet = new AtomicBoolean(false);
          /** * replayable ReplaySubject */
          private final ReplaySubject<T> subject = ReplaySubject.create();
          /** * ReplaySubject */
          private final Observable<T> subjectWithAccounting;
      
          /** * Number of subscriptions */
          private volatile int outstandingSubscriptions = 0;
      
          public CollapsedRequestSubject(final R arg, finalRequestBatch<? , T, R> containingBatch) {
              / / set the argument
              if (arg == RequestCollapser.NULL_SENTINEL) {
                  this.argument = null;
              } else {
                  this.argument = arg;
              }
              // Set ReplaySubject with the number of subscriptions
              this.subjectWithAccounting = subject
                      .doOnSubscribe(new Action0() {
                          @Override
                          public void call(a) {
                              outstandingSubscriptions++;
                          }
                      })
                      .doOnUnsubscribe(new Action0() {
                          @Override
                          public void call(a) {
                              outstandingSubscriptions--;
                              if (outstandingSubscriptions == 0) { containingBatch.remove(arg); }}}); }}Copy the code
      • argumentProperties,A singleCommand request parameters.
      • valueSetProperty, the result (Response) is set, yes#setResponse()/#emitResponse()Method setting.
      • subjectProperties,The execution result can be played backThe Subject. ReplaySubject is used here for the main purpose when HystrixCollapser is openedThe cacheFunction when executing results through playback in5. HystrixCachedObservableThere’s the same implementation. Also, one thing to note here is that ReplaySubject doesn’tThere is noSubscribe to any Observable,But by#setResponse()/#emitResponse()Method Setting result.
      • outstandingSubscriptionsProperty, number of subscriptions.
      • subjectWithAccountingProperty with the number of subscriptions ReplaySubject. Called when unsubscribingRequestBatch#remove(arg)Method, removeA singleCommand request.
  • Lines 38 to 47: Return Observable.

    • whenargumentMapexistingargThe cache must be enabled for the corresponding Observable.HystrixCollapserProperties.requestCachingEnabled = true) function. The reason is, if you’re inThe same arg, and caching is not enabled43 lineImplementation iscollapsedRequest.toObservable(), thenThe same argThere will bemultipleObservable executes the commandHystrixCollapserBridge#mapResponseToRequests(...)Method cannot assign execution (Response) toargThe corresponding command request (CollapsedRequestSubject) collapses. For more discussion, seeGithub.com/Netflix/Hys… 。
    • Looking backHystrixCollapser#toObservable()methodsLines 32 through 41There are also pairs hereThe cacheFunction, rightrepeat?argumentMapHystrixCollapser: RequestCollapser: RequestBatch yes1 : 1 : NThe relationship throughHystrixCollapser#toObservable()Cache processing logic to ensure that after the RequestBatch switchover,Still have cache.

The RequestBatch#remove() method looks like this:

/* package-private */ void remove(RequestArgumentType arg) {
    if (batchStarted.get()) {
        //nothing we can do
        return;
    }

    if (batchLock.readLock().tryLock()) {
        try {
            /* double-check now that we have the lock - if the batch is started, deleting is useless */
            if (batchStarted.get()) {
                return;
            }

            argumentMap.remove(arg);
        } finally{ batchLock.readLock().unlock(); }}}Copy the code
  • When RequestBatch starts executing, it is not allowed to remove a single command request.

4.4 # createNewBatchAndExecutePreviousIfNeeded (previousBatch)

In this section, you are advised to CollapserTimer again after “5.

# createNewBatchAndExecutePreviousIfNeeded (previousBatch) method, the code is as follows:

  1: private void createNewBatchAndExecutePreviousIfNeeded(RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> previousBatch) {
  2:     if (previousBatch == null) {
  3:         throw new IllegalStateException("Trying to start null batch which means it was shutdown already.");
  4:}5:     if (batch.compareAndSet(previousBatch, new RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>(properties, commandCollapser, properties.maxRequestsInBatch().get()))) {
  6:         // this thread won so trigger the previous batch
  7:         previousBatch.executeBatchIfNotAlreadyStarted();
  8:}9:}Copy the code
  • Line 5: PassCASModify thebatchTo ensure thread safety in the case of concurrency. Also notice that it’s done hereThe newRequestBatch, switchThe oldRequestBatch.
  • Line 6: UseThe oldRequestBatch, callRequestBatch#executeBatchIfNotAlreadyStarted()Method, command merge execution.

RequestBatch# executeBatchIfNotAlreadyStarted () method, the code is as follows:

  1: public void executeBatchIfNotAlreadyStarted(a) {
  2:     / * 3: * - check that we only execute once since there's multiple paths to do so (timer, waiting thread or max batch size hit) 4: * - close the gate so 'offer' can no longer be invoked and we turn those threads away so they create a new batch 5: */
  6:     // Setup execution has started
  7:     if (batchStarted.compareAndSet(false.true)) {
  8:         // Get the write lock
  9:         /* wait for 'offer'/'remove' threads to finish before executing the batch so 'requests' is complete */
 10:         batchLock.writeLock().lock();
 11: 
 12:         try {
 13:             // Split multiple command requests into N.
 14:             // shard batches
 15:             Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values());
 16:             // for each shard execute its requests 
 17:             for (final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> shardRequests : shards) {
 18:                 try {
 19:                     // Merge multiple command requests to create a HystrixCommand
 20:                     // create a new command to handle this batch of requests
 21:                     Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests);
 22: 
 23:                     // Map the execution results of a HystrixCommand back to the corresponding command requests
 24:                     commandCollapser.mapResponseToRequests(o, shardRequests).doOnError(new Action1<Throwable>() {
 25: 
 26:                         /** 27: * This handles failed completions 28: */
 29:                         @Override
 30:                         public void call(Throwable e) {
 31:                             // handle Throwable in case anything is thrown so we don't block Observers waiting for onError/onCompleted
 32:                             Exception ee;
 33:                             if (e instanceof Exception) {
 34:                                 ee = (Exception) e;
 35:}else {
 36:                                 ee = new RuntimeException("Throwable caught while executing batch and mapping responses.", e);
 37:}38:                             logger.debug("Exception mapping responses to requests.", e);
 39:                             // if a failure occurs we want to pass that exception to all of the Futures that we've returned
 40:                             for (CollapsedRequest<ResponseType, RequestArgumentType> request : argumentMap.values()) {
 41:                                 try {
 42:                                     ((CollapsedRequestSubject<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(ee);
 43:}catch (IllegalStateException e2) {
 44:                                     // if we have partial responses set in mapResponseToRequests
 45:                                     // then we may get IllegalStateException as we loop over them
 46:                                     // so we'll log but continue to the rest
 47:                                     logger.error("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting Exception. Continuing ... ", e2);
 48:}49:}50:}51: 
 52:                     }).doOnCompleted(new Action0() {
 53: 
 54:                         /** 55: * This handles successful completions 56: */
 57:                         @Override
 58:                         public void call(a) {
 59:                             // check that all requests had setResponse or setException invoked in case 'mapResponseToRequests' was implemented poorly
 60:                             Exception e = null;
 61:                             for (CollapsedRequest<ResponseType, RequestArgumentType> request : shardRequests) {
 62:                                 try {
 63:                                    e = ((CollapsedRequestSubject<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(e,"No response set by " + commandCollapser.getCollapserKey().name() + " 'mapResponseToRequests' implementation.");
 64:}catch (IllegalStateException e2) {
 65:                                     logger.debug("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting 'No response set' Exception.  Continuing ... ", e2);
 66:}67:}68:}69: 
 70:                     }).subscribe();
 71:                     
 72:}catch (Exception e) {
 73:                     / / exception
 74:                     logger.error("Exception while creating and queueing command with batch.", e);
 75:                     // if a failure occurs we want to pass that exception to all of the Futures that we've returned
 76:                     for (CollapsedRequest<ResponseType, RequestArgumentType> request : shardRequests) {
 77:                         try {
 78:                             request.setException(e);
 79:}catch (IllegalStateException e2) {
 80:                             logger.debug("Failed trying to setException on CollapsedRequest", e2);
 81:}82:}83:}84:}85: 
 86:}catch (Exception e) {
 87:             / / exception
 88:             logger.error("Exception while sharding requests.", e);
 89:             // same error handling as we do around the shards, but this is a wider net in case the shardRequest method fails
 90:             for (CollapsedRequest<ResponseType, RequestArgumentType> request : argumentMap.values()) {
 91:                 try {
 92:                     request.setException(e);
 93:}catch (IllegalStateException e2) {
 94:                     logger.debug("Failed trying to setException on CollapsedRequest", e2);
 95:}96:}97:}finally {
 98:             batchLock.writeLock().unlock();
 99:}100:}101:}Copy the code
  • Code seems to be a little long, please use the official sample CommandCollapserGetValueForKey watching, finishing, fat friends!
  • Line 7: PassCASModify thebatchStartedTo ensure thread safety in the case of concurrency.
  • Line 10: GetWrite lock. Waiting for the call#offer(...)/#remove(...)Method to ensure that no new requests are added or removed when the command merge is executed.
  • Line 15: CallHystrixCollapserBridge#shardRequests(...)Methods,multipleCommand requestshardintoNA 【multipleCommand request 】. By default, sharding is not performed. Click on thelinkLook at the code.
  • Line 17: Loop N [multiple command requests].
  • Line 21: CallHystrixCollapserBridge#createObservableCommand(...)Methods,multipleCommand requestmergeTo createaHystrixCommand. Click on thelinkLook at the code.
  • Line 24: CallHystrixCollapserBridge#mapResponseToRequests(...)Methods,aHystrixCommand execution result,mappingReturn the corresponding command requests. Click on thelinkLook at the code.
    • Observable#single()Method that returns that value if Observable terminates emitting only one value, otherwise throws an exception. inReactiveX Document Chinese Translation “Single”Have related share.
    • Observable#ignoreElements()Method to suppress all data emitted by the original Observable, allowing only its termination notifications (#onError()#onCompleted()Through). inReactiveX Document Chinese Translation “IgnoreElements”Have related share. Also recommended clickrx.internal.operators.OperatorIgnoreElementsTake a look at the source code, it might be easier to understand.
    • Observable#cast()Method casts every data emitted by the original Observable to a specified type, and then emits datamapA special version of the. inReactiveX 中文 版 “cast”Have related share. Also recommended clickrx.internal.operators.OperatorCastTake a look at the source code, it might be easier to understand.
    • useObservable#ignoreElements()/Observable#cast()Method, used to turn an Observable into an Observable that no longer shoots down data items, but into an existing methodObservable#doNext()Process data items, callHystrixCollapser#mapResponseToRequests(...)Methods.
    • Click on thelinkTo see theCollapsedRequestSubject#setResponse(response)Method code.
  • Lines 24 to 50: callsObservable#doError(Action1)Method when an exception occurs during command merge executioneachCollapsedRequestSubject is an exception.
    • Click on thelinkTo see theCollapsedRequestSubject#setResponse(response)Method code.
  • Lines 52 through 68: callsObservable#doOnCompleted(Action0)Method that checks when command merge execution is completeeachCollapsedRequestSubject all have a return result. Set the CollapsedRequestSubject that did not return a result to an exception. In general, it’s a user implementationHystrixCollapser#mapResponseToRequests(...)The method is buggy. In addition, if not set, will result in no resultA singleCommand requestInfinitely block.
  • Line 70: CallObservable#subscribe()Method,The triggerHystrixCommand execution.
  • Line 72 to 96: Exception occurred, seteachCollapsedRequestSubject is an exception.
    • Click on thelinkTo see theCollapsedRequestSubject#setException(response)Method code.
  • Lines 97 to 99: Release the write lock.

5. CollapserTimer

Com.net flix. Hystrix. Collapser CollapserTimer, command combiner timer interface, defines the submit time listener, generate timing task interface method, the code is as follows:

public interface CollapserTimer {

    Reference<TimerListener> addListener(TimerListener collapseTask);
}
Copy the code

5.1 RealCollapserTimer

Com.net flix. Hystrix. Collapser RealCollapserTimer, ordered combiner timer implementation class, the code is as follows:

public class RealCollapserTimer implements CollapserTimer {
    /* single global timer that all collapsers will schedule their tasks on */
    private final static HystrixTimer timer = HystrixTimer.getInstance();

    @Override
    public Reference<TimerListener> addListener(TimerListener collapseTask) {
        returntimer.addTimerListener(collapseTask); }}Copy the code
  • In fact, you use the singleton provided by HystrixTimer. Hystrix source Code Parsing — Execution Result Caching is explained in detail in “3. HystrixTimer”.

5.2 CollapsedTask

Com.net flix. Hystrix. Collapser. RequestCollapser. CollapsedTask, timing task, fixed cycle (can match, Default HystrixCollapserProperties. TimerDelayInMilliseconds = 10 ms) polling its corresponds to a current RequestBatch RequestCollapser. Submit the RequestCollapser merge if there are commands that need to be executed.

The code is relatively simple, click the link to see the code directly.

666. The eggs

T T at the beginning of the command merge execution, understood as similar to the thread pool batch execution task, how to see the official example, how strange. Have the same schoolmate, together tear eye + hold paw.

This article is a bit long and I really don’t want to break it up into multiple articles.

Well, the other part is not clear enough, welcome to discuss and optimize together.

Fat friends, share a wave of friends can be good!