Responsive programming is widely used in front end development and Android development, but its non-blocking asynchronous programming model and message flow processing mode are also increasingly used in back end. In addition to the extensive use of responsive programming in Netflix’s OSS, Ali recently suggested that Dubbo 3.0 will embrace responsive programming in its entirety.

I have also provided a responsive programming scheme for some project requirements, which better solves the problems of parallel programming and asynchronous programming. But with a deeper understanding of reactive programming, I’ve come up with some of my own practices.

Responsive programming is not a silver bullet

Responsive programming is not a silver bullet. Indeed, in software, Brooks’s “no silver bullet” may be forever true. When we choose to use reactive programming, it is important to know its application scenarios, including:

  • Handles events initiated by users or other systems, such as mouse clicks, keyboard keystrokes, or iot devices that emit signals all the time
  • Process high latency I/O data, such as disks or networks, and ensure that these I/O operations are asynchronous
  • Business processes are streaming and require highly responsive, non-blocking operations

In addition, we can certainly use some responsive programming frameworks such as Rx to simplify the implementation of concurrent programming and data flow operations. RxJava, for example, provides very complete factory methods for converting non-responsive iterables, arrays, and futures and callables that are associated with responsive programming into Observables or flowables.

Understand the nature of Source

Akka Stream defines a Stream data Source as a Source, and RxJava defines an Observable or Flowable. Each of these reactive programming frameworks provides rich operators for Source. In addition to the combined flow operations, the most basic operations are filter, Map, flatMap, and reduce.

At a glance, these operations are functional programming interfaces, and from FP’s point of view, we can even think of Source as a MonAD. From a Java programming perspective, it’s easy to think of Source as a data structure equivalent to a collection. What’s more, responsive programming really grew out of the Observer and Iterator patterns, where an Observable or Flowable is a push model and an Iterator is a pull model.

However, this is the essential difference, that is, Source is a Source of continuously emitting events (data, error, complete) with the characteristics of a time series, while Iterable is a static data structure, the data stored in the data structure already exists when it is operated on.

Design the granularity of the Source properly

When demonstrating the API of Observable or Flowable, we often use Fluent Interface to continuously call its operator to form a whole flow processing process. This is not always justified. When a Source is concatenated by multiple operators, it makes the Source more difficult to reuse.

For example, when A web page is loaded, the default call to the back-end service is initiated and the required user information is returned. If it is modeled as flow A, its transformation is as follows:

uri ----> user ----> | -->

Copy the code

At the same time, a mouse click event will also initiate a call to the back-end service and return the required user information by randomly generating a URL. If modeled as flow B, its transformation is as follows:

click ----> uri ----> user ----> | -->

Copy the code

Obviously, the two flows overlap in stream processing from URI to user. If we create streams A and B that do not contain urI-to-user conversions, we can merge A and B through merge operations such as merge, and then reuse the URi-to-user conversions together. We also don’t have to worry about the cost of creating fine-grained flows because the creation of these flows is lazy, and while the flow is created, the operation of the flow is not executed immediately.

Separate the logic of the operation

Either reactive framework provides rich operators for streams (sources). Most of these operators support lambda expressions. Such an implementation is fine when dealing with simple business logic; However, once the logic becomes very complex, lambda expressions are not powerful enough. As a matter of programming practice, lambda expressions should be inherently small in granularity. At this point, you should separate the logic into separate classes and methods.

For example, we call the remote service according to the configuration information of the device to obtain the device information, and then extract the information to obtain the indicators required by the business, convert the indicators, and finally write the converted data into the database. Combined with the transformational nature of functions, we can split these operations into consecutive operations:

deviceConfig --> deviceInfo --> List<extractedInfo> --> transformedInfo --> write

Copy the code

If the logic of these transformations is very complex, the logic can be encapsulated into four classes DeviceFetcher, DeviceExtractor, DeviceTransformer, and DeviceWriter, respectively. The code can then be written as follows:

Flowable.fromIterable(deviceConfigs)

.parallel()

.runOn(Schedulers.computation())

.map(DeviceFetcher::fetch)

.flatMap(DeviceExtractor::extract)

.map(DeviceTransformer::transform)

.sequential()

.blockingSubscribe(

info -> DeviceWriter.write(info),

err -> log(err),

() -> log("done.")

);

Copy the code

This practice advocates separating the operation of a stream from the business of each operation to ensure the simplicity and purity of the flow operation and the reuse and extensibility of the operation business.

The design of the API

If we are going to design an API for responsive programming, we should ensure that every method is as non-blocking as possible. To do this, make sure that each method returns a type of Source or Publisher. For example, you can return Observable

or Flowable

for streams that return multiple data. If you are sure to return only one data, you can return Single

; If in doubt, return Maybe

. If the API method is simply a command that does not return a result, and you need to ensure that the method is non-blocking, consider returning Completable

.




In a sense, returning a Future

, CompletableFuture

, or CompletableStage

can also be considered reactive. These three types are purer because they are provided by the JDK itself. The only inconvenience is that these interfaces don’t provide as rich an operator as an Observable, but both Observables and Flowable provide a fromFuture() method to convert them, so this design is desirable.


Flow topology of Akka Stream

The abstractions of Akka Stream flow processing are modeled as diagrams. This design idea makes flow processing more intuitive and becomes a “building blocks” game. Unfortunately, Java’s DSL is so weak that if you compare Scala to Java, you’ll see that the GraphDSL’s representation of the Graph construction is completely different.

For example, here’s how Graph is constructed in the Java version of the official documentation:

RunnableGraph.fromGraph(GraphDSL.create(builder -> {

    final Outlet<Integer> A = builder.add(Source.single(0)).out();

    final UniformFanOutShape<Integer, Integer> B = builder.add(Broadcast.create(2));

    final UniformFanInShape<Integer, Integer> C = builder.add(Merge.create(2));

    final FlowShape<Integer, Integer> D = builder.add(Flow.of(Integer.class).map(i -> i + 1));

    final UniformFanOutShape<Integer, Integer> E = builder.add(Balance.create(2));

    final UniformFanInShape<Integer, Integer> F = builder.add(Merge.create(2));

    final Inlet<Integer> G = builder.add(Sink.<Integer>foreach(System.out::println)).in();



    builder.from(F).toFanIn(C); //feedback loop

    builder.from(A).viaFanOut(B).viaFanIn(C).toFanIn(F);

    builder.from(B).via(D).viaFanOut(E).toFanIn(F);

    builder.from(E).toInlet(G);



    return ClosedShape.getInstance();

})).run(mat);

Copy the code

Here’s how the Scala version constructs the same Graph in the official documentation:

RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>

val A: Outlet[Int] = builder.add(Source.single(0)).out

val B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2))

val C: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2))

val D: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 1))

val E: UniformFanOutShape[Int, Int] = builder.add(Balance[Int](2))

val F: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2))

val G: Inlet[Any]



C <~ F

A ~> B ~> C ~> F

B ~> D ~> E ~> F

E ~> G

ClosedShape

})

Copy the code

We also saw that in the GraphDSL we could improve the expressiveness of the code to some extent if we could pre-create the “material” objects that make up the Graph and put the build work together.

We can think of the Graph of Akka Stream as a “mold” for Stream processing. As for the basic shapes made up of Inlet and Outlet ports, Is the “base material” that designs these molds.

The mold is static, the base materials and composite materials are reusable units, and then the combination of reusable business units (packaged in the form of functions, classes, or interfaces), the mold has business processing capabilities. If the topology is too complex, we can also use basic Shape combinations to form coarse-grained Partial ShaPs. These Partial shapes are not closed and can be understood as coarse-grained Source, Sink, and Flow, which makes mold assembly much easier. **

The relationship between materials, business units, and molds can be graphically represented by the following figure:

Once the mold for the stream is built, turn on the data stream faucet and let the data flow into the Graph, and the stream can run automatically. As long as the Source does not issue a complete or error signal, it will continue to run. Akka Stream calls Graph’s runner The Materializer because of this metaphor.

Using Akka Stream for reactive Stream processing, I suggest thinking along these lines.