This article supplements the Subject section in Chapter 2 of the book Java Programming Methodology: Reactive RxJava and Code Design in Action.

Let's start with a demo:

@Test
void replay_PublishSubject_test() {
    PublishSubject<Object> publishSubject = PublishSubject.create();
    ConnectableObservable<Object> replay = publishSubject.replay();
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    List<Integer> integers = new ArrayList<>();
    for (int i = 1; i < 10; i++) {
        integers.add(i);
    }
    Disposable subscribe1 = replay.subscribe(x -> {
        log("Ichiro:" + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));

    Disposable subscribe2 = replay.subscribe(x -> {
        log("Erlang God:" + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
    Disposable subscribe3 = replay.subscribe(x -> {
        log("Saburo:" + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
    AtomicInteger atomicInteger = new AtomicInteger(integers.size());
    try {
        // Emit one element per second from a pool thread
        forkJoinPool.submit(() -> {
            integers.forEach(id -> {
                sleep(1, TimeUnit.SECONDS);
                publishSubject.onNext(id);
                if (atomicInteger.decrementAndGet() == 0) {
                    publishSubject.onComplete();
                }
            });
        });
        replay.connect();
        sleep(2, TimeUnit.SECONDS);
        subscribe1.dispose();
        sleep(1, TimeUnit.SECONDS);
        //replay.connect(consumer -> consumer.dispose());
        publishSubject.onComplete();
        System.out.println("test");

    } finally {
        try {
            forkJoinPool.shutdown();
            int shutdownDelaySec = 2;
            System.out.println("........ Waiting " + shutdownDelaySec + " seconds for the service to end ........");
            forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            System.out.println("Caught exception from forkJoinPool.awaitTermination(): " + ex.getClass().getName());
        } finally {
            System.out.println("Calling forkJoinPool.shutdownNow() to end the service...");
            List<Runnable> l = forkJoinPool.shutdownNow();
            System.out.println(l.size() + " tasks were still waiting to be executed; service shut down.");
        }
    }
}

The result is as follows:

ForkJoinPool.commonPool-worker-3Ichiro:1
ForkJoinPool.commonPool-worker-3Erlang God:1
ForkJoinPool.commonPool-worker-3Saburo:1
ForkJoinPool.commonPool-worker-3Erlang God:2
ForkJoinPool.commonPool-worker-3Saburo:2
Emission completed
Emission completed
test
........ Waiting 2 seconds for the service to end ........
Calling forkJoinPool.shutdownNow() to end the service...
0 tasks were still waiting to be executed; service shut down.

When subscribe1.dispose() is called, the subscriber unsubscribes on its own initiative, which is exactly what the contract allows. If replay.connect(consumer -> consumer.dispose()) is called afterwards, however, emission is forcibly interrupted midway, without any notification. Calling publishSubject.onComplete(), by contrast, tells the remaining subscribers gracefully that the stream has ended. As shown in Figure 2-3, if we follow the steps described in the figure and inspect the state at a breakpoint on System.out.println("test"), we find that the other two subscribers have not been removed. Why is that?

Calling publishSubject.replay() gives us a ConnectableObservable:

//io.reactivex.Observable#replay
public final ConnectableObservable<T> replay() {
    return ObservableReplay.createFrom(this);
}

When replay.subscribe(…) is called, each downstream subscriber is associated, through a ReplayObserver object, with the UnboundedReplayBuffer produced by DEFAULT_UNBOUNDED_FACTORY:

//ObservableReplay#createFrom
public static <T> ConnectableObservable<T> createFrom(ObservableSource<? extends T> source) {
    return create(source, DEFAULT_UNBOUNDED_FACTORY);
}
//ObservableReplay#create
static <T> ConnectableObservable<T> create(ObservableSource<T> source,
        final BufferSupplier<T> bufferFactory) {
    // the current connection to source needs to be shared between the operator and its onSubscribe call
    final AtomicReference<ReplayObserver<T>> curr = new AtomicReference<ReplayObserver<T>>();
    // Notice here
    ObservableSource<T> onSubscribe = new ReplaySource<T>(curr, bufferFactory);
    // curr becomes the value of the current field in ObservableReplay. Remember that it is a shared reference object
    return RxJavaPlugins.onAssembly(new ObservableReplay<T>(onSubscribe, source, curr, bufferFactory));
}
//ObservableReplay#subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
    onSubscribe.subscribe(observer);
}
//ObservableReplay.ReplaySource#subscribe
public void subscribe(Observer<? super T> child) {
    for (;;) {
        ReplayObserver<T> r = curr.get();
        if (r == null) {
            ReplayBuffer<T> buf = bufferFactory.call();
            ReplayObserver<T> u = new ReplayObserver<T>(buf);
            // The object that the current field in ObservableReplay points to changes as well
            if (!curr.compareAndSet(null, u)) {
                continue;
            }
            r = u;
        }
        InnerDisposable<T> inner = new InnerDisposable<T>(r, child);
        child.onSubscribe(inner);
        // Downstream subscribers are managed through the observers field in ReplayObserver
        r.add(inner);
        if (inner.isDisposed()) {
            r.remove(inner);
            return;
        }
        // Here the UnboundedReplayBuffer object is associated with the downstream subscriber
        r.buffer.replay(inner);
        break;
    }
}
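The retry loop in ReplaySource#subscribe follows a general pattern: several parties hold the same AtomicReference, and whoever first finds it empty installs a fresh value with compareAndSet. The following is a minimal sketch of that pattern in plain Java, not RxJava's code; SharedSlotSketch, getOrCreate, and the StringBuilder payload are hypothetical names chosen purely for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical illustration of the shared-reference pattern; not RxJava source.
final class SharedSlotSketch {

    // Returns the value currently in the slot, installing a fresh one if the slot is empty.
    static <T> T getOrCreate(AtomicReference<T> slot, Supplier<T> factory) {
        for (;;) {
            T existing = slot.get();
            if (existing != null) {
                return existing;
            }
            T created = factory.get();
            // If another thread won the race, loop again and read its value instead.
            if (slot.compareAndSet(null, created)) {
                return created;
            }
        }
    }

    public static void main(String[] args) {
        // Two holders share one reference object, much like ReplaySource
        // and ObservableReplay share curr in the source above.
        AtomicReference<StringBuilder> curr = new AtomicReference<>();
        AtomicReference<StringBuilder> current = curr;

        StringBuilder first = getOrCreate(curr, StringBuilder::new);
        StringBuilder second = getOrCreate(current, StringBuilder::new);

        System.out.println(first == second); // prints "true"
    }
}
```

Because both holders point at the same AtomicReference, a value installed through one is immediately visible through the other, which is why the comments in the source stress that curr is a reference object.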

When replay.connect(consumer -> consumer.dispose()) is called, the consumer receives the ReplayObserver (ps in the source below) and calls its dispose() method. That call sets the observers field of the ReplayObserver to TERMINATED and then sets the ReplayObserver itself, in its role as an AtomicReference<Disposable>, to DISPOSED; in other words, the value it currently holds becomes DISPOSED.

//ObservableReplay#connect
public void connect(Consumer<? super Disposable> connection) {
    boolean doConnect;
    ReplayObserver<T> ps;
    for (;;) {
        ps = current.get();
        if (ps == null || ps.isDisposed()) {
            ReplayBuffer<T> buf = bufferFactory.call();
            ReplayObserver<T> u = new ReplayObserver<T>(buf);
            if (!current.compareAndSet(ps, u)) {
                continue;
            }
            ps = u;
        }
        doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true);
        break;
    }

    try {
        connection.accept(ps);
    } catch (Throwable ex) {
        if (doConnect) {
            ps.shouldConnect.compareAndSet(true, false);
        }
        Exceptions.throwIfFatal(ex);
        throw ExceptionHelper.wrapOrThrow(ex);
    }
    if (doConnect) {
        source.subscribe(ps);
    }
}
//ObservableReplay.ReplayObserver#dispose
public void dispose() {
    observers.set(TERMINATED);
    DisposableHelper.dispose(this);
}
//DisposableHelper#dispose
public static boolean dispose(AtomicReference<Disposable> field) {
    Disposable current = field.get();
    Disposable d = DISPOSED;
    if (current != d) {
        current = field.getAndSet(d);
        if (current != d) {
            if (current != null) {
                current.dispose();
            }
            return true;
        }
    }
    return false;
}
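DisposableHelper#dispose relies on swapping a sentinel into the reference, so that only the first caller actually disposes the held resource and every later call is a no-op. The sketch below reproduces that idea in plain Java under simplified assumptions; SentinelDisposeSketch and its Resource interface are hypothetical stand-ins for RxJava's Disposable and are not part of the library.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of sentinel-based disposal; not RxJava source.
final class SentinelDisposeSketch {

    // Simplified stand-in for a disposable resource.
    interface Resource {
        void close();
    }

    // Marker stored in the slot once it has been disposed.
    static final Resource DISPOSED = () -> { };

    // Swaps in the sentinel; returns true only for the call that performed the disposal.
    static boolean dispose(AtomicReference<Resource> slot) {
        Resource current = slot.get();
        if (current != DISPOSED) {
            current = slot.getAndSet(DISPOSED);
            if (current != DISPOSED) {
                if (current != null) {
                    current.close();
                }
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] closeCount = {0};
        Resource resource = () -> closeCount[0]++;
        AtomicReference<Resource> slot = new AtomicReference<>(resource);

        System.out.println(dispose(slot));  // prints "true"
        System.out.println(dispose(slot));  // prints "false"
        System.out.println(closeCount[0]);  // prints "1"
    }
}
```

Note what the pattern does not do: it closes the one resource held in the slot, but it says nothing to anyone who was merely observing through it. That matches the behavior discussed in this article, where disposing the connection does not deliver a terminal event to the downstream subscribers.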

As you can see, the ReplayObserver cuts its connection to the downstream subscribers but does nothing further to terminate them. A subscriber associated with the UnboundedReplayBuffer object therefore keeps consuming if the buffer still holds elements it has not yet consumed, at least until every stored element has been delivered. Note, however, that no termination event has been stored in the buffer (that is, UnboundedReplayBuffer#complete was never called to add the NotificationLite.complete() element to the queue). The downstream subscriber has not called its own dispose() method either, so output.isDisposed() in the source shown below is false. Pay attention to the code marked <1>:

public void replay(InnerDisposable<T> output) {
    if (output.getAndIncrement() != 0) {
        return;
    }

    final Observer<? super T> child = output.child;

    int missed = 1;

    for (;;) {
        if (output.isDisposed()) {
            return;
        }
        int sourceIndex = size;

        Integer destinationIndexObject = output.index();
        int destinationIndex = destinationIndexObject != null ? destinationIndexObject : 0;

        while (destinationIndex < sourceIndex) {
            Object o = get(destinationIndex);
            // This is crucial
            if (NotificationLite.accept(o, child)) { // <1>
                return;
            }
            if (output.isDisposed()) {
                return;
            }
            destinationIndex++;
        }

        output.index = destinationIndex;
        missed = output.addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}
//io.reactivex.internal.util.NotificationLite#accept
public static <T> boolean accept(Object o, Observer<? super T> s) {
    if (o == COMPLETE) {
        s.onComplete();
        return true;
    } else
    if (o instanceof ErrorNotification) {
        s.onError(((ErrorNotification)o).e);
        return true;
    }
    s.onNext((T)o);
    return false;
}

If UnboundedReplayBuffer#complete has been called, o == COMPLETE is true once delivery reaches the last stored element, and the downstream subscriber's onComplete() method is invoked.

//ObservableReplay.ReplayObserver#onComplete
public void onComplete() {
    if (!done) {
        done = true;
        buffer.complete();
        replayFinal();
    }
}
//ObservableReplay.UnboundedReplayBuffer#complete
public void complete() {
    add(NotificationLite.complete());
    size++;
}
//io.reactivex.internal.util.NotificationLite#complete
public static Object complete() {
    return COMPLETE;
}
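The difference between the two ways of ending the stream can be modeled in a few lines. The following is a deliberately simplified, single-threaded sketch in plain Java, assuming a buffer that stores a sentinel object for completion the way NotificationLite does; ReplayBufferSketch and its methods are hypothetical names and do not reproduce RxJava's classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of a replay buffer; not RxJava source.
final class ReplayBufferSketch {

    // Sentinel stored in the buffer to mark normal completion.
    static final Object COMPLETE = new Object();

    private final List<Object> buffer = new ArrayList<>();

    void next(Object value) {
        buffer.add(value);
    }

    void complete() {
        buffer.add(COMPLETE);
    }

    // Replays every stored entry as a signal name; stops at the completion marker.
    List<String> replay() {
        List<String> received = new ArrayList<>();
        for (Object o : buffer) {
            if (o == COMPLETE) {
                received.add("onComplete");
                return received;
            }
            received.add("onNext:" + o);
        }
        return received;
    }

    public static void main(String[] args) {
        // Case 1: the connection is dropped, so no marker is ever stored.
        ReplayBufferSketch cutOff = new ReplayBufferSketch();
        cutOff.next(1);
        cutOff.next(2);
        System.out.println(cutOff.replay());   // prints "[onNext:1, onNext:2]"

        // Case 2: the source completes, so the marker is stored last.
        ReplayBufferSketch finished = new ReplayBufferSketch();
        finished.next(1);
        finished.next(2);
        finished.complete();
        System.out.println(finished.replay()); // prints "[onNext:1, onNext:2, onComplete]"
    }
}
```

In the first case the subscriber receives the stored elements and then simply hears nothing more, which corresponds to the silent interruption caused by disposing the connection. In the second case the marker travels through the same buffer as the data, so the subscriber is told about completion only after it has received everything stored before it.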

This explains the puzzle raised by the replay_PublishSubject_test() example.