Parallelism and concurrency

When discussing concurrency and parallelism, it is important to be clear about the difference: parallelism means that multiple tasks really execute at the same time, while concurrency uses scheduling to switch constantly between multiple tasks, so that in essence the tasks are not executed simultaneously. To get a feel for the two ideas, refer to the following image:





[Figure: Parallelism and concurrency]

Fork/Join framework and Java Stream API

Fork/Join is a framework for parallel execution. For more information, see this article: Java Fork/Join Frameworks. In simple terms, the Fork/Join framework splits a large task into tasks that are small enough, assigns those small tasks to different threads, and coordinates resources between threads with a work-stealing algorithm: a thread that finishes its own tasks early can “steal” tasks that another thread has not finished yet. Every thread holds a deque of tasks. To reduce contention between threads, a thread consumes its own tasks from the head of its queue, while “stealing” threads take tasks from the tail. The framework uses the fork method to split subtasks off from a large task and the join method to obtain the results of those subtasks, which are then combined into the result of the large task. For the Fork/Join task model, refer to the following image:





[Figure: Fork/Join task model]
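The split, fork, and join steps described above can be sketched with a small RecursiveTask. This is only an illustration under assumptions: the class names ForkJoinSum and SumTask and the THRESHOLD value are made up for this example and do not come from the JDK.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSum {
    // Hypothetical cutoff: ranges at or below this size are summed directly.
    static final int THRESHOLD = 1_000;

    static final class SumTask extends RecursiveTask<Long> {
        private final long[] data;
        private final int from; // inclusive
        private final int to;   // exclusive

        SumTask(long[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += data[i];
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(data, from, mid);
            SumTask right = new SumTask(data, mid, to);
            left.fork();                      // queue the left half; an idle worker may steal it
            long rightSum = right.compute();  // keep working on the right half in this thread
            return left.join() + rightSum;    // wait for the left half and combine the results
        }
    }

    static long parallelSum(long[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        long[] data = new long[100_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(parallelSum(data)); // 5000050000
    }
}
```

Forking one half and computing the other in the current thread is a common pattern because it keeps the current thread busy instead of leaving it blocked in join.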

For more information about the Java Stream API, see this article: Java Streams API.

Parallel streams are implemented on top of the Fork/Join framework, so our programs can run faster without us managing threads ourselves. A stream has to be marked as parallel explicitly; otherwise it defaults to a sequential stream. For a Collection, for example, you can call parallelStream to obtain a parallel stream directly, or call stream to obtain a sequential stream and then call parallel to convert it into a parallel one. The focus of this article is to examine how streams use Fork/Join to run in parallel.
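The three ways of obtaining a stream mentioned above can be compared with isParallel. This is a minimal sketch; the class name ParallelFlagDemo is made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class ParallelFlagDemo {
    // Returns the parallel flag of: stream(), parallelStream(), stream().parallel().
    static boolean[] flags() {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
        Stream<Integer> sequential = numbers.stream();
        Stream<Integer> direct = numbers.parallelStream();
        Stream<Integer> converted = numbers.stream().parallel();
        return new boolean[] {
            sequential.isParallel(), direct.isParallel(), converted.isParallel()
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(flags())); // [false, true, true]
    }
}
```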

Stream concurrency implementation details

Having covered the Fork/Join framework and Java Streams, the first question is: how do streams use the Fork/Join framework to run in parallel? For most users it is enough to know that parallel streams are executed by the Fork/Join framework. However, if you want to learn more about how the Fork/Join framework is used in practice and how Java Stream is designed, it is necessary to read the source code of the implementation. The analysis that follows is only my personal reading of it.

It is important to note that Java Stream operations fall into two main categories, Intermediate and Terminal, with short-circuiting as a third property that applies to some operations of either kind; the Java Streams API article has the details. A simple way to tell the two main categories apart: if an operation returns a new Stream, it is an Intermediate operation; otherwise it is a Terminal operation.

  • Intermediate: A stream can be followed by zero or more Intermediate operations. Each one takes the stream, applies some mapping or filtering to the data, and returns a new stream for the next operation to use. These operations are lazy: calling them does not actually start traversal of the stream.

  • Terminal: A stream can have only one Terminal operation. Once it has run, the stream is used up and cannot be operated on again, so it must be the last operation on the stream. Executing the Terminal operation is what actually starts the traversal, and it produces a result or a side effect.

  • Short-circuiting: an additional property that an operation of either kind can have:

    • An intermediate operation is short-circuiting if, given an infinite/unbounded Stream, it may return a finite new Stream.
    • A terminal operation is short-circuiting if, given an infinite Stream, it may compute its result in finite time.
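The categories above can be illustrated with a few one-liners. This is a sketch only; the class name OperationKinds and its method names are made up for this example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OperationKinds {
    // limit is a short-circuiting intermediate operation: infinite in, finite out.
    static List<Integer> firstFive() {
        return Stream.iterate(1, n -> n + 1)   // infinite source
                .limit(5)                      // intermediate, returns a new Stream
                .collect(Collectors.toList()); // terminal, starts the traversal
    }

    // anyMatch is a short-circuiting terminal operation: it stops at the first match.
    static boolean hasValueAbove(int bound) {
        return Stream.iterate(1, n -> n + 1).anyMatch(n -> n > bound);
    }

    // A stream cannot be reused once a terminal operation has run on it.
    static boolean secondTerminalOpFails() {
        Stream<Integer> stream = Stream.of(1, 2, 3);
        stream.count();
        try {
            stream.count();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(firstFive());             // [1, 2, 3, 4, 5]
        System.out.println(hasValueAbove(100));      // true
        System.out.println(secondTerminalOpFails()); // true
    }
}
```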

Java Stream implements parallel execution on Fork/Join for four types of Terminal operations, which are shown in the following figure:





[Figure: The four Stream operations that support parallel execution]
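The figure is not reproduced here. Assuming the four families it showed correspond to the JDK's internal helper classes ForEachOps, ReduceOps, FindOps, and MatchOps (an assumption on my part, based on the TerminalOp implementations in java.util.stream), each family can be exercised on a parallel stream as follows. The class name ParallelTerminalOps and its method names are made up for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ParallelTerminalOps {
    // A fresh parallel stream of 1..100 for each call.
    static Stream<Integer> numbers() {
        return IntStream.rangeClosed(1, 100).boxed().parallel();
    }

    // forEach family: the action may run on several threads, so the counter is atomic.
    static int forEachCount() {
        AtomicInteger seen = new AtomicInteger();
        numbers().forEach(n -> seen.incrementAndGet());
        return seen.get();
    }

    // reduce family (collect is also built on it).
    static int reduceSum() {
        return numbers().reduce(0, Integer::sum);
    }

    // find family: findFirst respects encounter order even in parallel.
    static int firstMultipleOfSeven() {
        return numbers().filter(n -> n % 7 == 0).findFirst().orElse(-1);
    }

    // match family.
    static boolean anyAboveNinetyNine() {
        return numbers().anyMatch(n -> n > 99);
    }

    public static void main(String[] args) {
        System.out.println(forEachCount());         // 100
        System.out.println(reduceSum());            // 5050
        System.out.println(firstMultipleOfSeven()); // 7
        System.out.println(anyAboveNinetyNine());   // true
    }
}
```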

Let’s start by walking through the execution path of a Stream, using the following code example to trace it:

        Stream.of(1, 2, 3, 4)
                .parallel()
                .map(n -> n*2)
                .collect(Collectors.toCollection(ArrayList::new));


To clarify, the code above doubles each of the four numbers (1, 2, 3, 4), then collects the results and returns them in an ArrayList. It is a very simple pipeline. Here is the execution path of the stream operations above:
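For reference, here is the same pipeline wrapped in a runnable class. The class name DoubleAndCollect is made up for this example. Because the source is ordered and the collector is not a concurrent one, the result keeps the encounter order even though the stream is parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DoubleAndCollect {
    static List<Integer> doubled() {
        return Stream.of(1, 2, 3, 4)
                .parallel()
                .map(n -> n * 2)
                .collect(Collectors.toCollection(ArrayList::new));
    }

    public static void main(String[] args) {
        System.out.println(doubled()); // [2, 4, 6, 8]
    }
}
```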


    step 1:

    public static<T> Stream<T> of(T... values) {
        return Arrays.stream(values);
    }

    step 2:

    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

    step 3:

    public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        ...
        container = evaluate(ReduceOps.makeRef(collector));
        ...
    }

    step 4:

    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

    step 5: Uses the Fork/Join framework to perform operations.

Some details are omitted from the steps above. Note that an intermediate operation simply appends its action to the upstream pipeline. As the comment in the JDK source puts it:


Construct a new Stream by appending a stateless intermediate operation to an existing stream.
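The laziness of intermediate operations is easy to observe by counting how often the mapping function runs before and after the terminal operation. This is a minimal sketch; the class name LazyMapDemo is made up for this example.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyMapDemo {
    // Returns {calls before the terminal operation, calls after it}.
    static int[] callCounts() {
        AtomicInteger calls = new AtomicInteger();
        // map only appends a stage to the pipeline; the lambda does not run yet.
        Stream<Integer> pipeline = Stream.of(1, 2, 3, 4).map(n -> {
            calls.incrementAndGet();
            return n * 2;
        });
        int before = calls.get();
        pipeline.collect(Collectors.toList()); // the terminal operation starts the traversal
        int after = calls.get();
        return new int[] { before, after };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(callCounts())); // [0, 4]
    }
}
```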

For example, the map operation in the pipeline above is only appended to the chain of Intermediate operations and does not execute immediately. The part to focus on is step 5: how Stream uses Fork/Join to run in parallel. The evaluate method is the key, because it handles the two cases separately: it uses Fork/Join for streams whose parallel flag is set, and sequential evaluation for streams whose flag is not set.
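Because evaluate consults a single parallel flag for the whole pipeline, the last call to parallel or sequential before the terminal operation decides how every stage runs. The sketch below shows this; the class name LastFlagWins and its method names are made up for this example.

```java
import java.util.stream.Stream;

public class LastFlagWins {
    // parallel() followed by sequential(): the pipeline ends up sequential.
    static boolean parallelThenSequential() {
        return Stream.of(1, 2, 3, 4).parallel().map(n -> n * 2).sequential().isParallel();
    }

    // parallel() may be called at any stage and still applies to the whole pipeline.
    static boolean sequentialThenParallel() {
        return Stream.of(1, 2, 3, 4).map(n -> n * 2).parallel().isParallel();
    }

    // A sequential pipeline is evaluated entirely on the calling thread.
    static boolean sequentialStaysOnCallerThread() {
        Thread caller = Thread.currentThread();
        return Stream.of(1, 2, 3, 4)
                .map(n -> Thread.currentThread() == caller)
                .allMatch(sameThread -> sameThread);
    }

    public static void main(String[] args) {
        System.out.println(parallelThenSequential());        // false
        System.out.println(sequentialThenParallel());        // true
        System.out.println(sequentialStaysOnCallerThread()); // true
    }
}
```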

The core method of a Fork/Join task is its compute method. The following shows how the parallel version of the forEach operation is performed by ForEachTask, which implements compute:

// Similar to AbstractTask but doesn't need to track child tasks
        public void compute() {
            Spliterator<S> rightSplit = spliterator, leftSplit;
            long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
            if ((sizeThreshold = targetSize) == 0L)
                targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
            boolean forkRight = false;
            Sink<S> taskSink = sink;
            ForEachTask<S, T> task = this;
            while (!isShortCircuit || !taskSink.cancellationRequested()) {
                if (sizeEstimate <= sizeThreshold ||
                    (leftSplit = rightSplit.trySplit()) == null) {
                    task.helper.copyInto(taskSink, rightSplit);
                    break;
                }
                ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
                task.addToPendingCount(1);
                ForEachTask<S, T> taskToFork;
                if (forkRight) {
                    forkRight = false;
                    rightSplit = leftSplit;
                    taskToFork = task;
                    task = leftTask;
                }
                else {
                    forkRight = true;
                    taskToFork = leftTask;
                }
                taskToFork.fork();
                sizeEstimate = rightSplit.estimateSize();
            }
            task.spliterator = null;
            task.propagateCompletion();
        }
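The split-and-fork loop above can be reduced to a much smaller CountedCompleter. This is a simplified sketch, not the JDK's implementation: it always forks the left split instead of alternating sides, it ignores short-circuiting, and the names SplitForEachSketch, ForEachSketchTask, and the threshold parameter are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public class SplitForEachSketch {
    static final class ForEachSketchTask<T> extends CountedCompleter<Void> {
        private final Spliterator<T> spliterator;
        private final Consumer<? super T> action;
        private final long threshold; // hypothetical target size for a leaf task

        ForEachSketchTask(CountedCompleter<?> parent, Spliterator<T> spliterator,
                          Consumer<? super T> action, long threshold) {
            super(parent);
            this.spliterator = spliterator;
            this.action = action;
            this.threshold = threshold;
        }

        @Override
        public void compute() {
            Spliterator<T> right = spliterator, left;
            // Keep splitting while the remaining part is too large and can still be split.
            while (right.estimateSize() > threshold && (left = right.trySplit()) != null) {
                addToPendingCount(1); // one more child must complete before this task does
                new ForEachSketchTask<>(this, left, action, threshold).fork();
            }
            right.forEachRemaining(action); // process what is left in this thread
            propagateCompletion();          // signal completion up the task tree
        }
    }

    // Sums the values through a side-effecting action, much like a parallel forEach would.
    static long parallelSum(List<Integer> values, long threshold) {
        AtomicLong sum = new AtomicLong();
        new ForEachSketchTask<Integer>(null, values.spliterator(),
                n -> sum.addAndGet(n), threshold).invoke();
        return sum.get();
    }

    public static void main(String[] args) {
        List<Integer> values = new ArrayList<>();
        for (int i = 1; i <= 1000; i++) {
            values.add(i);
        }
        System.out.println(parallelSum(values, 64)); // 500500
    }
}
```

Like ForEachTask, the sketch does not keep references to its children: the pending count alone tells a task when all of its forked splits have finished.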

In the code above, the large task is broken down into smaller tasks. Where is the root task started, and where is its completion awaited? Look at the following code:

        @Override
        public <S> Void evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<S> spliterator) {
            if (ordered)
                new ForEachOrderedTask<>(helper, spliterator, this).invoke();
            else
                new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
            return null;
        }

You can see that the invoke method is called. Its Javadoc describes it as follows:

     * Commences performing this task, awaits its completion if
     * necessary, and returns its result, or throws an (unchecked)
     * {@code RuntimeException} or {@code Error} if the underlying
     * computation did so.

Why does the framework call invoke here instead of fork followed by join? Here is the description of the join method:


     * Returns the result of the computation when it {@link #isDone is
     * done}.  This method differs from {@link #get()} in that
     * abnormal completion results in {@code RuntimeException} or
     * {@code Error}, not {@code ExecutionException}, and that
     * interrupts of the calling thread do <em>not</em> cause the
     * method to abruptly return by throwing {@code
     * InterruptedException}.
     

From the join description we know that the get method can also be used to obtain the result. The difference is that get reports abnormal completion as a checked ExecutionException and can throw InterruptedException, whereas join and invoke throw neither of these. Instead, the exception is recorded in the ForkJoinTask, which rethrows it to the caller as an unchecked RuntimeException or Error.
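The difference can be observed with a task that always fails. This is a minimal sketch; the class names JoinVersusGet and FailingTask and the returned description strings are made up for this example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;

public class JoinVersusGet {
    static final class FailingTask extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            throw new IllegalStateException("boom");
        }
    }

    // invoke rethrows the failure as an unchecked exception; no checked exception to handle.
    static String describeInvoke() {
        try {
            new FailingTask().invoke();
            return "completed normally";
        } catch (RuntimeException e) {
            return "unchecked " + e.getClass().getSimpleName();
        }
    }

    // join behaves the same way after the task has been forked.
    static String describeJoin() {
        try {
            new FailingTask().fork().join();
            return "completed normally";
        } catch (RuntimeException e) {
            return "unchecked " + e.getClass().getSimpleName();
        }
    }

    // get forces the caller to handle the checked ExecutionException and InterruptedException.
    static String describeGet() {
        try {
            new FailingTask().fork().get();
            return "completed normally";
        } catch (ExecutionException e) {
            return "ExecutionException wrapping " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(describeInvoke()); // unchecked IllegalStateException
        System.out.println(describeJoin());   // unchecked IllegalStateException
        System.out.println(describeGet());    // ExecutionException wrapping IllegalStateException
    }
}
```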