Discussion on the use and principle of Stream in JDK

The upgrade from JDK 7 to JDK 8 introduced a very practical feature: Stream. It sees heavy use in real-world development, and I expect most readers are already familiar with it. Why Stream is so popular and what problems it solves is what we will discuss here.

  • Author: Tao Wangwang
  • Editor: Higazawa

Why Stream was introduced

Before Stream, if we wanted to iterate over a collection we would use forEach or a for-each loop, and if we needed to make some judgments during the iteration we might need several loops, like this:

List<Integer> list = new ArrayList<>(Arrays.asList(3, 5, 2, 9, 1, 6, 8, 7));
// Get all elements greater than 5 from the original list, sort, iterate
List<Integer> newList = new ArrayList<>();
for (Integer i : list) {
    if(i > 5){
        newList.add(i);
    }
}
Collections.sort(newList);
newList.forEach(System.out::println);

The above is how the code would normally be written without Stream; it traverses the data three times in total. The same logic implemented with Stream looks like this:

List<Integer> list = new ArrayList<>(Arrays.asList(3, 5, 2, 9, 1, 6, 8, 7));

list.stream().filter(integer -> integer>5).sorted().forEach(System.out::println);

Doing the same thing with a Stream plus a lambda expression takes far less code (and, because the operations are fused, fewer traversals of the source as well), and the more operations you chain, the more obvious the advantage becomes. Stream, then, exists mainly to simplify iteration code and make iteration more efficient.
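
To see that the chained operations really are fused into a single pass over the source rather than one loop per operation, here is a minimal sketch of my own (not part of the original example) that uses peek to observe elements as they flow:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SinglePassDemo {
    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(Arrays.asList(3, 5, 2, 9, 1, 6, 8, 7));

        // peek lets us watch elements as they flow through the pipeline.
        // The source is traversed once: each element goes through peek and filter
        // in turn; only elements greater than 5 reach sorted, which buffers them
        // and forwards them in order once the traversal is finished.
        list.stream()
            .peek(i -> System.out.println("flowing through filter: " + i))
            .filter(i -> i > 5)
            .sorted()
            .forEach(i -> System.out.println("result: " + i));
    }
}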

Review the basic use of Stream

Stream provides very powerful functionality, so let’s review its commonly used APIs.

Creating a stream

Stream.of(1, 2, 3, 4);

Arrays.asList(1, 2, 3, 4).stream();

// Can replace an indexed for loop
IntStream.range(1, 5);

// The following two methods, which I have not used much yet, generate streams of infinite length
Stream.generate(Math::random);
Stream.iterate(1, item -> item + 1);
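
Note that generate and iterate produce streams with no natural end, so in practice they are combined with a limiting operation. A small sketch of my own (not from the article):

import java.util.stream.Stream;

public class InfiniteStreamDemo {
    public static void main(String[] args) {
        // Stream.iterate produces 1, 2, 3, ... without end; limit(5) truncates it
        // so the terminal operation can finish.
        Stream.iterate(1, item -> item + 1)
              .limit(5)
              .forEach(System.out::println); // prints 1 2 3 4 5

        // Stream.generate keeps calling the supplier forever; again limit is required.
        Stream.generate(Math::random)
              .limit(3)
              .forEach(System.out::println); // prints three random doubles
    }
}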

Stream operations

Stream provides a rich set of operation types, as shown in the figure below.

Here is a quick explanation of the bolded terms in that table:

  • Intermediate operation: as the name suggests, a link in the middle of the pipeline between the source and the terminal. In terms of results, an intermediate operation turns the upstream stream into a new downstream stream; in the code, it actually creates a new stage (and later a Sink node)

  • Terminal operation: the opposite of an intermediate operation; it is the last link of the stream and produces no new stream. In the code, it is what wires up the whole Sink chain and drives it from the source

  • Stateful: an operation that must see all elements before it can produce output, such as sorted, which has to collect every element before sorting

  • Stateless: an operation that works on each element independently and does not need the other elements, such as filter

  • Short-circuiting: the terminal operation can return as soon as a single element yields the result, such as findAny or anyMatch, which finish once one element satisfies the condition

  • Non-short-circuiting: all elements must be traversed to obtain the result, such as forEach and max (a short sketch contrasting the two follows this list)
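
Here is a small sketch of my own contrasting the two behaviours: anyMatch stops as soon as one element satisfies the predicate, while forEach visits every element.

import java.util.Arrays;
import java.util.List;

public class ShortCircuitDemo {
    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(3, 5, 2, 9, 1, 6, 8, 7);

        // Short-circuiting: anyMatch returns as soon as one element is > 5,
        // so peek only prints 3, 5, 2 and 9 before the pipeline stops.
        boolean found = list.stream()
                            .peek(i -> System.out.println("checked: " + i))
                            .anyMatch(i -> i > 5);
        System.out.println("found = " + found);

        // Non-short-circuiting: forEach visits every element of the list.
        list.stream().forEach(i -> System.out.println("visited: " + i));
    }
}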

Common operations

Here are some common stream operations that I use in daily development; a short combined example follows the list.

  • filter: receives a Predicate that decides whether an element passes; elements for which it returns true continue into the next stream

  • collect: takes a Collector that specifies the container used to receive the stream’s elements; the common ones are the Collectors factory methods such as Collectors.toList()

  • forEach: similar to a for loop, simple and practical, used for traversal

  • sorted: used for sorting; you can pass in a custom Comparator, just as when sorting a collection, or rely on the elements’ natural ordering

  • map: maps each element of the upstream stream to an element of the downstream stream, e.g. userStream.map(User::getName) (pseudocode) maps a stream of Users to a stream of user names, which is very useful

  • findAny: returns any element of the stream; it can be read as asking whether any element is left after the intermediate operations
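
As a quick illustration of how these operations compose, here is a minimal sketch; the User class, its fields and the sample data are hypothetical, invented only for this example.

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class CommonOpsDemo {
    // A hypothetical User class used only for this illustration
    static class User {
        final String name;
        final int age;
        User(String name, int age) { this.name = name; this.age = age; }
        String getName() { return name; }
        int getAge() { return age; }
    }

    public static void main(String[] args) {
        List<User> users = Arrays.asList(
                new User("Alice", 28), new User("Bob", 35), new User("Carol", 22));

        // filter + sorted + map + collect: names of users older than 25, ordered by age
        List<String> names = users.stream()
                .filter(u -> u.getAge() > 25)
                .sorted(Comparator.comparingInt(User::getAge))
                .map(User::getName)
                .collect(Collectors.toList());
        System.out.println(names);             // [Alice, Bob]

        // findAny: is any user younger than 25 left after the filter?
        Optional<User> young = users.stream().filter(u -> u.getAge() < 25).findAny();
        System.out.println(young.isPresent()); // true
    }
}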

Explore the Stream principle

Source-code analysis is always a tedious and complicated process, because highly engineered code such as the JDK sources uses many design patterns and a high degree of interface encapsulation. Let’s take the code above as an example and briefly explore how a stream is implemented.

First look at the class inheritance diagram:

  • BaseStream specifies the basic interface for a stream

  • Stream defines common operations such as map, filter, and flatMap

  • BaseStream, Stream, IntStream, LongStream and DoubleStream together form the foundation of Java’s Stream architecture

  • PipelineHelper is used to work with the ReferencePipeline and AbstractPipeline structures while the Stream executes

  • AbstractPipeline is the core abstract class of the pipeline, used to build and manage it; its concrete implementation classes are the nodes of the pipeline

  • Head, StatelessOp and StatefulOp are inner classes of ReferencePipeline; IntPipeline, LongPipeline and DoublePipeline also each define the same three inner classes

Take ReferencePipeline as an example. As the inheritance diagram shows, the Head, StatelessOp and StatefulOp inner classes extend ReferencePipeline, and ReferencePipeline itself extends AbstractPipeline, so Head, StatelessOp and StatefulOp are all AbstractPipelines in essence. Unlike Netty’s pipeline, here each pipeline object is itself a node of the chain, which makes AbstractPipeline a very important class. Opening it, as shown below, we find several important fields: sourceStage (pointing to the source Stage), previousStage (the upstream Stage) and nextStage (the downstream Stage).

abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {

    // Points to the Head node
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline sourceStage;

    // Upstream Stage
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline previousStage;

    protected final int sourceOrOpFlags;

    // Downstream Stage
    @SuppressWarnings("rawtypes")
    private AbstractPipeline nextStage;

    // Abstract method implemented by subclasses; returns a Sink object
    abstract Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);
    // ...
}

In the source code, each operation is described by a Stage, and the stages form a typical doubly linked list. The Stream pipeline can therefore be pictured as in the figure below.

To verify this, let’s open the source code and step through the example:

list.stream().filter(integer -> integer>5).sorted().forEach(System.out::println);

1. Generate a Head object

The first step is the stream() method, which eventually calls the code below; in essence, calling stream() creates a Head object.

public final class StreamSupport {
    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        // Create the Head
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }
}

abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {

    AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
        // Upstream Stage; the Head has no upstream, so it is null
        this.previousStage = null;
        // The source spliterator, which can be understood as an advanced iterator
        this.sourceSpliterator = source;
        // The Head points to itself
        this.sourceStage = this;
        // Flags of the source / intermediate operation
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        // The following is an optimization:
        // the combined operation flags of the source and of all operations,
        // including the operation represented by this pipeline object;
        // used when preparing the pipeline for evaluation.
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
        // Number of intermediate operations between this pipeline object and the
        // stream source (if sequential); the Head's depth is 0
        this.depth = 0;
        // True if the pipeline is parallel, otherwise sequential; valid only for the source stage
        this.parallel = parallel;
    }
}

2. Generate the operation object corresponding to filter

The second step is the filter method, shown below. It creates a stateless operation object, StatelessOp. The Head (this) is passed in when the object is created, and the sourceStage (the Head) is recorded as well. At this point the pointing relationships in the figure above become clear.

@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    // This is the Head object, which will eventually be recorded as a previousStage by the current StatelessOp object, and will also point the nextStage of the Head object to StatelessOp
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                     // The implementation of filtering
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}


AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    // Point the nextStage pointer to Head to the current object, that is, the StatelessOp object that encapsulates filter
    previousStage.nextStage = this;
	// Point the previousStage of the StatelessOp object to the Head
    this.previousStage = previousStage;
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    // Point the sourceStage of the StatelessOp object to Head
    this.sourceStage = previousStage.sourceStage;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}


The Sink interface is the protocol that connects one stage of the flow with the next. Every concrete subclass of AbstractPipeline implements the opWrapSink method to return a Sink instance.

interface Sink<T> extends Consumer<T> {

    // Begin method: informs downstream to make any preparations
    default void begin(long size) {}

    // End method: tells the downstream that all elements have been sent
    default void end() {}

    // Whether a short-circuit is requested
    default boolean cancellationRequested() {
        return false;
    }
    // ...

    // The default chaining implementation of the Sink interface
    static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
        // The downstream processing sink
        protected final Sink<? super E_OUT> downstream;

        public ChainedReference(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        // By default simply calls the downstream begin method
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        // By default simply calls the downstream end method
        public void end() {
            downstream.end();
        }

        @Override
        // Returns whether a short-circuit was requested
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }
}
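
To make the downstream-forwarding idea concrete, here is a deliberately simplified, hand-written chain of consumers (my own sketch, not JDK code). Each node decides whether to pass an element on to its downstream, which is essentially what the ChainedReference returned by filter’s opWrapSink does:

import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class MiniSinkChainDemo {
    // A heavily simplified stand-in for a filtering Sink: a Consumer that knows its downstream
    static <T> Consumer<T> filterSink(Predicate<T> predicate, Consumer<T> downstream) {
        return t -> {
            // Mirrors accept() in the Sink returned by filter: only forward matching elements
            if (predicate.test(t)) {
                downstream.accept(t);
            }
        };
    }

    public static void main(String[] args) {
        // A terminal "sink" that just prints, wrapped by a filtering node
        Consumer<Integer> chain = filterSink(i -> i > 5, System.out::println);

        List<Integer> list = Arrays.asList(3, 5, 2, 9, 1, 6, 8, 7);
        list.forEach(chain); // prints 9, 6, 8, 7 (filtered but not sorted)
    }
}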

3. Generate the operation object corresponding to sorted

The third step is the sorted() method. Like filter, sorted returns a stage, an instance of the OfRef class, which also implements opWrapSink. Because sorted implements the sorting function, it additionally determines the comparator, and the ordering logic is ultimately carried out by that comparator.

private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {

    OfRef(AbstractPipeline<?, T, ?> upstream) {
        // As with filter, establish the upstream/downstream stage relationship
        super(upstream, StreamShape.REFERENCE,
              StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
        this.isNaturalSort = true;
        @SuppressWarnings("unchecked")
        // Default comparator
        Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
        this.comparator = comp;
    }
    // ...
    @Override
    public Sink<T> opWrapSink(int flags, Sink<T> sink) {
        Objects.requireNonNull(sink);
        if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
            return sink;
        else if (StreamOpFlag.SIZED.isKnown(flags))
            return new SizedRefSortingSink<>(sink, comparator);
        else
            // Return a RefSortingSink instance
            return new RefSortingSink<>(sink, comparator);
    }

    // ...
}

private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
    private ArrayList<T> list;

    RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
        super(sink, comparator);
    }

    @Override
    // Initializes the list that will receive the upstream elements for sorting
    public void begin(long size) {
        if (size >= Nodes.MAX_ARRAY_SIZE)
            throw new IllegalArgumentException(Nodes.BAD_SIZE);
        list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
    }

    @Override
    // The actual sort happens here
    public void end() {
        list.sort(comparator);
        downstream.begin(list.size());
        // If the terminal operation is not short-circuiting, forward every element
        if (!cancellationWasRequested) {
            list.forEach(downstream::accept);
        }
        // If the terminal operation is short-circuiting, check before each element
        else {
            for (T t : list) {
                if (downstream.cancellationRequested()) break;
                downstream.accept(t);
            }
        }
        // Notify downstream that we are done
        downstream.end();
        list = null;
    }

    @Override
    // Adds elements to the initialized list
    public void accept(T t) {
        list.add(t);
    }
}

4. Turn the gears

So far we have built a doubly linked list of operations, each stored in its own StatelessOp or StatefulOp object, but none of the Sink objects we wrapped have actually been called. This is why a Stream fires no intermediate operation until a terminal operation runs. Everything is ready; all that is missing is the east wind, and that east wind is the terminal operation. A small sketch below illustrates this laziness from the outside, after which we open the forEach method used in this example.
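
The sketch is my own illustration, not from the JDK source: building the pipeline prints nothing; only the terminal operation triggers the traversal.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class LazyPipelineDemo {
    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(3, 5, 2, 9, 1, 6, 8, 7);

        // Only the stages are linked here; no element flows, so peek prints nothing yet.
        Stream<Integer> pipeline = list.stream()
                .peek(i -> System.out.println("flowing: " + i))
                .filter(i -> i > 5)
                .sorted();
        System.out.println("pipeline built, nothing has been printed so far");

        // The terminal operation wires up the Sink chain and starts the traversal.
        pipeline.forEach(i -> System.out.println("result: " + i));
    }
}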

@Override
public void forEach(Consumer<? super P_OUT> action) {
    evaluate(ForEachOps.makeRef(action, false));
}

public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
                                              boolean ordered) {
    Objects.requireNonNull(action);
    return new ForEachOp.OfRef<>(action, ordered);
}

OfRef(Consumer<? super T> consumer, boolean ordered) {
    super(ordered);// ForEachOp is the parent class, and the argument says whether the traversal is in order
    this.consumer = consumer;
}

ForEachOps is the factory class where TerminalOp instances are created. TerminalOp is the top-level interface for terminal operations; its implementation classes include ForEachOp, ReduceOp, FindOp and MatchOp. The ForEachOps.makeRef method returns a ForEachOp object that encapsulates the forEach action.

// ForEachOp implements TerminalSink, which is essentially a Sink
static abstract class ForEachOp<T>
        implements TerminalOp<T, Void>, TerminalSink<T, Void> {
    private final boolean ordered;

    protected ForEachOp(boolean ordered) {
        this.ordered = ordered;
    }

    // TerminalOp

    @Override
    public int getOpFlags() {
        return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
    }

    @Override
    public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                       Spliterator<S> spliterator) {
        return helper.wrapAndCopyInto(this, spliterator).get();
    }
    // ...
}

With the ForEachOp object in hand, we return to the evaluate method, which turns the last gear. Since we are using a sequential stream here, execution eventually reaches the terminalOp.evaluateSequential method.

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
        ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
        // Serial stream, where this is the StatefulOp object returned by the sorted method
        : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                   Spliterator<S> spliterator) {
    // Call the wrapAndCopyInto method of the StatefulOp object. AbstractPipeline is a subclass of PipelineHelper
    // this is forEachOp
    return helper.wrapAndCopyInto(this, spliterator).get();
}


@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    // Encapsulate the ForEachOp object as sink, ForEachOp
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}
// Encapsulate the sink operation to find the next node of Head, where is the StatelessOp node generated by filter
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        // Wrap Stage as Sink
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}

Finally, let’s look at the copyInto method. forEach is a non-short-circuiting operation.

@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);
    // Non-short-circuiting path
    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        // Call the begin method of the previously wrapped Sink chain as a pre-notification
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        // The spliterator iterates over the data source and pushes each element into the Sink chain
        spliterator.forEachRemaining(wrappedSink);
        // Call the end method of the previously wrapped Sink chain as a post-notification
        wrappedSink.end();
    }
    // The short-circuiting path calls the wrapped Sink's cancellationRequested method during
    // traversal; once it returns true, no further elements are processed
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

@SuppressWarnings("unchecked")
@Override
// The Sink interface extends the Consumer interface
public void forEachRemaining(Consumer<? super T> action) {
    Object[] a; int i, hi; // hoist accesses and checks from loop
    if (action == null)
        throw new NullPointerException();
    if ((a = array).length >= (hi = fence) &&
        // index starts at 0, hi is the array length
        (i = index) >= 0 && i < (index = hi)) {
        // Loop over the elements, calling accept on each one
        do { action.accept((T) a[i]); } while (++i < hi);
    }
}

At this point, the truth is clear.

Think of a Stream in object-oriented terms

By analogy with real life, we can compare a Stream to flowing water. An intermediate operation is like a reservoir along the way, a Sink is the operator of each reservoir, and the terminal operation is the commander who gives the order. The operations in this example can be pictured as follows:

Conclusion

Stream gives developers convenient iteration operations; similar facilities exist in JavaScript and other languages, and mastering Stream will noticeably improve development efficiency. This article has given only the simplest analysis of how Stream is used and how it works; more advanced features are not covered, for example parallel streams, which involve multithreading. Since the author has not yet worked with parallel streams, they are not discussed here; interested readers can study them on their own.

Stream is also a good example of object-oriented design, which makes code more visual and alive; experienced object-oriented developers can turn code into works of art. Stream’s chain-of-responsibility style is a classic, much like the filter chain in the Spring framework or the handler chain in Netty. I hope we all keep polishing the code we write and keep improving.

This article may contain errors or omissions; corrections are welcome.