ReactiveX is a library for developing asynchronous and event-based programming in observer mode.

It extends the observer pattern to support streaming data and events, and then adds operators for flexible handling of streaming data and events. There are many different types of operators built in, and the declarative writing allows us to focus on business development without having to worry about a lot of the underlying threading, concurrent, non-blocking operations.

concept

What are streaming events?

From the point of view of event dimension, it is the event that happens successively at different time. For example, clicking the screen in sequence can be viewed as a streaming event. Stream of event or event flow

Why use An Observable?

The observer model, which makes it easier to handle the flow of asynchronous events, is more efficient. Write a lot less callback operations

Single items Multi items
Synchronous/synchronous T getData() Iterable getData()
Synchronous/asynchronous Future getData() Observable getData()

As you can see from the figure above, the JDK itself does not provide asynchronous support for multiple pieces of data. For example, if you return something asynchronous, A, B, and C, and you don’t have to do it in order, you’re going to write three futures. If three pieces of data are processed in the same thread, they are synchronized.

Observable provides an ideal way to access asynchronous sequences of multiple items.

Observables can be combined

Java futures are a straightforward way to handle a single asynchronous operation, but if the Future is nested and the asynchronous Future continues within the async, the code becomes complex and difficult to maintain.

If multiple futures are combined for asynchronization, the writing method is difficult to optimize when performing the next operation. Future.get makes it easier to predict the result, but blocks too soon.

The ReactiveX Observables are intended to combine streaming asynchronous data.

Observables are more flexible

The ReactiveX Observables supports not only a single value, but also an infinite data stream, where new data is always present and cannot be processed to the end.

Observables, mentioned above, is an ideal way to handle asynchronous multiple data. The rest of the synchronized set iterators are compared as follows:

Data (event) Iterable (pull) Observable (push)
To get the data T next() onNext(T)
Find the error throws Exception onError(Exception)
To complete the processing ! hasNext() onCompleted()

An Observable is asynchronous push data, as opposed to pull data for synchronous collections.

  • With Iterable, it blocks when no data arrives.
  • An Observable pushes data to consumers whenever it is available, whether it is present or absent, synchronous or asynchronous.

Observables add two new syntax to the classic observer model, making it more powerful and flexible, and keeping it consistent with Iterable, allowing us to write observables as if they were Iterable:

  • Observable calls the Observer when producer data has been consumedonCompletedmethods
  • Observable calls the Observer when the consumer encounters an error or exceptiononErrormethods

Observable vs. Iterable: nothing much changes in the way Iterable is written. Iterable can be seen as a Stream in Java 8.

An Observable’s flexibility also doesn’t care how its data sources are implemented. Whether it’s using thread pools, actors, Event loops, or anything else, an Observable sees it as asynchronous.

With ReactiveX, you can modify the underlying Observable implementation without changing the Observable Consumer code.

use

ReactiveX is more of an idea, and common programming languages have implementations of ReactiveX. Take RxJava, the implementation of Java, for example.

Write a simple Hello World example:

1 Introducing Maven.

< the dependency > < groupId > IO. Reactivex. Rxjava2 < / groupId > < artifactId > rxjava < / artifactId > < version > 2.2.12 < / version > </dependency>Copy the code

2. Write code related to Rxjava

public class RxHello { public static void main(String[] args) { // 1. Create Observable Flowable<String> Observable = Flowable."hello"."world"// 2. Use operator.map (String::toUpperCase); Create an Observer Consumer<String> Consumer = new Consumer<String>() {@override public void Accept (String s) throws Exception { System.out.println("Received data:"+ s); }}; Observable. Subscribe (consumer); // 4. }}Copy the code

The code above looks simple, and it is simple when written, with three main steps: four if you add the operators

ReactiveX provides a number of different operators to create an Observable, which should be implemented in the corresponding programming language.

// The following operators can Create 'Create', 'Defer', 'Empty'/' Never '/' Throw ', 'From', 'Interval', 'Just', 'Range', 'Repeat', 'Start', And 'Timer' // for example, observable.create (); Observable.defer(); Observable.just(); Observable.fromArray()Copy the code

2 carries out data processing of Observable, and has a lot of built-in operators. Conversion, filtering, statistics, error handling and so on

Common Map, FlatMap, Filter, Take, Skip, etc

Create an Observer, or consumer, to receive Observable data

Common classes: FlowableSubscriber, Subscriber, Comsumer, mainly our code logic.

4 establish the relationship between Observable and Observer.

The last

This section only introduces the concept of ReactiveX, introduces the key asynchronous streaming data, and the observer mode section, provides case code, understand the basic writing of ReactiveX.

To feed to the power of ReactiveX, you also need to master its rich Operator Operator capabilities.

Reference:

reactivex.io/intro.html

Reactivex. IO/documentati…

medium.com/@andrestalt…