This article describes RxJava's connectable operators. Before we get to them, let's sort Observables into types. What, Observables come in different kinds? Have you only ever used one kind?

The types of observables

  • Cold Observables are the ones we normally use. What are their features?

    1. A Cold Observable waits for an Observer to subscribe before it sends any data.
    2. A Cold Observable gives each Observer its own independent stream, even when the data is identical. An Observer that subscribes after the Observable has already sent a batch of events still gets all of the data replayed from the start. (Similar to a sticky broadcast in Android.)
  • How does a Hot Observable differ from a Cold Observable?

    1. A Hot Observable of the connectable kind needs the connect operator before it starts sending events, which makes it more flexible: you control manually when emission begins. If connect is never called, nothing is ever sent, no matter how many Observers have subscribed. Conversely, once connected, a Hot Observable sends data even when there are no Observers, whereas a Cold Observable waits for a subscription.
    2. A Hot Observable shares its events among all Observers, so an Observer that subscribes after many events have been sent does not receive the earlier ones. (Similar to a normal broadcast in Android.)
  • In this article a Hot Observable is also called a connectable Observable, and a Cold Observable a normal Observable.
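The difference in delivery can be sketched without RxJava at all. The following is a toy model in plain Java, not RxJava's implementation; the class names `ColdSource` and `HotSource` are made up for illustration. It only shows what a late subscriber sees in each case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of cold vs. hot delivery; hypothetical names, not RxJava code. */
final class ColdVsHotSketch {

    /** Cold: every subscriber triggers its own full run of the data. */
    static final class ColdSource {
        private final List<Integer> data;

        ColdSource(List<Integer> data) { this.data = data; }

        void subscribe(Consumer<Integer> observer) {
            for (Integer item : data) {
                observer.accept(item); // replayed from the start for each subscriber
            }
        }
    }

    /** Hot: an event reaches only the observers subscribed when it is emitted. */
    static final class HotSource {
        private final List<Consumer<Integer>> observers = new ArrayList<>();

        void subscribe(Consumer<Integer> observer) { observers.add(observer); }

        void emit(int item) {
            for (Consumer<Integer> observer : observers) {
                observer.accept(item);
            }
        }
    }

    /** What a second, later subscriber to a cold source receives. */
    static List<Integer> lateColdSubscriber() {
        ColdSource cold = new ColdSource(Arrays.asList(1, 2, 3));
        cold.subscribe(item -> { });        // an earlier subscriber
        List<Integer> received = new ArrayList<>();
        cold.subscribe(received::add);      // the late subscriber
        return received;
    }

    /** What a subscriber receives if it joins a hot source after two events. */
    static List<Integer> lateHotSubscriber() {
        HotSource hot = new HotSource();
        hot.emit(1);                        // nobody is listening yet
        hot.emit(2);
        List<Integer> received = new ArrayList<>();
        hot.subscribe(received::add);
        hot.emit(3);
        return received;
    }

    public static void main(String[] args) {
        System.out.println("cold, late subscriber: " + lateColdSubscriber());
        System.out.println("hot, late subscriber:  " + lateHotSubscriber());
    }
}
```

Running `main` prints `[1, 2, 3]` for the cold case and `[3]` for the hot case: the cold source replays everything, while the hot source has already moved on.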

Publish and connect operators

publish and connect are commonly used together, so they are introduced together.

publish: converts a normal Observable into a connectable Observable. connect: makes a connectable Observable start sending data to its subscribers. Calling connect returns a Disposable, and disposing it terminates the Observable. (If dispose is never called, the Observable does not terminate, even after every Observer has unsubscribed.)

ConnectableObservable<Long> observable = Observable.interval(1, TimeUnit.SECONDS)
        // Convert to a connectable Observable
        .publish();

observable.subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long value) throws Exception {
        System.out.println("value -> " + value);
    }
});

// Start sending
final Disposable disposable = observable.connect();

observable
        // Delay the subscription by 2 seconds
        .delaySubscription(2, TimeUnit.SECONDS)
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long value) throws Exception {
                System.out.println("value -> " + value);
            }
        });

// Terminate manually after 6 seconds
new Timer().schedule(new TimerTask() {
    @Override
    public void run() {
        disposable.dispose();
    }
}, 6000);
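To make the connect contract concrete, here is a minimal plain-Java sketch. It is a simplified model under stated assumptions, not RxJava's implementation: the class `ConnectSketch` is hypothetical, a `Runnable` stands in for the `Disposable`, and values pushed before `connect()` are simply dropped (in RxJava the upstream is not even subscribed to until `connect()` is called).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of publish()/connect(); hypothetical names, not RxJava code. */
final class ConnectSketch {
    private final List<Consumer<Integer>> observers = new ArrayList<>();
    private boolean connected = false;

    /** Subscribing alone does not start anything. */
    void subscribe(Consumer<Integer> observer) { observers.add(observer); }

    /** Starts delivery and returns a handle that stops it again. */
    Runnable connect() {
        connected = true;
        return () -> connected = false; // plays the role of the Disposable
    }

    /** Upstream pushes a value; it is delivered only while connected. */
    void emit(int item) {
        if (!connected) {
            return;
        }
        for (Consumer<Integer> observer : observers) {
            observer.accept(item);
        }
    }

    static List<Integer> demo() {
        ConnectSketch source = new ConnectSketch();
        List<Integer> received = new ArrayList<>();
        source.subscribe(received::add);
        source.emit(1);                          // dropped: connect() not called yet
        Runnable disposable = source.connect();
        source.emit(2);
        source.emit(3);
        disposable.run();                        // manual termination
        source.emit(4);                          // dropped: already disposed
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

`demo()` returns `[2, 3]`: the observer was subscribed the whole time, but only the values between `connect()` and the dispose call reach it.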

RefCount operator

Makes a connectable Observable behave like a normal Observable.

In plain English: sometimes we want the convenience of a Cold Observable, where data starts flowing as soon as an Observer subscribes, while keeping the sharing behavior of a Hot Observable. The refCount operator does this. refCount is the "reverse" of publish: after the conversion, the Observable starts sending data automatically when it is first subscribed to. Unlike a Cold Observable, however, it still behaves like a Hot Observable: an Observer that subscribes later does not receive the earlier data. In addition, the Observable terminates automatically once all Observers have unsubscribed. (With a plain connectable Observable you terminate manually by calling dispose() on the Disposable returned by connect; with refCount both starting and terminating are automatic.)

Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS)
        // Convert to a connectable Observable
        .publish()
        // "Reverse" conversion back to a normal Observable
        .refCount();

observable.subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long value) throws Exception {
        System.out.println("value -> " + value);
    }
});

observable
        // Delay the subscription by 2 seconds
        .delaySubscription(2, TimeUnit.SECONDS)
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long value) throws Exception {
                System.out.println("value -> " + value);
            }
        });
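The reference counting itself can be sketched in a few lines of plain Java. This is only an illustration of the idea, with a hypothetical class `RefCountSketch` and a `Runnable` standing in for each subscriber's `Disposable`; RxJava's real operator also handles threading and reconnection, which are left out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of refCount(): connect on first subscriber, dispose after the last. */
final class RefCountSketch {
    private final List<Consumer<Integer>> observers = new ArrayList<>();
    final List<String> log = new ArrayList<>();

    /** Subscribes and returns a handle that unsubscribes this observer. */
    Runnable subscribe(Consumer<Integer> observer) {
        observers.add(observer);
        if (observers.size() == 1) {
            log.add("connect");              // first subscriber starts the source
        }
        return () -> {
            if (observers.remove(observer) && observers.isEmpty()) {
                log.add("dispose");          // last subscriber gone: stop the source
            }
        };
    }

    /** Shared delivery: one event goes to everyone currently subscribed. */
    void emit(int item) {
        for (Consumer<Integer> observer : new ArrayList<>(observers)) {
            observer.accept(item);
        }
    }

    static List<String> demo() {
        RefCountSketch shared = new RefCountSketch();
        Runnable first = shared.subscribe(v -> shared.log.add("first got " + v));
        shared.emit(1);
        Runnable second = shared.subscribe(v -> shared.log.add("second got " + v));
        shared.emit(2);
        first.run();                         // one subscriber left, source keeps running
        second.run();                        // none left, source is disposed
        return shared.log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The log reads `connect`, `first got 1`, `first got 2`, `second got 2`, `dispose`: the second subscriber misses the first event, and the source stops only when both have unsubscribed.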

The share operator

Because publish followed by refCount is so common, RxJava provides the share operator, which simply calls the two internally. In practice, share is what you will usually use.

public final Observable<T> share() {
    return publish().refCount();
}

Hot Observable usage scenarios

Use a Hot Observable when the emitted events need to be shared and the Observable should terminate only once all Observers have unsubscribed.

Take WebSocket chat as an example. As in WeChat or QQ, there are text messages, picture messages, voice messages, system messages and so on.

Each message type can be handled by its own subscriber (a subscriber processes only the kind of message it is bound to), and the subscribers may subscribe at different times, some only under specific conditions. Suppose the source were a Cold Observable: the text-message Observer subscribes first, the data source sends a lot of messages, and at some later point the system-message Observer subscribes. All the earlier text messages would then be replayed to the system-message Observer, which has no use for them, wasting both the events and resources.

Also, the chat screen may be only one of several subscribers. Messages are not received by that screen alone: the outer message list also has to show incoming messages. (Have you ever seen WeChat stop receiving messages because you left the chat screen?) Even if the app is killed and every Activity is destroyed, the Service keeps running and keeps listening for messages. (Does WeChat stop receiving messages once it is killed?)

Finally, message handling should be layered. Received messages must be inserted into the database so that the chat history can be viewed later, and that insert must not share an Observer with the UI: the UI Observer unsubscribes when the screen exits, and the insert would then stop running. So a separate Observer has to insert records into the database, and with a Cold Observable the old messages would be replayed to it and the records would get messed up.

Observable.create(new WebSocketOnSubscribe(url))
        .retry()
        // Because of the share operator, doOnDispose is called only when all observers have unsubscribed
        .doOnDispose(new Action() {
            @Override
            public void run() throws Exception {
                // Close the connection
                closeNow(url);
                Logger.d(TAG, "All observers are unregistered, connections are closed...");
            }
        })
        // The share operator lets multiple observers use a single data source
        .share()
        // Deliver all callbacks on the main thread, so external callers can subscribe directly and do their UI work in the callbacks
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
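The per-type handling described above amounts to several observers filtering one shared stream. The sketch below models that in plain Java; every name in it (`MessageRoutingSketch`, `Type`, `Message`) is hypothetical and chosen for illustration, and with RxJava the same effect would normally come from filtering the shared Observable per subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of one shared message stream with per-type observers. */
final class MessageRoutingSketch {
    enum Type { TEXT, IMAGE, SYSTEM }

    static final class Message {
        final Type type;
        final String body;

        Message(Type type, String body) {
            this.type = type;
            this.body = body;
        }
    }

    private final List<Consumer<Message>> observers = new ArrayList<>();

    /** Registers an observer that only sees messages of the wanted type. */
    void subscribe(Type wanted, Consumer<Message> handler) {
        observers.add(message -> {
            if (message.type == wanted) {
                handler.accept(message);
            }
        });
    }

    /** Shared (hot) delivery: no replay for observers that join later. */
    void emit(Message message) {
        for (Consumer<Message> observer : observers) {
            observer.accept(message);
        }
    }

    static List<String> demo() {
        MessageRoutingSketch hub = new MessageRoutingSketch();
        List<String> log = new ArrayList<>();
        hub.subscribe(Type.TEXT, m -> log.add("text: " + m.body));
        hub.emit(new Message(Type.TEXT, "hello"));
        // The system-message observer joins late and gets no replay of "hello".
        hub.subscribe(Type.SYSTEM, m -> log.add("system: " + m.body));
        hub.emit(new Message(Type.TEXT, "how are you"));
        hub.emit(new Message(Type.SYSTEM, "user joined"));
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The log contains `text: hello`, `text: how are you` and `system: user joined`. The late system-message observer is never handed the old text messages, which is the behavior the chat scenario needs.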

The replay operator

As noted for Hot Observables, a subscriber that subscribes after the Observable has already sent several items does not receive the earlier ones. Is there a way to receive them anyway? This is where the replay operator comes in handy. Like publish, replay returns a connectable Observable, so it still needs connect or refCount before anything is emitted.

  • replay(), with no arguments: a late subscriber receives every event sent before it subscribed, the same as with a Cold Observable.

  • replay(int bufferSize): the number of events to cache. For example, with a value of 1, a late subscriber receives the one most recent event sent before it subscribed.

Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS)
        // Cache the most recent event for late subscribers; replay(1) returns a connectable Observable
        .replay(1)
        // Connect on the first subscription, so the cache fills before the late subscriber arrives
        .refCount();

observable.subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long value) throws Exception {
        System.out.println("value -> " + value);
    }
});

observable
        // Delay the subscription by 2 seconds
        .delaySubscription(2, TimeUnit.SECONDS)
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long value) throws Exception {
                System.out.println("value -> " + value);
            }
        });
  • replay(long time, TimeUnit unit): caches the events sent within the given time window. With 3 seconds, for example, a new Observer receives the events sent during the 3 seconds before it subscribed.
Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS)
        // Cache the events of the last 3 seconds; replay(...) returns a connectable Observable
        .replay(3, TimeUnit.SECONDS)
        // Connect on the first subscription, so the cache fills before the late subscriber arrives
        .refCount();

observable.subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long value) throws Exception {
        System.out.println("value -> " + value);
    }
});

observable
        // Delay the subscription by 2 seconds
        .delaySubscription(2, TimeUnit.SECONDS)
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long value) throws Exception {
                System.out.println("value -> " + value);
            }
        });
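The three replay variants differ only in how much of the past is kept. The plain-Java sketch below models that with a bounded buffer. It is a simplification under stated assumptions, not RxJava's implementation: the class `ReplaySketch` is hypothetical, timestamps are passed in explicitly instead of being read from a clock, and the age limit is applied when a subscriber arrives rather than by evicting old entries.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of a replay buffer bounded by size and by age. */
final class ReplaySketch {
    private static final class Timed {
        final long timeMs;
        final int value;

        Timed(long timeMs, int value) {
            this.timeMs = timeMs;
            this.value = value;
        }
    }

    private final int maxSize;     // corresponds to replay(int bufferSize)
    private final long maxAgeMs;   // corresponds to replay(long time, TimeUnit unit)
    private final Deque<Timed> buffer = new ArrayDeque<>();
    private final List<Consumer<Integer>> observers = new ArrayList<>();

    ReplaySketch(int maxSize, long maxAgeMs) {
        this.maxSize = maxSize;
        this.maxAgeMs = maxAgeMs;
    }

    /** Caches the value, trims the buffer to maxSize, then delivers it live. */
    void emit(long nowMs, int value) {
        buffer.addLast(new Timed(nowMs, value));
        while (buffer.size() > maxSize) {
            buffer.removeFirst();
        }
        for (Consumer<Integer> observer : observers) {
            observer.accept(value);
        }
    }

    /** Replays cached values that are still young enough, then goes live. */
    void subscribe(long nowMs, Consumer<Integer> observer) {
        for (Timed cached : buffer) {
            if (nowMs - cached.timeMs <= maxAgeMs) {
                observer.accept(cached.value);
            }
        }
        observers.add(observer);
    }

    /** Emits 0, 1, 2 at 1s intervals, subscribes at 3.5s, then emits 3. */
    static List<Integer> lateSubscriber(int maxSize, long maxAgeMs) {
        ReplaySketch source = new ReplaySketch(maxSize, maxAgeMs);
        source.emit(1000, 0);
        source.emit(2000, 1);
        source.emit(3000, 2);
        List<Integer> received = new ArrayList<>();
        source.subscribe(3500, received::add);
        source.emit(4000, 3);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriber(Integer.MAX_VALUE, Long.MAX_VALUE)); // unbounded
        System.out.println(lateSubscriber(1, Long.MAX_VALUE));                 // size 1
        System.out.println(lateSubscriber(Integer.MAX_VALUE, 2000));           // last 2 seconds
    }
}
```

With no limits the late subscriber receives `[0, 1, 2, 3]`; with a buffer size of 1 it receives `[2, 3]`; with a 2 second window it receives `[1, 2, 3]`, because the value emitted at 1000 ms is already 2.5 seconds old when the subscriber arrives at 3500 ms.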