Preface

Java 8's Stream API makes code simpler and easier to understand. This article takes an in-depth look at how Stream works and then discusses Stream's performance.


A Stream over a Java 8 collection is essentially an advanced version of Iterator: combined with lambda expressions, it can perform all sorts of convenient and efficient aggregate operations (bulk data operations) on the collection.

Stream's aggregate operations are similar to SQL clauses: sorted, filter, map, and so on, letting us express SQL-like aggregation efficiently at the application layer. As for execution, a Stream can process data serially, and it can also process large amounts of data in parallel, improving data-processing efficiency.
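For example, a pipeline roughly equivalent to the SQL query SELECT name FROM names WHERE LENGTH(name) <= 4 ORDER BY name can be sketched as follows (a minimal illustration; the data and the 4-character cutoff are arbitrary):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SqlLikeAggregation {
    public static void main(String[] args) {
        List<String> names = Arrays.asList("kotlin", "java", "go", "rust");

        List<String> result = names.stream()
                .filter(name -> name.length() <= 4) // WHERE LENGTH(name) <= 4
                .sorted()                           // ORDER BY name
                .collect(Collectors.toList());      // materialize the result

        System.out.println(result); // [go, java, rust]
    }
}

Switching the same pipeline to parallel execution only requires replacing stream() with parallelStream().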

Operation classification

Operations in Stream are officially classified into two categories:

  • Intermediate operations: these only record the operation, i.e. they return a new stream without performing any computation.
  • Terminal operations: these actually perform the computation.

Intermediate operations can be divided into:

  • Stateless operations: the processing of an element is unaffected by previously seen elements.
  • Stateful operations: the operation cannot proceed until all elements have been seen.

Terminating operations can be divided into:

  • Short-circuiting operations: the final result can be produced as soon as elements meeting the condition are encountered.
  • Non-short-circuiting operations: all elements must be processed before the final result is available.

The operation classification details are as follows:
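To make the classification concrete, the hand-written summary below groups some common operations and demonstrates laziness and short-circuiting in action; the grouping in the comments is illustrative rather than exhaustive:

import java.util.stream.Stream;

public class OperationCategories {
    public static void main(String[] args) {
        // Intermediate, stateless:        filter, map, flatMap, peek
        // Intermediate, stateful:         distinct, sorted, limit, skip
        // Terminal, short-circuiting:     anyMatch, allMatch, findFirst, findAny
        // Terminal, non-short-circuiting: forEach, collect, reduce, count, max

        // Intermediate operations are lazy: nothing is printed here...
        Stream<String> s = Stream.of("kotlin", "java", "go")
                .peek(e -> System.out.println("peek: " + e));

        // ...until a terminal operation triggers the computation.
        // anyMatch short-circuits as soon as a match is found.
        boolean hasShortName = s.anyMatch(e -> e.length() <= 4);
        System.out.println(hasShortName); // true ("kotlin" and "java" are peeked; "go" never is)
    }
}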

Source structure

The inheritance relationship between Stream related classes and interfaces is shown in the following figure:

BaseStream

The top-level interface. It defines the basic methods of a stream, chiefly spliterator() and isParallel().

Stream

A top-level interface that defines the common stream methods, such as map, filter, sorted, limit, skip, collect, and so on.

ReferencePipeline

ReferencePipeline is the structural class that assembles the various operation stages. It defines three inner classes, Head, StatelessOp, and StatefulOp, and implements the interface methods of BaseStream and Stream.

Sink

The Sink interface defines the behavior between adjacent stream operations: begin(), end(), cancellationRequested(), and accept(). ReferencePipeline ultimately assembles the whole Stream pipeline into a call chain, and the upstream/downstream relationship between the operations on this chain is defined and implemented through the Sink protocol.
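Sink itself is package-private inside java.util.stream, so application code cannot implement it directly. The sketch below mirrors the protocol with made-up types (MiniSink and FilterSink are hypothetical names) to show how begin/accept/end flow down the chain, much like the ChainedReference subclasses shown later:

import java.util.function.Predicate;

// A hand-written mirror of the (package-private) Sink protocol.
interface MiniSink<T> {
    default void begin(long size) {}  // called once before any elements
    void accept(T t);                 // called once per element
    default void end() {}             // called once after the last element
    default boolean cancellationRequested() { return false; } // short-circuit hook
}

// Rough analogue of Sink.ChainedReference for a filter stage:
// each sink forwards to its downstream neighbor, forming the call chain.
class FilterSink<T> implements MiniSink<T> {
    private final Predicate<T> predicate;
    private final MiniSink<T> downstream;

    FilterSink(Predicate<T> predicate, MiniSink<T> downstream) {
        this.predicate = predicate;
        this.downstream = downstream;
    }

    @Override
    public void begin(long size) {
        downstream.begin(-1); // the size is unknown after filtering
    }

    @Override
    public void accept(T t) {
        if (predicate.test(t)) downstream.accept(t); // forward matches only
    }

    @Override
    public void end() {
        downstream.end();
    }
}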

Stacking the operations

Basic Stream usage will not be covered here; instead, we start with a piece of code and analyze how Stream works.

@Test
public void testStream() {
    List<String> names = Arrays.asList("kotlin", "java", "go");
    int maxLength = names.stream().filter(name -> name.length() <= 4).map(String::length)
            .max(Comparator.naturalOrder()).orElse(-1);
    System.out.println(maxLength);
}

Using a Stream involves three main stages, explained below.

Loading a data source

Calling names.stream() first creates the ReferencePipeline.Head object, which loads the data source.

java.util.Collection#stream

default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}

The stream method in StreamSupport initializes a ReferencePipeline.Head inner-class object.

java.util.stream.StreamSupport#stream(java.util.Spliterator, boolean)

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}

Intermediate operations

filter(name -> name.length() <= 4).map(String::length) does not execute anything yet; instead, AbstractPipeline generates a linked list of intermediate-operation stages.

java.util.stream.ReferencePipeline#filter

@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                    StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

java.util.stream.ReferencePipeline#map

@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}

You can see that both filter and map return a new StatelessOp object. Creating a StatelessOp invokes the constructor of its parent class AbstractPipeline, which links the previous stage to the new one, forming a linked list of stages:

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this;

    this.previousStage = previousStage;
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}

Terminal operation

max(Comparator.naturalOrder()) is the terminal operation. It generates a final stage, and through this stage the preceding intermediate operations are triggered: starting from the last stage, a Sink chain is generated recursively.

java.util.stream.ReferencePipeline#max

@Override
public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
    return reduce(BinaryOperator.maxBy(comparator));
}

Eventually java.util.stream.AbstractPipeline#wrapSink is called. This method calls opWrapSink on each stage to generate the Sink chain; in this article's example, those stages are the filter and map operations.

@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);

    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}

Setting a breakpoint on opWrapSink confirms that the filter and map operations in this example are indeed invoked.

After wrapAndCopyInto generates the Sink chain, it executes the chain's operations via the copyInto method.

@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

The core code above is:

spliterator.forEachRemaining(wrappedSink);

java.util.Spliterators.ArraySpliterator#forEachRemaining

@Override
public void forEachRemaining(Consumer<? super T> action) {
    Object[] a; int i, hi; // hoist accesses and checks from loop
    if (action == null)
        throw new NullPointerException();
    if ((a = array).length >= (hi = fence) &&
        (i = index) >= 0 && i < (index = hi)) {
        do {
            action.accept((T) a[i]);
        } while (++i < hi);
    }
}

The accept method is first called with the first element of the list, "kotlin" (the three elements in the example are "kotlin", "java", and "go"). The filter stage was created from the lambda expression:

filter(name -> name.length() <= 4)

Clearly the first element, "kotlin", fails the predicate (its length is 6), so it is not passed downstream.

For the second element, "java", predicate.test returns true (the length of "java" is <= 4), so it proceeds to the map stage's accept method.

When the reducing sink's accept method is then called, empty is still true (begin set it), so empty is flipped to false and the mapped result (the int 4) is stored in state:

public static <T> TerminalOp<T, Optional<T>>
makeRef(BinaryOperator<T> operator) {
    Objects.requireNonNull(operator);
    class ReducingSink
            implements AccumulatingSink<T, Optional<T>, ReducingSink> {
        private boolean empty;
        private T state;

        public void begin(long size) {
            empty = true;
            state = null;
        }

        @Override
        public void accept(T t) {
            if (empty) {
                empty = false;
                state = t;
            } else {
                state = operator.apply(state, t);
            }
        }
        // ...
    }
    // ...
}

For the third element, "go", the accept method is entered again. This time empty is false, so the mapped result (the int 2) is compared with the previous state (4) by the supplied operator, and the winning value is kept.

public static <T> BinaryOperator<T> maxBy(Comparator<? super T> comparator) {
    Objects.requireNonNull(comparator);
    return (a, b) -> comparator.compare(a, b) >= 0 ? a : b;
}

The comparator passed to max in this article's code is:

max(Comparator.naturalOrder())

So the final result is the int 4.
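Putting the walkthrough together: once the Sink chain has been assembled and is driven by forEachRemaining, the example pipeline behaves roughly like the hand-written loop below. This is only a conceptual equivalent for illustration, not the actual JDK code:

import java.util.Arrays;
import java.util.List;

public class PipelineByHand {
    public static void main(String[] args) {
        List<String> names = Arrays.asList("kotlin", "java", "go");

        // begin(): the reducing sink resets its state.
        boolean empty = true;
        int state = -1;

        // forEachRemaining pushes each element through the sink chain:
        for (String name : names) {
            if (name.length() > 4) continue;     // filter sink drops "kotlin"
            int length = name.length();          // map sink transforms the element
            if (empty) {                         // reducing sink accumulates
                empty = false;
                state = length;                  // "java" -> 4
            } else {
                state = Math.max(state, length); // "go" -> max(4, 2) = 4
            }
        }

        // get(): the terminal operation exposes the result, orElse(-1) if empty.
        System.out.println(empty ? -1 : state);  // prints 4
    }
}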

Parallel processing

The example above runs serially. To make it parallel, simply call parallel() after stream(). The parallel version can be written as:

@Test
public void testStream() {
    List<String> names = Arrays.asList("kotlin", "java", "go");
    int maxLength = names.stream().parallel().filter(name -> name.length() <= 4)
            .map(String::length).max(Comparator.naturalOrder()).orElse(-1);
    System.out.println(maxLength);
}

A parallel Stream is implemented the same way as a serial one up until the terminal operation. Once the terminal method is invoked, the implementation diverges slightly: TerminalOp's evaluateParallel method is called for parallel processing.

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
            ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
            : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

The core is the ForkJoin framework, which splits the Stream into fragments for processing. It ultimately invokes the following code, which we will not analyze further.

java.util.stream.AbstractTask#compute

@Override
public void compute() {
    Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
    long sizeEstimate = rs.estimateSize();
    long sizeThreshold = getTargetSize(sizeEstimate);
    boolean forkRight = false;
    @SuppressWarnings("unchecked") K task = (K) this;
    while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
        K leftChild, rightChild, taskToFork;
        task.leftChild  = leftChild = task.makeChild(ls);
        task.rightChild = rightChild = task.makeChild(rs);
        task.setPendingCount(1);
        if (forkRight) {
            forkRight = false;
            rs = ls;
            task = leftChild;
            taskToFork = rightChild;
        }
        else {
            forkRight = true;
            task = rightChild;
            taskToFork = leftChild;
        }
        taskToFork.fork();
        sizeEstimate = rs.estimateSize();
    }
    task.setLocalResult(task.doLeaf());
    task.tryComplete();
}

Incorrect use of parallel streams

@Test
public void testParallelWrong() {
    List<Integer> parallelList = new ArrayList<>();
    IntStream.range(0, 1000).boxed().parallel().filter(i -> i % 2 == 1)
            .forEach(parallelList::add);
    System.out.println(parallelList.size());
}

The output above is often less than 500, because parallelList is an ArrayList, which is not thread-safe: when concurrent add calls race, for example during resizing, one thread may overwrite an element written by another.

Correct use of parallel streams

@Test
public void testParallelRight() {
    List<Integer> parallelList = IntStream.range(0, 1000).boxed().parallel()
            .filter(i -> i % 2 == 1).collect(Collectors.toList());
    System.out.println(parallelList.size());
}

Performance

The following section is adapted, with edits, from JavaLambdaInternals/8-Stream Performance.md.

To ensure reliable results, we run the JVM in -server mode, use GB-scale test data, and test on an ordinary commodity server with the following configuration:

  • OS: CentOS 6.7 x86_64
  • CPU: Intel Xeon X5675, 12M Cache, 3.06 GHz, 6 cores / 12 threads
  • Memory: 96GB
  • JDK: Java version 1.8.0_91, Java HotSpot(TM) 64-Bit Server VM

The code used for the tests is in the JavaLambdaInternals repository (see the references); the results are summarized below.

Test methods and test data

Performance testing is not easy, and Java performance testing is harder still, because the virtual machine has a significant impact on results. The JVM affects performance in two ways:

  1. GC. GC behavior is one of the hardest things to control in Java; to add determinism, we manually specified the CMS collector and a fixed 10GB heap. The corresponding JVM parameters are -XX:+UseConcMarkSweepGC -Xms10G -Xmx10G.
  2. Just-in-time (JIT) compilation. The JIT compiles hot bytecode into native code while the JVM runs. During testing, we warm the program up to trigger JIT compilation of the test function. The relevant JVM parameter is -XX:CompileThreshold=10000.

Streams execute in parallel on the ForkJoinPool.commonPool() thread pool. To control parallelism, we use the Linux taskset command to limit the number of cores available to the JVM.
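Besides limiting cores at the OS level with taskset, the common pool's parallelism can also be capped from inside the JVM with a standard system property; a minimal check of the effective value (the class name here is arbitrary):

import java.util.concurrent.ForkJoinPool;

public class ParallelismCheck {
    public static void main(String[] args) {
        // Can be set on the command line, before the pool is first used:
        //   -Djava.util.concurrent.ForkJoinPool.common.parallelism=4
        // By default the common pool uses (available cores - 1) worker threads.
        System.out.println(ForkJoinPool.commonPool().getParallelism());
    }
}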

The test data is randomly generated by the program. To prevent jitter from any single run, each reported time is the average of four runs.

Experiment 1: primitive type iteration

Test: find the minimum value in an int array, comparing external iteration with a for loop against internal iteration with the Stream API.
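The benchmark itself is not reproduced here; a minimal sketch of the two approaches being compared might look like this (array size and contents are arbitrary):

import java.util.Arrays;
import java.util.Random;

public class IntIterationSketch {
    public static void main(String[] args) {
        int[] arr = new Random().ints(10_000_000).toArray();

        // External iteration: a plain for loop.
        int min = Integer.MAX_VALUE;
        for (int value : arr) {
            if (value < min) min = value;
        }

        // Internal iteration: serial and parallel primitive streams.
        int serialMin = Arrays.stream(arr).min().getAsInt();
        int parallelMin = Arrays.stream(arr).parallel().min().getAsInt();

        System.out.println(min == serialMin && min == parallelMin); // true
    }
}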

The test program is IntTest; the results are shown below:

The figure shows each approach's time normalized to the for-loop external iteration baseline. Analysis:

  1. For primitive types, Stream serial iteration has noticeably higher overhead than external iteration (about 2x).
  2. Stream parallel iteration outperforms both serial and external iteration.

Parallel iteration performance depends on the number of available cores; the parallel run above used all 12. To investigate how core count affects performance, we tested Stream parallel iteration with varying numbers of cores:

Analysis, for primitive types:

  1. On a single core, the parallel Stream API performs poorly, worse even than the serial Stream API;
  2. As the number of cores increases, Stream parallel iteration progressively outperforms external iteration with the for loop.

These two tests show that for simple iteration over primitive types, Stream serial iteration performs worse, but Stream parallel iteration performs better on multiple cores.

Experiment 2: object iteration

Next, let's look at iteration over objects.

Test: find the smallest element (in natural order) in a list of strings, comparing external iteration with a for loop against internal iteration with the Stream API.

The test program is StringTest; the results are shown below:

The results are analyzed as follows:

  1. For object types, Stream serial iteration still has higher overhead than external iteration (about 1.5x), but the gap is smaller than for primitive types.
  2. Stream parallel iteration outperforms both serial and external iteration.

Let's examine Stream parallel iteration separately:

Analysis, for object types:

  1. On a single core, the parallel Stream API performs worse than external iteration with the for loop;
  2. As the number of cores increases, Stream parallel iteration gradually improves, and the benefit of multiple cores is obvious.

These two tests show that for simple iteration over object types, Stream serial iteration performs worse, but Stream parallel iteration performs better on multiple cores.

Experiment 3: complex object reduction

Experiments 1 and 2 suggest that Stream serial execution is much worse than external iteration. Before jumping to conclusions, let's look at a more complicated operation.

Test: given a list of orders, compute each user's total transaction amount, comparing a manual implementation using external iteration with the Stream API.

We simplify an Order to a small tuple of fields (user name, amount, and so on) and represent it with an Order object. The test program is ReductionTest, and the results are shown below:
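A sketch of the two reductions being compared is shown below; the Order class and its field names are assumptions for illustration, and ReductionTest contains the real code:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ReductionSketch {
    // Simplified order; the field names are illustrative, not the benchmark's.
    static class Order {
        final String userName;
        final double price;
        Order(String userName, double price) { this.userName = userName; this.price = price; }
        String getUserName() { return userName; }
        double getPrice() { return price; }
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
                new Order("alice", 10.0), new Order("bob", 5.0), new Order("alice", 2.5));

        // Manual reduction with external iteration.
        Map<String, Double> manual = new HashMap<>();
        for (Order order : orders) {
            manual.merge(order.getUserName(), order.getPrice(), Double::sum);
        }

        // Stream reduction; use orders.parallelStream() for the parallel variant.
        Map<String, Double> byStream = orders.stream().collect(
                Collectors.groupingBy(Order::getUserName,
                        Collectors.summingDouble(Order::getPrice)));

        System.out.println(manual.equals(byStream)); // true
    }
}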

Analysis, for complex reduction operations:

  1. The Stream API generally performs better than manual external iteration, and the parallel Stream performs better still.

Next we examine the influence of the number of cores on parallel performance; the results are as follows:

Analysis, for complex reduction operations:

  1. On a single core, Stream parallel reduction performs worse than both serial reduction and manual reduction; it is simply the worst option.
  2. As the number of cores increases, Stream parallel reduction gradually improves, and the benefit of multiple cores is obvious.

These two experiments show that for complex reduction operations, Stream serial reduction outperforms manual reduction, and parallel reduction does better still on multiple cores. It is reasonable to expect similar performance from the Stream API for other complex operations.

Conclusion

The results of the above three experiments can be summarized as follows:

  1. For simple operations, such as plain traversal, the serial Stream API performs clearly worse than explicit external iteration, but the parallel Stream API can take advantage of multiple cores.
  2. For complex operations, the serial Stream API performs about as well as a manual implementation, and the parallel version performs far better.

Therefore, for performance:

  1. For simple operations, manual external iteration is recommended.
  2. For complex operations, the Stream API is recommended.
  3. On multi-core machines, the parallel Stream API is recommended to take advantage of the cores.
  4. On a single core, the parallel Stream API is not recommended.

Public account

In addition, future articles will be published on the public account (Coding Insight).

Code and mind maps are in the GitHub project. Stars are welcome!

References

  1. JavaLambdaInternals/6-Stream Pipelines.md
  2. JavaLambdaInternals/8-Stream Performance.md
  3. How to improve collection traversal efficiency?