RxJava2 FlatMap and ConcatMap operators RxJava2 FlatMap and ConcatMap operators RxJava2 FlatMap and ConcatMap operators

RxJava2 source analysis – subscribe

RxJava2 source code analysis – thread switching

RxJava2 source analysis – Map operator

The RxJava and RxAndroid versions used in this article are as follows:

implementation 'the IO. Reactivex. Rxjava2: rxjava: 2.2.6'
implementation 'the IO. Reactivex. Rxjava2: rxandroid: 2.1.1'
Copy the code

FlatMap

The FlatMap operator can transform an Observables emitting data into multiple Observables, and then merge the emitted data into a single Observable without ensuring that the emitted data is in order.

Let’s start with some sample code, so that I don’t have to call the Lambda and the chain when I call the FlatMap method. It looks like this:

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("Tan:");
    emitter.onNext("Jia:");
    emitter.onNext(Jun: "");
    emitter.onComplete();
})
        .flatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) {
                List<String> list = new ArrayList<>();

                for (int i = 0; i < 3; i++) {
                    list.add(s + i);
                }
                return Observable.fromIterable(list);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // no implementation
            }

            @Override
            public void onNext(String s) {
                Log.i("TanJiaJun", s);
            }

            @Override
            public void onError(Throwable e) {
                // no implementation
            }

            @Override
            public void onComplete(a) {
                // no implementation}});Copy the code

The Log is as follows:

Source code analysis

Let’s take a look at the flatMap method. The analysis shows that the following methods will be called in sequence, with the following code:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    // Note that the delayErrors argument is passed false
    return flatMap(mapper, false);
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
    The maxConcurrency parameter is passed in as integer. MAX_VALUE
    return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
    // bufferSize is the size of the data buffer, related to Backpressure
    return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    // Check whether this is an implementation class of ScalarCallable
    if (this instanceof ScalarCallable) {
        @SuppressWarnings("unchecked")
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        }
        return ObservableScalarXMap.scalarXMap(v, mapper);
    }
    // The following method is called if it is not ScalarCallable's implementation class
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
Copy the code

BufferSize () is the size of the data buffer, which defaults to 128, as shown in the following code:

// Observable.java
public static int bufferSize(a) {
    return Flowable.bufferSize();
}

// Flowable.java
public static int bufferSize(a) {
    return BUFFER_SIZE;
}

// Flowable.java
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
    BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size".128));
}
Copy the code

ScalarCallable is an interface, and its implementation class has 6: FlowableEmpty, FlowableJust, MaybeEmpty, MaybeJust, ObservableEmpty, ObservableJust, respectively corresponding to these 6 methods: Flowable.empty(), flowable.just (T item), Maybe.empty(), Maybe.just(T item), Observable.empty(), Observable.just(T item).

Based on our experience in the last few articles, we just need to look at the ObservableFlatMap class, which looks like this:

// ObservableFlatMap.java
public ObservableFlatMap(ObservableSource<T> source,
        Function<? super T, ? extends ObservableSource<? extends U>> mapper,
        boolean delayErrors, int maxConcurrency, int bufferSize) {
    super(source);
    this.mapper = mapper;
    this.delayErrors = delayErrors;
    this.maxConcurrency = maxConcurrency;
    this.bufferSize = bufferSize;
}

@Override
public void subscribeActual(Observer<? super U> t) {

    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
        return;
    }

    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
Copy the code

We also like before a few articles, see subscribeActual method first, here will call ObservableScalarXMap. TryScalarXMapSubscribe method, if it is true, you can return, The ScalarCallable ScalarCallable ScalarCallable ScalarCallable ScalarCallable ScalarCallable ScalarCallable ScalarCallable ScalarCallable ScalarCallable ScalarCallable ScalarCallable ScalarCallable ScalarCallable The MergeObserver class is called with a new subscribe method. The MergeObserver class is called with a new subscribe method. The MergeObserver class is called with a new subscribe method.

// ObservableFlatMap.java
// MergeObserver extends AtomicInteger
static final class MergeObserver<T.U> extends AtomicInteger implements Disposable.Observer<T> {

    private static final long serialVersionUID = -2117620485640801370L;

    final Observer<? super U> downstream;
    final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
    final boolean delayErrors;
    final int maxConcurrency;
    final int bufferSize;

    volatile SimplePlainQueue<U> queue;

    volatile boolean done;

    final AtomicThrowable errors = new AtomicThrowable();

    volatile boolean cancelled;

    // Store the InnerObserver array
    finalAtomicReference<InnerObserver<? ,? >[]> observers;static finalInnerObserver<? ,? >[] EMPTY =newInnerObserver<? ,? > [0];

    static finalInnerObserver<? ,? >[] CANCELLED =newInnerObserver<? ,? > [0];

    Disposable upstream;

    long uniqueId;
    long lastId;
    int lastIndex;

    Queue<ObservableSource<? extends U>> sources;

    int wip;

    MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        this.downstream = actual;
        // Mapper is an implementation class for the Function interface
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.maxConcurrency = maxConcurrency;
        this.bufferSize = bufferSize;
        // Integer.MAX_VALUE is passed in, so this logic does not
        if(maxConcurrency ! = Integer.MAX_VALUE) { sources =new ArrayDeque<ObservableSource<? extends U>>(maxConcurrency);
        }
        // Create an atomic reference to the InnerObserver array
        this.observers = newAtomicReference<InnerObserver<? ,? >[]>(EMPTY); }@Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.validate(this.upstream, d)) {
            this.upstream = d;
            downstream.onSubscribe(this); }}@Override
    public void onNext(T t) {
        // safeguard against misbehaving sources
        if (done) {
            return;
        }
        ObservableSource<? extends U> p;
        try {
            // Call mapper's apply method
            p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            upstream.dispose();
            onError(e);
            return;
        }

        // This logic is not executed because integer.max_value is passed in
        if(maxConcurrency ! = Integer.MAX_VALUE) {synchronized (this) {
                if (wip == maxConcurrency) {
                    sources.offer(p);
                    return;
                }
                wip++;
            }
        }

        subscribeInner(p);
    }

    @SuppressWarnings("unchecked")
    void subscribeInner(ObservableSource<? extends U> p) {
        // An infinite loop
        for (;;) {
            // Determine whether p is the implementation class of the Callable interface
            if (p instanceof Callable) {
                if(tryEmitScalar(((Callable<? extends U>)p)) && maxConcurrency ! = Integer.MAX_VALUE) {boolean empty = false;
                    synchronized (this) {
                        p = sources.poll();
                        if (p == null) {
                            wip--;
                            empty = true; }}if (empty) {
                        drain();
                        break; }}else {
                    break; }}else {
                // Create InnerObserver if p is not the implementation class of the Callable interface
                InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                // Call the addInner method, placing the InnerObserver in this array, which is parsed below
                if (addInner(inner)) {
                    // Subscribe to the InnerObserver each time you create it
                    p.subscribe(inner);
                }
                break; }}}boolean addInner(InnerObserver<T, U> inner) {
        // Another endless loop
        for (;;) {
            // Get InnerObserver from observersInnerObserver<? ,? >[] a = observers.get();if (a == CANCELLED) {
                // Cancel the subscription if CANCELLED
                inner.dispose();
                return false;
            }
            int n = a.length;
            // Create a new InnerObserver array with the size of array A plus 1InnerObserver<? ,? >[] b =new InnerObserver[n + 1];
            // Copy array A data to array B
            System.arraycopy(a, 0, b, 0, n);
            // Place the new InnerObserver at the end of the B array
            b[n] = inner;
            // Update array B data atomically into array A
            if (observers.compareAndSet(a, b)) {
                // Returns true on success
                return true; }}}// Remove the InnerObserver method
    void removeInner(InnerObserver<T, U> inner) {
        for(;;) { InnerObserver<? ,? >[] a = observers.get();int n = a.length;
            if (n == 0) {
                return;
            }
            int j = -1;
            for (int i = 0; i < n; i++) {
                if (a[i] == inner) {
                    j = i;
                    break; }}if (j < 0) {
                return; } InnerObserver<? ,? >[] b;if (n == 1) {
                b = EMPTY;
            } else {
                b = newInnerObserver<? ,? >[n -1];
                System.arraycopy(a, 0, b, 0, j);
                System.arraycopy(a, j + 1, b, j, n - j - 1);
            }
            if (observers.compareAndSet(a, b)) {
                return; }}}boolean tryEmitScalar(Callable<? extends U> value) {
        U u;
        try {
            u = value.call();
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            errors.addThrowable(ex);
            drain();
            return true;
        }

        if (u == null) {
            return true;
        }

        if (get() == 0 && compareAndSet(0.1)) {
            downstream.onNext(u);
            if (decrementAndGet() == 0) {
                return true; }}else {
            SimplePlainQueue<U> q = queue;
            if (q == null) {
                if (maxConcurrency == Integer.MAX_VALUE) {
                    q = new SpscLinkedArrayQueue<U>(bufferSize);
                } else {
                    q = new SpscArrayQueue<U>(maxConcurrency);
                }
                queue = q;
            }

            if(! q.offer(u)) { onError(new IllegalStateException("Scalar queue full? !"));
                return true;
            }
            if(getAndIncrement() ! =0) {
                return false;
            }
        }
        drainLoop();
        return true;
    }

    void tryEmit(U value, InnerObserver<T, U> inner) {
        // Check if get() is equal to 0, if it is equal to 0 then set the value to 1
        if (get() == 0 && compareAndSet(0.1)) {
            // Call the downstream onNext method
            downstream.onNext(value);
            // After transmitting data, determine whether the value after subtracting 1 is equal to 0, if = 0, prove that all data has been transmitted, the method ends
            if (decrementAndGet() == 0) {
                return; }}else {
            SimpleQueue<U> q = inner.queue;
            if (q == null) {
                // Create SpscLinkedArrayQueue, which is a single-production, single-consumption array queue that can allocate new arrays if consumers are slow
                q = new SpscLinkedArrayQueue<U>(bufferSize);
                inner.queue = q;
            }
            // Cache the received upstream data to the queue
            q.offer(value);
            // Check whether the value is not equal to 0 and increment by 1, if not equal to 0, end the method
            if(getAndIncrement() ! =0) {
                return; }}// Call the drainLoop method
        drainLoop();
    }

    @Override
    public void onError(Throwable t) {
        if (done) {
            RxJavaPlugins.onError(t);
            return;
        }
        if (errors.addThrowable(t)) {
            done = true;
            drain();
        } else{ RxJavaPlugins.onError(t); }}@Override
    public void onComplete(a) {
        if (done) {
            return;
        }
        done = true;
        drain();
    }

    @Override
    public void dispose(a) {
        if(! cancelled) { cancelled =true;
            if (disposeAll()) {
                Throwable ex = errors.terminate();
                if(ex ! =null&& ex ! = ExceptionHelper.TERMINATED) { RxJavaPlugins.onError(ex); }}}}@Override
    public boolean isDisposed(a) {
        return cancelled;
    }

    void drain(a) {
        if (getAndIncrement() == 0) { drainLoop(); }}void drainLoop(a) {
        final Observer<? super U> child = this.downstream;
        int missed = 1;
        for (;;) {
            // Check if the subscription is terminated, if so, the method ends
            if (checkTerminate()) {
                return;
            }
            // Copy the MergeObserver variable queue to SVQ. Queue is a queue
            SimplePlainQueue<U> svq = queue;

            if(svq ! =null) {
                for (;;) {
                    // Check again if the subscription has been terminated, if so, the method ends
                    if (checkTerminate()) {
                        return;
                    }

                    // Fetch data from the queue
                    U o = svq.poll();

                    // If null, break the loop
                    if (o == null) {
                        break;
                    }

                    // Call the onNext method of the downstream Observer to emit datachild.onNext(o); }}booleand = done; svq = queue; InnerObserver<? ,? >[] inner = observers.get();int n = inner.length;

            int nSources = 0;
            if(maxConcurrency ! = Integer.MAX_VALUE) {synchronized (this) { nSources = sources.size(); }}if (d && (svq == null || svq.isEmpty()) && n == 0 && nSources == 0) {
                Throwable ex = errors.terminate();
                if(ex ! = ExceptionHelper.TERMINATED) {// Check whether the Throwable is null
                    if (ex == null) {
                        // Call the onComplete method of the downstream Observer
                        child.onComplete();
                    } else {
                        // Call the onError method of the downstream Observerchild.onError(ex); }}return;
            }

            // Handle array data
            int innerCompleted = 0;
            if(n ! =0) {
                long startId = lastId;
                int index = lastIndex;

                if(n <= index || inner[index].id ! = startId) {if (n <= index) {
                        index = 0;
                    }
                    int j = index;
                    for (int i = 0; i < n; i++) {
                        if (inner[j].id == startId) {
                            break;
                        }
                        j++;
                        if (j == n) {
                            j = 0;
                        }
                    }
                    index = j;
                    lastIndex = j;
                    lastId = inner[j].id;
                }

                int j = index;
                sourceLoop:
                for (int i = 0; i < n; i++) {
                    if (checkTerminate()) {
                        return;
                    }

                    @SuppressWarnings("unchecked")
                    InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
                    SimpleQueue<U> q = is.queue;
                    if(q ! =null) {
                        // Process each InnerObserver object in the InnerObserver array
                        for (;;) {
                            U o;
                            try {
                                o = q.poll();
                            } catch (Throwable ex) {
                                Exceptions.throwIfFatal(ex);
                                is.dispose();
                                errors.addThrowable(ex);
                                if (checkTerminate()) {
                                    return;
                                }
                                removeInner(is);
                                innerCompleted++;
                                j++;
                                if (j == n) {
                                    j = 0;
                                }
                                continue sourceLoop;
                            }
                            if (o == null) {
                                break;
                            }

                            // Call the onNext method to emit InnerObserver data
                            child.onNext(o);

                            // Check if the subscription is terminated, if so, the method ends
                            if (checkTerminate()) {
                                return; }}}boolean innerDone = is.done;
                    SimpleQueue<U> innerQueue = is.queue;
                    // Check whether the data in the queue is processed
                    if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                        // If so, remove the corresponding InnerObserver from the array
                        removeInner(is);
                        // Check if the subscription is terminated, if so, the method ends
                        if (checkTerminate()) {
                            return;
                        }
                        / / innerCompleted since
                        innerCompleted++;
                    }

                    j++;
                    if (j == n) {
                        j = 0;
                    }
                }
                lastIndex = j;
                lastId = inner[j].id;
            }

            // Determine if the innerCompleted is not equal to 0, or if the InnerObserver is completed
            if(innerCompleted ! =0) {
                if(maxConcurrency ! = Integer.MAX_VALUE) {while(innerCompleted-- ! =0) {
                        ObservableSource<? extends U> p;
                        synchronized (this) {
                            p = sources.poll();
                            if (p == null) {
                                wip--;
                                continue; } } subscribeInner(p); }}// Completes the current loop and enters the next loop, continuing with the next InnerObserver
                continue;
            }
            // After the data is transmitted, the value is automatically reduced
            missed = addAndGet(-missed);
            // If missed equals 0, all data in the queue has been transmitted
            if (missed == 0) {
                break; }}}// Check if the subscription is terminated
    boolean checkTerminate(a) {
        if (cancelled) {
            return true;
        }
        Throwable e = errors.get();
        if(! delayErrors && (e ! =null)) {
            disposeAll();
            e = errors.terminate();
            if(e ! = ExceptionHelper.TERMINATED) { downstream.onError(e); }return true;
        }
        return false;
    }

    boolean disposeAll(a) { upstream.dispose(); InnerObserver<? ,? >[] a = observers.get();if(a ! = CANCELLED) { a = observers.getAndSet(CANCELLED);if(a ! = CANCELLED) {for(InnerObserver<? ,? > inner : a) { inner.dispose(); }return true; }}return false; }}// InnerObserver inherits AtomicReference
static final class InnerObserver<T.U> extends AtomicReference<Disposable>
implements Observer<U> {

    private static final long serialVersionUID = -4606175640614850599L;
    final long id;
    final MergeObserver<T, U> parent;

    volatile boolean done;
    volatile SimpleQueue<U> queue;

    int fusionMode;

    InnerObserver(MergeObserver<T, U> parent, long id) {
        this.id = id;
        this.parent = parent;
    }

    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.setOnce(this, d)) {
            if (d instanceof QueueDisposable) {
                @SuppressWarnings("unchecked")
                QueueDisposable<U> qd = (QueueDisposable<U>) d;

                // requestFusion is related to Backpressure, because we don't use the related class here, so fusionMode value is 0
                int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                // The subscription type is synchronous
                if (m == QueueDisposable.SYNC) {
                    fusionMode = m;
                    queue = qd;
                    done = true;
                    // Launch data
                    parent.drain();
                    return;
                }
                // Subscription type is asynchronous
                if(m == QueueDisposable.ASYNC) { fusionMode = m; queue = qd; }}}}@Override
    public void onNext(U t) {
        QueueDisposable.NONE = QueueDisposable.NONE = QueueDisposable
        if (fusionMode == QueueDisposable.NONE) {
            Call the tryEmit method
            parent.tryEmit(t, this);
        } else{ parent.drain(); }}@Override
    public void onError(Throwable t) {
        if (parent.errors.addThrowable(t)) {
            if(! parent.delayErrors) { parent.disposeAll(); } done =true;
            parent.drain();
        } else{ RxJavaPlugins.onError(t); }}@Override
    public void onComplete(a) {
        done = true;
        parent.drain();
    }

    public void dispose(a) {
        DisposableHelper.dispose(this); }}Copy the code

ConcatMap

The ConcatMap operator converts one Observables that emit data into multiple Observables, and then merges the emitted data into a single Observable, keeping the emitted data in order.

Let’s write some sample code to make it easier to understand. Instead of calling the ConcatMap method with a Lambda and a chain call, I’ll use the following code:

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("Tan:");
    emitter.onNext("Jia:");
    emitter.onNext(Jun: "");
    emitter.onComplete();
})
        .concatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) {
                List<String> list = new ArrayList<>();

                for (int i = 0; i < 3; i++) {
                    list.add(s + i);
                }
                return Observable.fromIterable(list);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // no implementation
            }

            @Override
            public void onNext(String s) {
                Log.i("TanJiaJun", s);
            }

            @Override
            public void onError(Throwable e) {
                // no implementation
            }

            @Override
            public void onComplete(a) {
                // no implementation}});Copy the code

The Log is as follows:

Source code analysis

The ConcatMap method calls the following methods in sequence:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return concatMap(mapper, 2);
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(prefetch, "prefetch");
    // Check whether this is ScalarCallable
    if (this instanceof ScalarCallable) {
        @SuppressWarnings("unchecked")
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        }
        return ObservableScalarXMap.scalarXMap(v, mapper);
    }
    // The last parameter delayErrors is passed errormode.immediate
    return RxJavaPlugins.onAssembly(new ObservableConcatMap<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE));
}
Copy the code

ObservableConcatMap (ObservableConcatMap, ObservableConcatMap, ObservableConcatMap)

public ObservableConcatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
        int bufferSize, ErrorMode delayErrors) {
    super(source);
    this.mapper = mapper;
    this.delayErrors = delayErrors;
    this.bufferSize = Math.max(8, bufferSize);
}

@Override
public void subscribeActual(Observer<? super U> observer) {

    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, observer, mapper)) {
        return;
    }

    // delayErrors is passed errormode. IMMEDIATE
    if (delayErrors == ErrorMode.IMMEDIATE) {
        // Serialize the observer
        SerializedObserver<U> serial = new SerializedObserver<U>(observer);
        // Call the subscription method and pass in the SourceObserver as new
        source.subscribe(new SourceObserver<T, U>(serial, mapper, bufferSize));
    } else {
        source.subscribe(newConcatMapDelayErrorObserver<T, U>(observer, mapper, bufferSize, delayErrors == ErrorMode.END)); }}Copy the code

Let’s take a look at SourceObserver. Some of the source code is similar to FlatMap, but I won’t repeat it here:

// ObservableConcatMap.java
static final class SourceObserver<T.U> extends AtomicInteger implements Observer<T>, Disposable {

    private static final long serialVersionUID = 8828587559905699186L;
    final Observer<? super U> downstream;
    final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
    final InnerObserver<U> inner;
    final int bufferSize;

    SimpleQueue<T> queue;

    Disposable upstream;

    volatile boolean active;

    volatile boolean disposed;

    volatile boolean done;

    int fusionMode;

    SourceObserver(Observer<? super U> actual,
                            Function<? super T, ? extends ObservableSource<? extends U>> mapper, int bufferSize) {
        this.downstream = actual;
        this.mapper = mapper;
        this.bufferSize = bufferSize;
        this.inner = new InnerObserver<U>(actual, this);
    }

    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.validate(this.upstream, d)) {
            this.upstream = d;
            if (d instanceof QueueDisposable) {
                @SuppressWarnings("unchecked")
                QueueDisposable<T> qd = (QueueDisposable<T>) d;

                // requestFusion is related to Backpressure, because we don't use the related class here, so fusionMode value is 0
                int m = qd.requestFusion(QueueDisposable.ANY);
                // The subscription is synchronous
                if (m == QueueDisposable.SYNC) {
                    fusionMode = m;
                    queue = qd;
                    done = true;

                    downstream.onSubscribe(this);

                    drain();
                    return;
                }

                // Subscriptions are asynchronous
                if (m == QueueDisposable.ASYNC) {
                    fusionMode = m;
                    queue = qd;

                    downstream.onSubscribe(this);

                    return; }}// Create a queue the size of the data buffer
            queue = new SpscLinkedArrayQueue<T>(bufferSize);

            // Call the onSubscribe method of the downstream Observer
            downstream.onSubscribe(this); }}@Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        QueueDisposable.NONE = QueueDisposable.NONE = QueueDisposable
        if (fusionMode == QueueDisposable.NONE) {
            // Cache the received upstream data to the queue
            queue.offer(t);
        }
        // Call the drain method
        drain();
    }

    @Override
    public void onError(Throwable t) {
        if (done) {
            RxJavaPlugins.onError(t);
            return;
        }
        done = true;
        dispose();
        downstream.onError(t);
    }

    @Override
    public void onComplete(a) {
        if (done) {
            return;
        }
        done = true;
        drain();
    }

    void innerComplete(a) {
        active = false;
        drain();
    }

    @Override
    public boolean isDisposed(a) {
        return disposed;
    }

    @Override
    public void dispose(a) {
        disposed = true;
        inner.dispose();
        upstream.dispose();

        if (getAndIncrement() == 0) { queue.clear(); }}void drain(a) {
        // Check whether the value is not equal to 0 and increment by 1, if not equal to 0, end the method
        if(getAndIncrement() ! =0) {
            return;
        }

        for (;;) {
            // Decide whether to end the subscription
            if (disposed) {
                // If so, empty the queue and end the method
                queue.clear();
                return;
            }
            Active is volatile. Active is used to determine whether the InnerObserver is currently firing, and therefore ensures that the firing InnerObserver is ordered, unlike FlatMap
            if(! active) {boolean d = done;

                T t;

                try {
                    // Fetch data from the queue
                    t = queue.poll();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    dispose();
                    queue.clear();
                    downstream.onError(ex);
                    return;
                }

                boolean empty = t == null;

                // Determine whether the launch is complete and whether there is data in the queue
                if (d && empty) {
                    // If the launch is complete and the queue is empty, end the subscription and call onComplete in the downstream Observer
                    disposed = true;
                    downstream.onComplete();
                    return;
                }

                // Check again if there is any data in the queue
                if(! empty) {If there is still data in the queue, execute the following logic
                    ObservableSource<? extends U> o;

                    try {
                        // Call mapper's apply method
                        o = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        dispose();
                        queue.clear();
                        downstream.onError(ex);
                        return;
                    }

                    // Active is set to true, which means that the data is still being transmitted, and other missions cannot enter the above judgment
                    active = true;
                    // Invoke the subscription method of the downstream Observero.subscribe(inner); }}// After transmitting data, determine whether the value after subtracting 1 is equal to 0, if = 0, prove that all data has been transmitted, the method ends
            if (decrementAndGet() == 0) {
                break; }}}// InnerObserver inherits AtomicReference
    static final class InnerObserver<U> extends AtomicReference<Disposable> implements Observer<U> {

        private static final long serialVersionUID = -7449079488798789337L;

        final Observer<? super U> downstream;
        finalSourceObserver<? ,? > parent; InnerObserver(Observer<?superU> actual, SourceObserver<? ,? > parent) {this.downstream = actual;
            this.parent = parent;
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.replace(this, d);
        }

        @Override
        public void onNext(U t) {
            // Call the onNext method of the downstream Observer
            downstream.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            parent.dispose();
            downstream.onError(t);
        }

        @Override
        public void onComplete(a) {
            parent.innerComplete();
        }

        void dispose(a) {
            DisposableHelper.dispose(this); }}}Copy the code

FlatMap and ConcatMap are compared

Before making the comparison, I change the above two sample code, both call delay method, delay the launch for 1s, the code is as follows:

FlatMap:

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("Tan:");
    emitter.onNext("Jia:");
    emitter.onNext(Jun: "");
    emitter.onComplete();
})
        .flatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) {
                List<String> list = new ArrayList<>();

                for (int i = 0; i < 3; i++) {
                    list.add(s + i);
                }
                return Observable.fromIterable(list).delay(1, TimeUnit.SECONDS);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // no implementation
            }

            @Override
            public void onNext(String s) {
                Log.i("TanJiaJun", s);
            }

            @Override
            public void onError(Throwable e) {
                // no implementation
            }

            @Override
            public void onComplete(a) {
                // no implementation}});Copy the code

The Log is as follows:

ConcatMap:

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("Tan:");
    emitter.onNext("Jia:");
    emitter.onNext(Jun: "");
    emitter.onComplete();
})
        .concatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) {
                List<String> list = new ArrayList<>();

                for (int i = 0; i < 3; i++) {
                    list.add(s + i);
                }
                return Observable.fromIterable(list).delay(1, TimeUnit.SECONDS);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // no implementation
            }

            @Override
            public void onNext(String s) {
                Log.i("TanJiaJun", s);
            }

            @Override
            public void onError(Throwable e) {
                // no implementation
            }

            @Override
            public void onComplete(a) {
                // no implementation}});Copy the code

The Log is as follows:

I have sent three groups of data here. It should be noted that we found that FlatMap was not in order, but the data transmitted in each group was in order. ConcatMap was in order, and the data transmitted in each group was also in order. FlatMap (MergeObserver) and ConcatMap (SourceObserver) both inherit AtomicInteger. Before explaining this class, let’s talk about a few concepts.

volatile

Semantics of volatile:

  1. Operations on volatile variables are not guaranteed to be atomic.
  2. The Java memory model does not reorder volatile instructions and ensures that operations on volatile variables are executed in the order of the instructions.
  3. Volatile variables are visible to all threads, each change is immediately synchronized back to main memory, and each read is re-read from main memory.

Instruction reordering

The processor can reduce the cost of memory latency by an order of magnitude with caches, because an access to main memory takes hardware many clock cycles, and these caches reorder the order of pending memory operations for performance, known as reordering. ** The Java Memory Model ensures sequential execution semantics through the happen-before principle, whereby writes to a volatile variable occur first before reads to that variable occur later. Here’s an example:

Object object = new Object();
Copy the code

This statement converts into multiple assembly instructions that do roughly three things:

  1. Allocates memory space to instances of the Object class.
  2. Initialize an Object.
  3. The object variable is not null when it points to the newly allocated memory.

Because the Java compiler allows instruction reordering of its optimization, the three steps above May 1 – > 2 – > 3 or 1 – > 3 – > 2, but the step 1 is certainly the first execution, because to do instruction reordering is a premise, is must follow the principle of first occur, to ensure the final execution result is correct, step 2 and step 3 is the premise of step 1, Memory space must be allocated for the instance to initialize objects or point variables to allocated memory. There is no dependency between Step 2 and Step 3, so the execution sequence of Step 2 and Step 3 is uncertain after optimization by the Java compiler.

There is no problem with such optimization in single thread, but there is a problem in multi-thread. Here I give an example. We will use Double Check Locking (DCL for short) to realize singletons, which is the lazy mode, and the code is as follows:

package com.tanjiajun.rxjavademo;

/** * Created by TanJiaJun on 2019-11-14. */
public class Singleton {

    // mInstance is volatile to ensure the order of execution
    private static volatile Singleton mInstance;

    // Private constructor
    private Singleton(a) {
        // Prevent singleton invalidation by calling constructors through reflection
        if(mInstance ! =null) {
            throw new RuntimeException("Cannot construct a singleton more than once."); }}// The method to get a singleton
    public static Singleton getInstance(a) {
        // Check whether mInstance is null for the first time to determine whether synchronization is required to improve performance and efficiency
        if (mInstance == null) {
            synchronized (Singleton.class) {
                // Check whether mInstance is null for the second time to check whether an instance is created
                if (mInstance == null) {
                    mInstance = newSingleton(); }}}/ / return mInstance
        returnmInstance; }}Copy the code

The statement that creates the instance is converted into multiple assembly instructions that do roughly three things:

  1. Allocate memory space to the Singleton class instance.
  2. Initialize the Singleton object.
  3. The mInstance variable is not null if it points to the newly allocated memory.

If we don’t use volatile and lock synchronization, let’s say we have two threads, A and B, and if thread A creates the instance 1->3->2, then when thread B executes step 3, mInstance is no longer null and thread B executes getInstance. Enter the first judgment, since the mInstance variable is not null, another instance will be created, invalidating the singleton.

The CAS operation

CAS (Compare And Swap) prevents dirty reads And writes of shared variables And ensures atomic operations. CAS is also an implementation of optimistic locking. What is optimistic locking? Optimistic locking always assumes the best-case scenario, so conflicts are checked for when data is submitted for update.

AtomicInteger

Take a look at AtomicInteger source code, the code is as follows:

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    / / use sun. Misc. Unsafe
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long VALUE;

    static {
        try {
            // VALUE is the memory offset
            VALUE = U.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (ReflectiveOperationException e) {
            throw newError(e); }}// Use volatile to modify value to ensure the order of instruction execution
    private volatile int value;

    public AtomicInteger(int initialValue) {
        value = initialValue;
    }

    public AtomicInteger(a) {}public final int get(a) {
        return value;
    }

    public final void set(int newValue) {
        value = newValue;
    }

    public final void lazySet(int newValue) {
        U.putOrderedInt(this, VALUE, newValue);
    }

    public final int getAndSet(int newValue) {
        return U.getAndSetInt(this, VALUE, newValue);
    }

    // Focus on this method, which is the CAS operation, which I will parse below
    public final boolean compareAndSet(int expect, int update) {
        return U.compareAndSwapInt(this, VALUE, expect, update);
    }

    public final boolean weakCompareAndSet(int expect, int update) {
        return U.compareAndSwapInt(this, VALUE, expect, update);
    }

    public final int getAndIncrement(a) {
        return U.getAndAddInt(this, VALUE, 1);
    }

    public final int getAndDecrement(a) {
        return U.getAndAddInt(this, VALUE, -1);
    }

    public final int getAndAdd(int delta) {
        return U.getAndAddInt(this, VALUE, delta);
    }

    public final int incrementAndGet(a) {
        return U.getAndAddInt(this, VALUE, 1) + 1;
    }

    public final int decrementAndGet(a) {
        return U.getAndAddInt(this, VALUE, -1) - 1;
    }

    public final int addAndGet(int delta) {
        return U.getAndAddInt(this, VALUE, delta) + delta;
    }

    public final int getAndUpdate(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while(! compareAndSet(prev, next));return prev;
    }

    public final int updateAndGet(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while(! compareAndSet(prev, next));return next;
    }

    public final int getAndAccumulate(int x,
                                      IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while(! compareAndSet(prev, next));return prev;
    }

    public final int accumulateAndGet(int x,
                                      IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while(! compareAndSet(prev, next));return next;
    }

    public String toString(a) {
        return Integer.toString(get());
    }

    public int intValue(a) {
        return get();
    }

    public long longValue(a) {
        return (long)get();
    }

    public float floatValue(a) {
        return (float)get();
    }

    public double doubleValue(a) {
        return (double)get(); }}Copy the code

CompareAndSet method is the CAS operation, it is to call sun, misc. Unsafe compareAndSwapInt method, this method is a native method, its effect is every time from memory according to the VALUE of the offset (VALUE) to take out the memory and expect the comparison, If the data is consistent, change the value in memory to UPDATE.

conclusion

Because the MergeObserver corresponding to FlatMap and the SourceObserver corresponding to ConcatMap inherit AtomicInteger, according to the previous source code analysis, their two operators are atomic operations in each set of data, so they are sequential; ObservableConcatMap, ObservableConcatMap, ObservableConcatMap, ObservableConcatMap, ObservableConcatMap, ObservableConcatMap, ObservableConcatMap, ObservableConcatMap, ObservableConcatMap, ObservableConcatMap, ObservableConcatMap, ObservableConcatMap So the data sets emitted by FlatMap are out of order, while the data sets emitted by ConcatMap are in order.

My GitHub: TanJiaJunBeyond

Common Android Framework: Common Android framework

My nuggets: Tan Jiajun

My simple book: Tan Jiajun

My CSDN: Tan Jiajun