Have we learned how to use the Stream API

  • Lambda expressions and function interfaces.

  • A new addition to the Java Collections framework (Collections)

  • Basic usage of the Stream API

  • Stream protocol operation usage

It’s great to use, but the simplicity of the approach seems to hide endless secrets. How can such a powerful API be implemented? For example, how does Pipeline execute and does each method call result in an iteration? How does automatic parallelism work? How many threads? In this section, we will learn the principle of Stream pipelining, which is the key to Stream implementation.

First, let’s review how a container executes a Lambda expression, using the arrayList.foreach () method as an example:

We see that the main logic of the arrayList.foreach () method is an _for_ loop in which the action.accept() callback is constantly called to iterate over elements. This is nothing new at all; callback methods are widely used in Listeners for Java GUIs. Lambda expressions act as a callback method, which is easy to understand.

Lambda expressions are heavily used as callback methods in the Stream API, but that’s not the point. Understanding Stream we are more concerned with two other issues: pipelining and automatic parallelism. Using Stream, it might be easy to write code like this:

The code above finds the maximum length of a string beginning with the letter _A_. A straightforward way to do this is to perform an iteration for each function call, which is functional but certainly inefficient. The implementation of the class library avoids multiple iterations by pipelining.the basic idea is to perform as many user-specified operations as possible in one iteration. We have summarized all the operations on Stream for the sake of this explanation.

All operations on a Stream fall into two categories: intermediate operations, which are just flags, and end operations, which trigger the actual calculation. Stateless and Stateful intermediate operations were also divided into Stateless and Stateful operations. Stateless intermediate operations meant that the processing of elements was not affected by previous elements, while Stateful intermediate operations had to wait until all elements were processed to know the final result, for example, sorting was a Stateful operation. The sorting result cannot be determined until all elements are read; The end operation can be divided into short-circuit operation and non-short-circuit operation. Short-circuit operation means that the result can be returned without processing all elements, such as _ finding the first element that meets the condition _. The reason for this fine division is that each case is treated differently at the bottom. To better understand the intermediate and terminal operations of a flow, look at their execution in the following two pieces of code.

The output is: A1B1C1 A2B2C2 A3B3C3 The intermediate operation is lazy, that is, the intermediate operation does nothing with the data until the final operation is encountered. And the final operation, are more enthusiastic. They’re going to go back and do all the intermediate things. That is, when the last forEach operation is executed, it will go back to its previous intermediate operation, which will go back to its previous intermediate operation… Until the very first step. The first time forEach is executed, the peek operation will be backtracked, and the peek operation will be backtracked to the previous limit operation, and the limit operation will be backtracked to the previous peek operation. A1B1C1 on the second forEach run, the peek operation will be backtracked, and the PEEK operation will be backtracked to the previous limit operation, and the limit operation will be backtracked to the previous peek operation. There is no operation at the top, and the execution will start from the top down, with the output: A2B2C2

. When forEach is executed for the fourth time, the peek operation will be retraced, and the peek operation will be retraced to the previous limit operation. When it reaches the limit, it will find that limit(3) has completed, which is equivalent to the break operation in the loop, and will terminate the loop.

Let’s look at the second code:

The output is: A1 A2 A3 A4 A5 A6 A7B7C7 A8B8C8 A9B9C9 When forEach is executed for the first time, peek will backtrack, and then PEEK will backtrack to skip operation of the previous step. Skip will backtrack to peek operation of the previous step, and there is no operation on the top layer. Start from the top down, and skip, because skip, means skip, don’t do anything else, which is equivalent to the continue in the loop, end the loop. Output: A1

When forEach is executed for the second time, peek operation will be backtracked, and then PEEK will backtrack to skip operation of the previous step. Skip will backtrack to PEEK operation of the previous step, and skip will backtrack to peek operation of the previous step. There is no operation on the top layer, and it starts to execute from top to bottom. Output: A2

.

When forEach is executed for the seventh time, peek will be backtracked, and then PEEK will backtrack to skip operation of the previous step. Skip will backtrack to PEEK operation of the previous step, and skip will backtrack to peek operation of the previous step. There is no operation on the top, and it starts to execute from top to bottom. It’s done the skip(6) job. Skip will skip this time and continue with the following operations. Output: A7B7C7

. Until the loop ends.

A straightforward way to do it

Still considering the longest string program described above, a straightforward pipelined implementation would take an iteration for each function call and put the intermediate results of the processing into some kind of data structure (such as arrays, containers, etc.). Specifically, the filter() method is immediately executed, all strings starting with _A_ are selected and placed in a list list1, which is then passed to mapToInt() and executed immediately, the resulting result is placed in List2, and the final result is traversed through List2 to find the largest number. The execution process of the program is as follows:

This is pretty straightforward to implement, but there are two obvious downsides:

  1. The number of iterations is large. The number of iterations is equal to the number of function calls.

  2. Intermediate results are frequently produced. Each function call produces an intermediate result with unacceptable storage overhead.

These drawbacks make it unacceptably inefficient. Without using the Stream API we all know how the above code would be done in an iteration, which looks something like this:

In this way we not only reduce the number of iterations but also avoid storing intermediate results, which is clearly pipelining because we put three operations in one iteration. We can always implement the equivalent of the Stream API in this way if we know the user’s intentions in advance, but the problem is that the designers of the Stream library don’t know what the user’s intentions are. How to implement pipelining without assuming user behavior is a problem that library designers should consider.

Stream pipeline solution

We can generally imagine that there should be some way to record each step of the user’s actions, and when the user calls the end of the action, the previously recorded actions are superimposed together and all executed in one iteration. Along this line, several issues need to be addressed:

  1. How to record user operations?

  2. How do operations stack up?

  3. How to perform the operation after stacking? (The follow-up analysis focuses on the problem points)

  4. Where are the results (if any) of the execution? (The follow-up analysis focuses on the problem points)

How to record operations

Note that the term “operation” is used to refer to “Stream intermediate operations”. Many Stream operations require a callback function (Lambda expression), so a complete operation is a triplet of < source, operation, callback function >. A Stream uses the concept of stages to describe a complete operation and is represented by an instantiated PipelineHelper, which connects the sequential stages together to form the pipeline. A graphical representation of Stream related classes and interfaces.

IntPipeline, LongPipeline and DoublePipeline are not drawn in the diagram. These three classes are specifically customized for the three basic types (not the packaging type), and are in parallel with ReferencePipeline. Head is used to represent the first Stage, the Stage generated by calling methods such as collection.stream (), which obviously contains no operations. StatelessOp and StatefulOp represent stateless and stateful stages, respectively, corresponding to stateless and stateful intermediate operations.

The schematic diagram of Stream pipeline organization is as follows:

The Head (stage0) is obtained by the collection.stream () method, followed by a series of intermediate operations that generate new streams. These Stream objects are organized together in the form of a bidirectional linked list to form the entire pipeline. Since each Stage records the operations of the previous Stage and the current Stage as well as the callback function, all operations on the data source can be established based on this structure. This is how Stream records operate.

How operations stack

The above is only to solve the problem of operation record. In order to make the assembly line play its due role, we need a solution to stack all the operations together. You might think it would be easy to just start with the pipeline head and perform each step (including the callback function) in sequence. That sounds like it might work, but you ignore the previous stages and have no idea what the latter stages actually do or what the callback is. In other words, only the current Stage itself knows how to perform the actions it contains. This requires some kind of protocol to coordinate calls between adjacent stages.

This protocol is completed by the Sink interface. The methods of the Sink interface are shown in the following table:

With the above protocol, it is convenient to call adjacent stages. Each Stage encapsulates its operation into a Sink. The former Stage only needs to call the accept() method of the latter Stage, without knowing how it is processed internally. Of course, Sink’s begin() and end() methods must also be implemented for stateful operations. For example, stream.sorted () is a stateful intermediate operation. The sink.begin () method might create a container for results, accept() adds elements to the container, and end() sorts the container. For short circuit operation, Sink. CancellationRequested () also must be implemented, such as Stream. FindFirst () is a short circuit operation, as long as find an element, cancellationRequested () should return _true_, So that the caller can end the search as soon as possible. Sink’s four interface methods often work together to accomplish computational tasks. In fact, the essence of the Stream API’s internal implementation is how to override Sink’s four interface methods.

With Sink wrapping the action, the problem of calling between stages is solved, During execution, only the Sink.{begin(), accept(), cancellationRequested(), end()} methods corresponding to each Stage are called from the head of the pipeline to the data source successively. A possible sink.accept () method flow would look like this:

Several other methods of the Sink interface are also implemented according to this [process -> forward] model. Let’s take a concrete example to see how the intermediate operation of Stream wraps its own operation as Sink and how Sink forwards the processing result to the next Sink. First look at the stream.map () method:

The above code seems complicated, but in fact the logic is very simple, which is to wrap the callback function _mapper_ into a Sink. Since stream.map () is a stateless intermediate operation, the map() method returns a StatelessOp inner class object (a new Stream), and calling this new Stream’s opWripSink() method gets a Sink wrapped around the current callback function.

Let’s do a slightly more complicated example. The stream.sorted () method sorts the elements in the Stream, which is obviously a stateful intermediate operation, because you can’t get the final order until you’ve read all the elements. Regardless of the template code, how does the sorted() method encapsulate operations as sinks? Sorted () one possible encapsulation of the Sink code is as follows:

The above code perfectly illustrates how Sink’s four interface methods work together:

  1. First, begin() method tells Sink the number of elements involved in sorting, which is convenient to determine the size of the intermediate result container.

  2. Elements are then added to the intermediate result through the accept() method, which the caller calls until all elements are iterated;

  3. Finally, the end() method tells Sink that all elements are traversed and the sorting step is started. After the sorting is completed, the result will be transmitted to the downstream Sink.

  4. If the downstream Sink is short-circuited, the downstream cancellationRequested() is continuously asked if it can end the processing when the result is passed to the downstream.

conclusion

This article describes the organization of the Stream pipeline in detail, and will continue to describe the Stream pipeline execution process in detail. Learning this article will help you understand the principles and write the right Stream code, while dispelled any concerns you may have about the efficiency of the Stream API. As you can see, the Stream API is so cleverly implemented that even if we wrote the equivalent code manually using external iterations, it wouldn’t necessarily be more efficient.