preface

  • RxjavaBecause of itsChain call based on event flow, simple logic & easy to useThe characteristics of the deepAndroidDeveloper welcome.

If you are not familiar with RxJava, check out Android: a clear and easy-to-understand introduction to RxJava

  • This article mainly explains isRxJavaIn theBack pressure control strategyI hope you like it.
  1. This series of articles is based onRxjava 2.0
  2. In the coming days, I will continue to publish a series of articles on Rxjava 2.0 in Android, including principles, operators, application scenarios, back pressure, etc. If you are interested, please continue to follow Carson_Ho’s Android development notes!!

All code demos in this article are stored at Carson_Ho’s Github address


directory


1. The introduction

1.1 background

  • There are two subscription relationships between observer and observed: synchronous and asynchronous. Details are as follows:

  • For asynchronous subscriptions, there is a mismatch between the speed at which the observed sends events and the speed at which the observer receives them
  1. Send & Receive event rate = number of send & receive events per unit of time
  2. In most cases, the rate at which the observer sends events is greater than the rate at which the observer receives events

1.2 the problem

  • The speed at which the observed sends events is too fast, and the observer cannot receive all the events, so the observer cannot respond/process all the sent events in time, resulting in overflow of cache and event loss & OOM
  1. For example, click button event: click button 10 times continuously too fast, will only cause the effect of clicking 2 times;
  2. Explanation: Because the speed of the click is too fast, so the button does not respond

Here’s another example:

  • The sending event speed of the observed = 10ms/piece
  • The receiving event rate of the observer = 5s/unit

That is, there is a serious mismatch between send and receive events

Observable.create(new ObservableOnSubscribe<Integer>() { // 1. @override Public void subscribe(ObservableEmitter<Integer> Emitter) throws Exception {for (int i = 0; ; i++) {
                    Log.d(TAG, "Event sent"+ i ); Thread.sleep(10); // Emitters are emitter. OnNext (I); }}}). SubscribeOn (Schedulers. IO ()) / / set the observed in IO thread. ObserveOn (AndroidSchedulers. MainThread ()) / / set the observer in the main thread .subscribe(new Observer<Integer>() { // 2. Override public void onSubscribe(Disposable d) {Log."Start using subscribe connections."); } @override public void onNext(Integer value) {try {thread.sleep (5000); Log.d(TAG,"Event received"+ value  );
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "Respond to an Error event");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Respond to the Complete Event"); }});Copy the code
  • Results As the speed of the event sent by the observer is greater than the speed of the event received by the observer, the velocity mismatch occurs, resulting inOOM

1.3 Solution

Adopt back pressure strategy.

Next, I will introduce the back pressure strategy.


2. Introduction to back pressure strategy

2.1 define

A strategy for controlling the rate of event flow

2.2 role

In an asynchronous subscription, control the speed at which events are sent and received

Note: The scope of back pressure = asynchronous subscription, i.e. observed & observer in different threads

2.3 Problems solved

It solves the problem that the observer cannot respond/process all the events sent by the observer in time because the speed of the event sent by the observer does not match the speed of the event received by the observer (generally the former is faster than the latter)

2.4 Application Scenarios

  • A scenario where the rate at which the observer sends events does not match the rate at which the observer receives events
  • The specific scenario depends on the event type, for example, network request. In the specific scenario, there are many network requests to be executed, but the execution speed is not that fast. In this case, the back pressure policy is required to control the network request.

3. Principle of back pressure strategy

  • So, RxJava implements the back pressure strategy (BackpressureHow does that work?
  • The solutions & ideas are as follows:

  • The schematic diagram is as follows

  • withRxJava1.0The old implementation of the observed inObservablecontrast

  • Ok, so in the figure aboveRxJava 2.0In the observer model,FlowableWhat is it? It is actuallyRxJava 2.0It is also the carrier of the implementation of back pressure strategy
  • Please continue to see the introduction of the next section: concrete implementation of the back pressure strategy –Flowable

4. Concrete implementation of back pressure strategy: Flowable

In RxJava2.0, Flowable is used to implement the back pressure strategy

Correctly, it should be a “non-blocking back pressure” strategy

4.1 introduce Flowable

  • Definition:RxJava2.0, the observed (ObservableA new implementation of

Meanwhile, the old Observable implementation of RxJava1.0: Observable remains

  • Purpose: To achieve non-blocking back pressure strategy

4.2 characteristics of Flowable

  • FlowableThe characteristics of are as follows

  • I’m going to post another oneRxJava2.0RxJava1.0Comparison of the observer model

In fact, RxJava2.0 also has a reserved observerble-observer Observer model, just for the sake of comparison

4.3 relationship to the observed old implementation Observable in RxJava1.0

  • The details are shown below.

  • So why adopt the new implementationFlowableAchieve back pressure without using the oldObservable?
  • Main reason: old implementationObservableCan’t solve the back pressure problem very well.

4.4 Basic use of Flowable

  • FlowableThe basic use is very similarObservable
  • Specific as follows
/** * Step 1: Upstream = Flowable. Create (new FlowableOnSubscribe<Integer>() {@override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }, BackpressureStrategy.ERROR); // We need to pass in the BackpressureStrategy parameter, as described below. Creating an observer = Subscriber */ Subscriber<Integer> Downstream = new Subscriber<Integer>() {@override public void OnSubscribe (Subscription s) {// Subscribe(Disposable, Subscriber) = Subscription // Subscription is used as a Disposable argument, Disposable.dispose() disconnects the connection, same as Subscription.cancel() disconnects the connection: Subscription adds void Request (long n) log.d (TAG,"onSubscribe"); s.request(Long.MAX_VALUE); More on request() below} @override public void onNext(Integerinteger) {
                Log.d(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete"); }}; /** * Step 3: create a subscription */ upstream. Subscribe (downstream);Copy the code

  • More elegant chain calls
// Step 1: Create (new FlowableOnSubscribe<Integer>() {@override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { Log.d(TAG,"Send Event 1");
                emitter.onNext(1);
                Log.d(TAG, "Send Event 2");
                emitter.onNext(2);
                Log.d(TAG, "Send Event 3");
                emitter.onNext(3);
                Log.d(TAG, "Send completed"); emitter.onComplete(); }}, BackpressureStrategy. ERROR). The subscribe (new Subscriber < Integer > () {/ / step 2: Override public void onSubscribe(Subscription s) {log.d (TAG, TAG)"onSubscribe");
                        s.request(3);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "Event received" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete"); }});Copy the code
  • At this point,FlowableThe basic use of the explanation
  • Further uses are explained in conjunction with the implementation of the back pressure strategy

5. Use of back pressure strategy

  • In this section, I will combine the principle of the back pressure policy & the use of Flowable, to introduce how to use Flowable to implement the back pressure policy function in RxJava 2.0
  • FlowablewithObservableThe difference in function is mainlyMore function of back pressure
  • Next, I will follow section 3 to explain the implementation principle & solution of back pressure strategy (as shown in the figure below)FlowableThe use of the back pressure strategy function

Note:

  1. Since we mentioned in Section 2 that scenarios using back pressure = asynchronous subscriptions, we will focus on asynchronous subscription scenarios, where the observed and the observer are working on different threads
  2. However, since flow rate mismatches can also occur in synchronous subscription scenarios, I’ll talk a little bit about synchronization after the asynchronous case for comparison purposes

5.1 Control the rate at which the observer receives events

5.1.1 Asynchronous Subscription
  • Introduction to the

  • Specific schematic diagram

  • The specific use
Create (new FlowableOnSubscribe<Integer>() {@override public void Subscribe (FlowableEmitter<Integer> Emitter) throws Exception {subscribe(Emitter<Integer> emitter)"Send Event 1");
                emitter.onNext(1);
                Log.d(TAG, "Send Event 2");
                emitter.onNext(2);
                Log.d(TAG, "Send Event 3");
                emitter.onNext(3);
                Log.d(TAG, "Send Event 4");
                emitter.onNext(4);
                Log.d(TAG, "Send completed"); emitter.onComplete(); }}, BackpressureStrategy. ERROR). SubscribeOn (Schedulers. IO ()) / / set the observed in IO thread. ObserveOn (AndroidSchedulers. MainThread ()) / / Subscribe(new Subscriber<Integer>() {@override public void onSubscribe(Subscription s) {// Subscriber = Subscription // The Subscription argument has the effect of the Disposable argument, Disposable.dispose() disconnects the connection, and the Subscription.cancel() disconnects the connection. Subscription adds void Request (long n) s.count (3); / / function: MAX_VALUE (long.max_value); // The official default is to use long. MAX_VALUE (long.max_value); // The default is to use s.count (long.max_value). } @Override public void onNext(Integerinteger) {
                        Log.d(TAG, "Event received" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete"); }});Copy the code
  • rendering

  • There are two conclusions that you should be aware of

Figure below = Schematic of overflow errors reported when cache is full (128 events)

  • Code demo 1: The observer does not receive events, the observer continues to send events & store in the cache; Remove as needed
/** * Step 1: Set variable */ private static Final String TAG ="Rxjava"; private Button btn; // This button is used to call Subscription. Request (long n) private Subscription mSubscription; Request (long n) */ BTN = (Button) findViewById(R.id.btn); btn.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View view) { mSubscription.request(2); }}); /** * Step 3: Create (new FlowableOnSubscribe<Integer>() {@override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { Log.d(TAG,"Send Event 1");
                emitter.onNext(1);
                Log.d(TAG, "Send Event 2");
                emitter.onNext(2);
                Log.d(TAG, "Send Event 3");
                emitter.onNext(3);
                Log.d(TAG, "Send Event 4");
                emitter.onNext(4);
                Log.d(TAG, "Send completed"); emitter.onComplete(); }}, BackpressureStrategy. ERROR). SubscribeOn (Schedulers. IO ()) / / set the observed in IO thread. ObserveOn (AndroidSchedulers. MainThread ()) / / Subscribe(new Subscriber<Integer>() {@override public void onSubscribe(Subscription s) {log.d (TAG,"onSubscribe"); mSubscription = s; // Save the Subscription object and wait for the observer to receive the event when the button is clicked (call Request (2))} @override public void onNext(Integer)integer) {
                        Log.d(TAG, "Event received" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete"); }});Copy the code

  • Code Demo 2: The observed continues to send events beyond the cache size without the observer receiving the event (128)
Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) Throws Exception {// Send a total of 129 events, that is, the size of the cache is exceededfor(int i = 0; i< 129; i++) { Log.d(TAG,"Event sent"+ i); emitter.onNext(i); } emitter.onComplete(); }}, BackpressureStrategy. ERROR). SubscribeOn (Schedulers. IO ()) / / set the observed in IO thread. ObserveOn (AndroidSchedulers. MainThread ()) / / Subscribe(new Subscriber<Integer>() {@override public void onSubscribe(Subscription s) {log.d (TAG,"onSubscribe"); } @override public void onNext(Integerinteger) {
                        Log.d(TAG, "Event received" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete"); }});Copy the code

5.1.2 Synchronizing Subscription Information

The difference between synchronous and asynchronous subscriptions is that:

  • In synchronous subscriptions, the observed and the observer work on the same thread
  • There is no cache in a synchronous subscription

  • After sending an event, the observed must wait for the observer to receive it before sending another event
/** * Step 1: Upstream = Flowable. Create (new FlowableOnSubscribe<Integer>() {@override public Void subscribe(FlowableEmitter<Integer> Emitter) throws Exception {// Send 3 events log. d(TAG,"Event 1 was sent");
                emitter.onNext(1);
                Log.d(TAG, "Event 2 was sent");
                emitter.onNext(2);
                Log.d(TAG, "Event 3 was sent"); emitter.onNext(3); emitter.onComplete(); } }, BackpressureStrategy.ERROR); /** * Step 2: Creating an observer = Subscriber */ Subscriber<Integer> Downstream = new Subscriber<Integer>() {@override public void onSubscribe(Subscription s) { Log.d(TAG,"onSubscribe"); s.request(3); } @override public void onNext(Integer)integer) {
                Log.d(TAG, "Event received" + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete"); }}; /** * Step 3: create a subscription */ upstream. Subscribe (downstream);Copy the code

Therefore, it is not true that the speed at which the observer sends events is greater than the speed at which the observer receives them. However, there is a problem of the number of events sent by the observer > the number of events received by the observer.

  • For example, the observer can only accept 3 events, but the observed sent 4 events, so there is a mismatch
/** * Step 1: Upstream = Flowable. Create (new FlowableOnSubscribe<Integer>() {@override public Void subscribe(FlowableEmitter<Integer> emitter) throws Exception {// The number of events sent by the observer = 4 log. d(TAG,"Event 1 was sent");
                emitter.onNext(1);
                Log.d(TAG, "Event 2 was sent");
                emitter.onNext(2);
                Log.d(TAG, "Event 3 was sent");
                emitter.onNext(3);
                Log.d(TAG, "Event 4 was sent"); emitter.onNext(4); emitter.onComplete(); } }, BackpressureStrategy.ERROR); /** * Step 2: Creating an observer = Subscriber */ Subscriber<Integer> Downstream = new Subscriber<Integer>() {@override public void onSubscribe(Subscription s) { Log.d(TAG,"onSubscribe"); s.request(3); } @override public void onNext(Integerinteger) {
                Log.d(TAG, "Event received" + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete"); }}; /** * Step 3: create a subscription */ upstream. Subscribe (downstream);Copy the code

Therefore, for a synchronous subscription without the concept of cache, simply controlling the number of events received by the observer (reactive pull) is actually “unrequited love”. Although the observer controls the number of events received, there is still a problem if the observed needs to send four events.

This will be addressed in 5.2 controlling the rate at which the observed sends events.

  • There is a special case to note

  • Code demo
/** * Step 1: Upstream = Flowable. Create (new FlowableOnSubscribe<Integer>() {@override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { Log.d(TAG,"Event 1 was sent");
                emitter.onNext(1);
                Log.d(TAG, "Event 2 was sent");
                emitter.onNext(2);
                Log.d(TAG, "Event 3 was sent"); emitter.onNext(3); emitter.onComplete(); } }, BackpressureStrategy.ERROR); /** * Step 2: Creating an observer = Subscriber */ Subscriber<Integer> Downstream = new Subscriber<Integer>() {@override public void onSubscribe(Subscription s) { Log.d(TAG,"onSubscribe"); Request (long n) // s.equest (long.max_value); } @Override public void onNext(Integerinteger) {
                Log.d(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete"); }}; /** * Step 3: create a subscription */ upstream. Subscribe (downstream);Copy the code

After the first event that is sent by the observer, an abnormal MissingBackpressureException & observer didn’t receive any events


5.2 Control the speed at which the observed sends the event

  • Introduction to the

  • FlowableEmitterOf the classrequested()introduce
Public Emitter<T> extends Emitter<T> public Emitter<T> onNext(),onComplete() & onError long requested(); // Request (a) returns the value of a in request (a) in the current thread // post only key code}Copy the code
  • The return value of requested () in each thread = the a value of Request (a) in that thread

  • Schematics corresponding to synchronous & asynchronous subscription situations

To help you understand the use of Requested () in this policy, this section explains synchronous subscriptions first and then asynchronous subscriptions


5.2.1 Synchronizing Subscription Information

  • Principle that

In synchronous subscribe to the case, the observed by FlowableEmitter. Requested () to obtain the observer’s own ability to receive events, thus according to the information control event sending speed, so as to achieve the observer inversion of control is the effect of the observer

  • Using the following example = the observed sends only 10 events based on the observer’s ability to receive events (10 events)
Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) Throws Exception {// Call Emitters. Requested () to get the number of events the current observer needs to receive. Long n = Emitters. Log.d(TAG,"Observer receptive events"+ n); // Send events based on the value emitters. Requested (), the number of events the current observer needs to receivefor (int i = 0; i < n; i++) {
                    Log.d(TAG, "Event sent"+ i); emitter.onNext(i); }}}, BackpressureStrategy.ERROR) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG,"onSubscribe"); // Set the observer to accept 10 events at a time. } @Override public void onNext(Integerinteger) {
                        Log.d(TAG, "Event received" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete"); }});Copy the code

  • Pay special attention to use in synchronous subscription situationsFlowableEmitter.requested()The following features need to be paid attention to:

Case 1: Superposition

  • That is, observers can continuously request to receive events, which are superimposed and sent together
Subscription. Request (A1); Subscription. Request (A2); FlowableEmitter. Requested return value = a1 + a2 ()Copy the code
  • Code demo
Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) Throws Exception {// Call Emitters. Requested () to get the number of events that the current observer needs to receive log.d (TAG,"Observer receptive events"+ emitter.requested()); }}, BackpressureStrategy.ERROR) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG,"onSubscribe"); s.request(10); // Set the observer to accept 10 events at a time. } @override public void onNext(Integer)integer) {
                        Log.d(TAG, "Event received" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete"); }});Copy the code

Case 2: Real-time updates

  • That is, after each event is sent, Emitters. Requested () will update events that are acceptable to the observer in real time
  1. That is, the observer will receive 10 events at first, and after sending one, it will be updated to 9 in real time
  2. Only calculateNextThe event,complete & errorEvents don’t count.
Subscription. Request (10); / / FlowableEmitter. Requested () returns a value = 10. FlowableEmitter onNext (1); / / sent one event / / FlowableEmitter requested () returns a value = 9Copy the code
  • Code demo
Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { // 1. Call Emitters. Requested () to get the number of events the current observer needs to receive log.d (TAG,"Number of events received by observer ="+ emitter.requested()); Emitter. Requested () will be updated to 9 log.d (TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG."Event 1 was sent");
                emitter.onNext(1);
                Log.d(TAG, "Number of events to send after sending event 1 =" + emitter.requested());

                Log.d(TAG, "Event 2 was sent");
                emitter.onNext(2);
                Log.d(TAG, "Number of events to send after event 2 =" + emitter.requested());

                Log.d(TAG, "Event 3 was sent");
                emitter.onNext(3);
                Log.d(TAG, "Number of events to send after event 3 ="+ emitter.requested()); emitter.onComplete(); }}, BackpressureStrategy.ERROR) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG,"onSubscribe"); s.request(10); Override public void onNext(Integer) {Override public void onNext(Integer)integer) {
                        Log.d(TAG, "Event received" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete"); }});Copy the code

Case 3: Abnormal

  • whenFlowableEmitter.requested()When it drops to 0, the observer is no longer able to receive events
  • If the observed continues to send the event, it will be thrownMissingBackpressureExceptionabnormal

If the number of events received by the observer = 1, an exception will be thrown when the observed sends the second event

Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { // 1. Call Emitters. Requested () to get the number of events the current observer needs to receive log.d (TAG,"Number of events received by observer ="+ emitter.requested()); Emitter. Requested () will be updated to 9 log.d (TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG."Event 1 was sent");
                emitter.onNext(1);
                Log.d(TAG, "Number of events to send after sending event 1 =" + emitter.requested());

                Log.d(TAG, "Event 2 was sent");
                emitter.onNext(2);
                Log.d(TAG, "Number of events to send after event 2 ="+ emitter.requested()); emitter.onComplete(); }}, BackpressureStrategy.ERROR) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG,"onSubscribe"); s.request(1); Override public void onNext(Integer) {Override public void onNext(Integer)integer) {
                        Log.d(TAG, "Event received" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete"); }});Copy the code

additional

  • If the observer does not set the number of events that can be received, there is no callThe Subscription request ()
  • Then the number of events received by the observed default observer = 0, i.eFlowableEmitter.requested()Return value = 0

5.2.2 Asynchronous Subscription

  • Principle that

Can be seen from the above, as the two are in different threads, so cannot be observed by FlowableEmitter. Requested () know the viewer receive events ability, is not according to the viewer receives events observed the ability to control the speed of sending events. See the following example for details

Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) Throws Exception {// Call Emitters. Requested () to get the number of events that the current observer needs to receive log.d (TAG,"Number of events received by observer ="+ emitter.requested()); }}, BackpressureStrategy. ERROR). SubscribeOn (Schedulers. IO ()) / / set the observed in IO thread. ObserveOn (AndroidSchedulers. MainThread ()) / / Subscribe(new Subscriber<Integer>() {@override public void onSubscribe(Subscription s) {log.d (TAG,"onSubscribe"); s.request(150); // This setting only affects requested in the observer thread, But will not affect the observed in FlowableEmitter. Requested () returns a value / / because FlowableEmitter requested () returns a value depends on RxJava internal call request (n), Request (128);} @override public void onNext(Integerinteger) {
                        Log.d(TAG, "Event received" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete"); }});Copy the code

In an asynchronous subscription relationship, the principle of reverse control is: throughRxJavaInternal fixed calls are made to the observer threadrequest(n)Thus reverse control of the observed sending event speed

So when should I call request(n) &n in the observed thread? Keep reading.

  • The specific use

RxJava internal call request(n) (n = 128, 96, 0)

Request (128), Request (96), and Request (0) are called

  • Code demo

I’ll use an example to illustrate the logic of this principle

/ / be observer: need to send in total 500 events, but the premise that really start sending events = FlowableEmitter. Requested () return value indicates a 0 / / observer: Flowable. Create (new FlowableOnSubscribe<Integer>() {@override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { Log.d(TAG,"Number of events received by observer ="+ emitter.requested()); boolean flag; // Set the flag bit control // The observed needs to send 500 events altogetherfor (int i = 0; i < 500; i++) {
                        flag = false; // Requested () == 0 will not be sentwhile (emitter.requested() == 0) {
                            if(! flag) { Log.d(TAG,"No more");
                                flag = true; } // Requested () ≠ 0 to send log.d (TAG,"Event sent" + i + ", the number of events the observer can receive ="+ emitter.requested()); emitter.onNext(i); }}}, BackpressureStrategy. ERROR). SubscribeOn (Schedulers. IO ()) / / set the observed in IO thread. ObserveOn (AndroidSchedulers. MainThread ()) / / Subscribe(new Subscriber<Integer>() {@override public void onSubscribe(Subscription s) {log.d (TAG,"onSubscribe"); mSubscription = s; // Initial state = do not receive events; Receive events by clicking the button} @override public void onNext(Integerinteger) {
                        Log.d(TAG, "Event received" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete"); }}); BTN = (Button) findViewById(R.id.btn); btn.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View view) { mSubscription.request(48); // Click the button to receive 48 events}});Copy the code

The whole process & test results are shown below


5.3 BackpressureStrategy: BackpressureStrategy

5.3.1 Back pressure Mode

In the use of Flowable, you are asked to pass in the back pressure mode parameter

  • Object oriented: For cache
  • Effect: When the cache size is full and the observed continues to send the next event, how to handle the policy

Cache size full, overflow = send event speed > receive event speed result = send & receive event mismatch result

5.3.2 Back pressure Mode Type

I’ll explain each of these patterns one by one.

Pattern 1: BackpressureStrategy. ERROR

  • Problem: Send event speed > receive event speed, that is, the flow rate does not match

Specific performance: occurs when the cache size is full (default cache size = 128) and the observed continues to send the next event

  • Solution: Throw an exception directlyMissingBackpressureException
Create (new FlowableOnSubscribe<Integer>() {@override public void Subscribe (FlowableEmitter<Integer> Emitter) throws Exception {// Sending 129 eventsfor(int i = 0; i< 129; i++) { Log.d(TAG,"Event sent"+ i); emitter.onNext(i); } emitter.onComplete(); }}, BackpressureStrategy. ERROR). / / set the back pressure mode = BackpressureStrategy ERROR. SubscribeOn (Schedulers. IO ()) / / set the observed in IO thread ObserveOn (AndroidSchedulers. MainThread ()) / / set the observer in the main thread. The subscribe (new Subscriber < Integer > () {@ Override public void onSubscribe(Subscription s) { Log.d(TAG,"onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "Event received" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete"); }});Copy the code

Pattern 2: BackpressureStrategy. MISSING

  • Problem: Send event speed > receive event speed, that is, the flow rate does not match

Specific performance is: when the cache size is full (default cache size = 128), the observed still continue to send the next event

  • Friendly tip: The cache is full
Create (new FlowableOnSubscribe<Integer>() {@override public void Subscribe (FlowableEmitter<Integer> Emitter) throws Exception {// Sending 129 eventsfor(int i = 0; i< 129; i++) { Log.d(TAG,"Event sent"+ i); emitter.onNext(i); } emitter.onComplete(); }}, BackpressureStrategy. MISSING). / / set the back pressure mode = BackpressureStrategy MISSING. SubscribeOn (Schedulers. IO ()) / / set the observed in IO thread ObserveOn (AndroidSchedulers. MainThread ()) / / set the observer in the main thread. The subscribe (new Subscriber < Integer > () {@ Override public void onSubscribe(Subscription s) { Log.d(TAG,"onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "Event received" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete"); }});Copy the code

Pattern 3: BackpressureStrategy. BUFFER

  • Problem: Send event speed > receive event speed, that is, the flow rate does not match

Specific performance is: when the cache size is full (default cache size = 128), the observed still continue to send the next event

  • Solution: Set the cache size to infinity
  1. That is, the observed can send event observers indefinitely, but it is actually stored in the cache
  2. However, pay attention to the memory condition to prevent OOM
Create (new FlowableOnSubscribe<Integer>() {@override public void Subscribe (FlowableEmitter<Integer> Emitter) throws Exception {// Sending 129 eventsfor(int i = 1; i< 130; i++) { Log.d(TAG,"Event sent"+ i); emitter.onNext(i); } emitter.onComplete(); }}, BackpressureStrategy. BUFFER). / / set the back pressure mode = BackpressureStrategy BUFFER. SubscribeOn (Schedulers. IO ()) / / set the observed in IO thread ObserveOn (AndroidSchedulers. MainThread ()) / / set the observer in the main thread. The subscribe (new Subscriber < Integer > () {@ Override public void onSubscribe(Subscription s) { Log.d(TAG,"onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "Event received" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete"); }});Copy the code

The number of events that exceed the original cache size (128) can now be received

Pattern 4: BackpressureStrategy. DROP

  • Problem: Send event speed > receive event speed, that is, the flow rate does not match

Specific performance is: when the cache size is full (default cache size = 128), the observed still continue to send the next event

  • Handling method: Discard events that exceed the cache size (128)

If 150 events are sent, only events 1-128 are saved, and events 129-150 are discarded

Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) Throws Exception {// Sends 150 eventsfor(int i = 0; i< 150; i++) { Log.d(TAG,"Event sent"+ i); emitter.onNext(i); } emitter.onComplete(); }}, BackpressureStrategy. DROP) / / set the back pressure mode = BackpressureStrategy. DROP the subscribeOn (Schedulers. IO ()) / / set the observed in IO thread ObserveOn (AndroidSchedulers. MainThread ()) / / set the observer in the main thread. The subscribe (new Subscriber < Integer > () {@ Override public void onSubscribe(Subscription s) { Log.d(TAG,"onSubscribe"); mSubscription = s; } @override public void onNext(Integerinteger) {
                        Log.d(TAG, "Event received" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete"); }}); btn = (Button) findViewById(R.id.btn); btn.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View view) { mSubscription.request(128); // receive 128 events at a time}});Copy the code

The observed sent 150 events at once, and 128 when the button was clicked to receive; The event cannot be accepted when you click accept again, indicating that events that exceed the size of the cache are discarded.

Pattern 5: BackpressureStrategy. LATEST

  • Problem: Send event speed > receive event speed, that is, the flow rate does not match

Specific performance is: when the cache size is full (default cache size = 128), the observed still continue to send the next event

  • Handling method: Only the latest (last) events are saved, and events exceeding the buffer size (128) are discarded

That is, if 150 events are sent, 129 events (1-128th + 150th) will be stored in the cache.


        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for(int i = 0; i< 150; i++) { Log.d(TAG,"Event sent"+ i); emitter.onNext(i); } emitter.onComplete(); }}, BackpressureStrategy. LATEST) / / / / set back pressure mode = BackpressureStrategy. LATEST. SubscribeOn (Schedulers. IO ()) / / set the observed in IO thread ObserveOn (AndroidSchedulers. MainThread ()) / / set the observer in the main thread. The subscribe (new Subscriber < Integer > () {@ Override public void onSubscribe(Subscription s) { Log.d(TAG,"onSubscribe"); mSubscription = s; } @override public void onNext(Integerinteger) {
                        Log.d(TAG, "Event received" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete"); }}); btn = (Button) findViewById(R.id.btn); btn.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View view) { mSubscription.request(128); // receive 128 events at a time}});Copy the code
  • The observed sent 150 events at once, and 128 when the button was clicked to receive;
  • When I click accept again, I receive 1 event (150th event), indicating that only the last event (150th event) is retained for events that exceed the cache size


5.3.3 Special attention

When using the back pressure strategy mode, there are 1 situations that need to be noted:

A. Background FLowable can be created by itself (as in the example above) or automatically by other means, such as the interval operator

An introduction to the interval operator

  1. Function: every 1 period of time to produce a number (Long type), starting from 0, 1 increment, until infinity
  2. The default runs on a new thread
  3. Different from the timer operator: The timer operator terminates sending

B. conflict

  • For manually creating FLowable yourself, you can select the back pressure policy (described above) by passing in the back pressure mode parameter

  • However, it is not possible to manually pass in the back pressure mode parameters to automatically create FLowable. How to select the back pressure mode in the case of flow mismatch?

// Interval automatically creates the observed Flowable // every 1ms increments the current number (starting from 0) by 1, Flowable. Interval (1, Timeunit.milliseconds).observeon (schedulers.newthread ()) // The observer also works on a new thread. Subscribe (new Subscriber<Long>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG,"onSubscribe"); mSubscription = s; s.request(Long.MAX_VALUE); } @override public void onNext(Long aLong) {log.d (TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG, TAG)"onNext: "+ aLong); try { Thread.sleep(1000); / / every time delay of 1 second to receive events / / for sending events = delay 1 ms, receive events = delay 1 s, the speed of sending & receiving speed mismatch problem / / cache area will soon be filled with 128 events, thus thrown MissingBackpressureException abnormalities, Catch (InterruptedException e) {e.printStackTrace(); } } @Override public void onError(Throwable t) { Log.w(TAG,"onError: ", t);
                    }
                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete"); }});Copy the code

C. Solution RxJava 2.0 internally provides a way to encapsulate the back pressure policy pattern

  • onBackpressureBuffer()
  • onBackpressureDrop()
  • onBackpressureLatest()

The default using BackpressureStrategy. The ERROR model

Specific use is as follows:

Flowable.interval(1, timeunit.milliseconds).onBackpressureBuffer() ObserveOn (schedulers.newthread ()). Subscribe (new Subscriber<Long>() {@override public void onSubscribe(Subscription s) { Log.d(TAG,"onSubscribe");
                        mSubscription = s;
                        s.request(Long.MAX_VALUE); 
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.d(TAG, "onNext: " + aLong);
                        try {
                            Thread.sleep(1000);
                            
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }
                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete"); }});Copy the code

Thus, the speed mismatch between sending events and receiving events is solved.

The function of the other methods is similar to that of the back pressure mode parameters, which are not described here.

Summary of back pressure strategy mode

  • At this point, theRxJava 2.0The back pressure mode is finally explained
  • All code demos are stored at Carson_Ho’s Github address

6. Summary

  • This article mainly on Rxjava back pressure mode knowledge to explain

  • In the coming days, I will continue to publish a series of articles on Rxjava 2.0 in Android, including principles, operators, application scenarios, back pressure, etc. If you are interested, please continue to follow Carson_Ho’s Android development notes!!


Thumb up, please! Because your encouragement is the biggest power that I write!


Welcome to attentionCarson_HoRare earth nuggets!

Share the dry things about Android development from time to time, the pursuit of short, flat, fast, but there is no lack of depth.