sequence

This article focuses on Hystrix’s BucketedCounterStream

BucketedCounterStream

Hystrix – core – 1.5.12 – sources jar! /com/netflix/hystrix/metric/consumer/BucketedCounterStream.java

/**
 * Abstract class that imposes a bucketing structure and provides streams of buckets
 *
 * @param <Event> type of raw data that needs to get summarized into a bucket
 * @param <Bucket> type of data contained in each bucket
 * @param <Output> type of data emitted to stream subscribers (often is the same as A but does not have to be)
 */
public abstract class BucketedCounterStream<Event extends HystrixEvent, Bucket, Output> {
    protected final int numBuckets;
    protected final Observable<Bucket> bucketedStream;
    protected final AtomicReference<Subscription> subscription = new AtomicReference<Subscription>(null);

    private final Func1<Observable<Event>, Observable<Bucket>> reduceBucketToSummary;

    private final BehaviorSubject<Output> counterSubject = BehaviorSubject.create(getEmptyOutputValue());

    protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs,
                                    final Func2<Bucket, Event, Bucket> appendRawEventToBucket) {
        this.numBuckets = numBuckets;
        this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call(Observable<Event> eventBucket) {
                returneventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket); }}; final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>();for (int i = 0; i < numBuckets; i++) {
            emptyEventCountsToStart.add(getEmptyBucketSummary());
        }

        this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call() {
                return inputEventStream
                        .observe()
                        .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
                        .flatMap(reduceBucketToSummary)                //for a given bucket, turn it into a long array containing counts of event types
                        .startWith(emptyEventCountsToStart);           //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
            }
        });
    }

    abstract Bucket getEmptyBucketSummary();

    abstract Output getEmptyOutputValue();

    /**
     * Return the stream of buckets
     * @return stream of buckets
     */
    public abstract Observable<Output> observe();

    public void startCachingStreamValuesIfUnstarted() {
        if (subscription.get() == null) {
            //the stream is not yet started
            Subscription candidateSubscription = observe().subscribe(counterSubject);
            if (subscription.compareAndSet(null, candidateSubscription)) {
                //won the race to set the subscription
            } else {
                //lost the race to set the subscription, so we need to cancel this one
                candidateSubscription.unsubscribe();
            }
        }
    }

    /**
     * Synchronous call to retrieve the last calculated bucket without waiting for any emissions
     * @return last calculated bucket
     */
    public Output getLatest() {
        startCachingStreamValuesIfUnstarted();
        if (counterSubject.hasValue()) {
            return counterSubject.getValue();
        } else {
            return getEmptyOutputValue();
        }
    }

    public void unsubscribe() {
        Subscription s = subscription.get();
        if(s ! = null) { s.unsubscribe(); subscription.compareAndSet(s, null); }}}Copy the code
  • The constructor initializes the bucketedStream by observing HystrixEventStream, window, and flatMap
  • The timespan parameter for the window operation is bucketSizeInMs, which is calculated as follows
        final int counterMetricWindow = properties.metricsRollingStatisticalWindowInMilliseconds().get();
        final int numCounterBuckets = properties.metricsRollingStatisticalWindowBuckets().get();
        final int counterBucketSizeInMs = counterMetricWindow / numCounterBuckets;
Copy the code
  • BucketedCounterStream has two subclasses of directly, also is the abstract class, is BucketedRollingCounterStream and BucketedCumulativeCounterStream respectively

BucketedRollingCounterStream

Hystrix – core – 1.5.12 – sources jar! /com/netflix/hystrix/metric/consumer/BucketedRollingCounterStream.java

/**
 * Refinement of {@link BucketedCounterStream} which reduces numBuckets at a time.
 *
 * @param <Event> type of raw data that needs to get summarized into a bucket
 * @param <Bucket> type of data contained in each bucket
 * @param <Output> type of data emitted to stream subscribers (often is the same as A but does not have to be)
 */
public abstract class BucketedRollingCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
    private Observable<Output> sourceStream;
    private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);

    protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                                           final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                           final Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
        Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
            @Override
            public Observable<Output> call(Observable<Bucket> window) {
                returnwindow.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets); }}; this.sourceStream = bucketedStream //stream broken up into buckets .window(numBuckets, 1) //emit overlapping windows of buckets .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary .doOnSubscribe(newAction0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()                        //multiple subscribers should get same data
                .onBackpressureDrop();          //if there are slow consumers, data should not buffer
    }

    @Override
    public Observable<Output> observe() {
        return sourceStream;
    }

    /* package-private */ boolean isSourceCurrentlySubscribed() {
        returnisSourceCurrentlySubscribed.get(); }}Copy the code
  • BucketedStream based on the parent class defines sourceStream for Observe and does window and flatMap processing for bucketedStream
  • The count and skip parameters are used in the window operation. The count parameter is numBuckets, and the skip parameter is 1

BucketedCumulativeCounterStream

Hystrix – core – 1.5.12 – sources jar! /com/netflix/hystrix/metric/consumer/BucketedCumulativeCounterStream.java

/**
 * Refinement of {@link BucketedCounterStream} which accumulates counters infinitely in the bucket-reduction step
 *
 * @param <Event> type of raw data that needs to get summarized into a bucket
 * @param <Bucket> type of data contained in each bucket
 * @param <Output> type of data emitted to stream subscribers (often is the same as A but does not have to be)
 */
public abstract class BucketedCumulativeCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
    private Observable<Output> sourceStream;
    private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);

    protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                                              Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                                              Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);

        this.sourceStream = bucketedStream
                .scan(getEmptyOutputValue(), reduceBucket)
                .skip(numBuckets)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()                        //multiple subscribers should get same data
                .onBackpressureDrop();          //if there are slow consumers, data should not buffer
    }

    @Override
    public Observable<Output> observe() {
        return sourceStream; }}Copy the code
  • BucketedStream based on the parent class defines sourceStream for Observe, scan and skip bucketedStream
  • The difference between Scan and Reduce is that consumers are notified after scan is completed, while Reduce notifies consumers after scan is completed in one go
  • Here, the scan parameter is getEmptyOutputValue(), the empty array is used for accumulation, and the skip value is numBuckets

summary

  • Hystrix BucketedCounterStream has two direct subclasses, BucketedRollingCounterStream and BucketedCumulativeCounterStream
  • BucketedRollingCounterStream,’s window and flatMap operation, through the Windows here to achieve the effect of rolling, the skip parameters expressed native sequence, the start element spacing is how many, such as the skip is 3, The first batch of Windows is [1,2,3,4,5] and the second batch is [4,5,6,7,8].
  • BucketedCumulativeCounterStream, adopt the scan and the skip operation, the cumulative effect is done by scan function, and then through the skip operation dropped the first numBuckets data.

Rolling and Cumulative are realized by window and SCAN operations of RXJava, which seem relatively simple.

doc

  • rxdocs-scan
  • rxdocs-skip
  • Rxjava Scan is different from Reduce