Before Java 7, processing large amounts of data in parallel was cumbersome: you had to split the data into parts, hand each part to a thread to run the calculation, and then combine the results returned by the threads. Java 7 introduced the Fork/Join framework for this kind of work. It hides the interaction between threads so that you can concentrate on the data processing itself.


Fork/Join framework

The Fork/Join framework is built on the idea of divide and conquer: a large task is split into smaller tasks, which are handed to separate threads. To make full use of multi-core CPUs, the tasks are scheduled with a work-stealing algorithm: when a thread has finished the tasks in its own work queue, it tries to steal a task from another thread's work queue and execute it, until all tasks have been processed. To reduce contention between threads, each work queue is usually a double-ended queue (deque). The thread that owns the queue always takes tasks from the head of the deque, while a stealing thread always takes tasks from the tail.
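
The two ends of the work queue can be illustrated with a small sketch. It is a simplified, single-threaded illustration built on ArrayDeque, not how ForkJoinPool is implemented internally; the class WorkStealingSketch and its methods submit, takeOwn, and steal are hypothetical names chosen for this example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, single-threaded illustration of the two ends of a work queue.
// Real pools such as ForkJoinPool use specialised concurrent deques.
class WorkStealingSketch {

    // The owner adds new tasks at the head of its own deque.
    static void submit(Deque<String> queue, String task) {
        queue.addFirst(task);
    }

    // The owner takes its next task from the head.
    static String takeOwn(Deque<String> queue) {
        return queue.pollFirst();
    }

    // An idle worker steals from the tail of another worker's deque.
    static String steal(Deque<String> victimQueue) {
        return victimQueue.pollLast();
    }

    public static void main(String[] args) {
        Deque<String> busyWorkerQueue = new ArrayDeque<>();
        submit(busyWorkerQueue, "task-1");
        submit(busyWorkerQueue, "task-2");
        submit(busyWorkerQueue, "task-3");

        System.out.println("owner runs:    " + takeOwn(busyWorkerQueue)); // task-3
        System.out.println("thief steals:  " + steal(busyWorkerQueue));   // task-1
        System.out.println("left in queue: " + busyWorkerQueue);          // [task-2]
    }
}
```

Because the owner and the thief work at opposite ends, they only compete for the same task when the queue is nearly empty.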

  • Using RecursiveTask

To use the Fork/Join framework, first create your own task class that extends RecursiveTask and implements its abstract method:

protected abstract V compute();

The implementing class splits the task, computes the partial results, and merges them inside this method. In pseudocode:

if (the task is small enough that it need not be split) {
    compute the result sequentially and return it;
} else {
    1. Split the task into two subtasks.
    2. Run each subtask, which calls this method recursively and may split further.
    3. Wait for all subtasks to finish.
    4. Merge the results of the subtasks.
}

  • Fork/Join in practice

Task: compute the sum of the first 100 million natural numbers.

First, a sequential implementation:

long result = LongStream.rangeClosed(1, 100000000)
                .reduce(0, Long::sum);
System.out.println("result:" + result);

The same computation implemented with the Fork/Join framework:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

public class SumRecursiveTask extends RecursiveTask<Long> {
    private long[] numbers;
    private int start;
    private int end;

    public SumRecursiveTask(long[] numbers) {
        this.numbers = numbers;
        this.start = 0;
        this.end = numbers.length;
    }

    public SumRecursiveTask(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        // Small enough: stop splitting and sum sequentially
        if (length < 20000) {
            return sum();
        }
        // Split the task into two halves
        SumRecursiveTask leftTask = new SumRecursiveTask(numbers, start, start + length / 2);
        SumRecursiveTask rightTask = new SumRecursiveTask(numbers, start + (length / 2), end);
        // Hand the subtasks to the ForkJoinPool for execution
        leftTask.fork();
        rightTask.fork();
        // Wait for both subtasks and merge their results
        return leftTask.join() + rightTask.join();
    }

    private long sum() {
        // Must be long: an int accumulator overflows for these values
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] numbers = LongStream.rangeClosed(1, 100000000).toArray();
        Long result = new ForkJoinPool().invoke(new SumRecursiveTask(numbers));
        System.out.println("result: " + result);
    }
}

A ForkJoinPool created with the no-argument constructor uses as many threads as there are processors, as reported by Runtime.getRuntime().availableProcessors(). Parallel streams run on the common pool, whose size by default is the number of processors minus one and can be changed through the system property java.util.concurrent.ForkJoinPool.common.parallelism, for example: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12"); The property must be set before the common pool is first used. This is a global setting, so it affects every parallel stream in your code; the Stream API offers no way to specify the value for an individual parallel stream. Because all parallel streams share the pool, avoid network and other blocking I/O operations inside the tasks, since they may slow down other parallel streams.
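
The pool sizes discussed above can be inspected at runtime. The following is a minimal sketch; the class name ParallelismSketch and the helper explicitParallelism are hypothetical, while getParallelism and getCommonPoolParallelism are standard ForkJoinPool methods. The printed numbers depend on the machine, so no expected output is given.

```java
import java.util.concurrent.ForkJoinPool;

class ParallelismSketch {

    // Parallelism of a pool created with an explicit target.
    static int explicitParallelism(int target) {
        ForkJoinPool pool = new ForkJoinPool(target);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("available processors: " + cores);

        // A pool created with the no-argument constructor
        ForkJoinPool defaultPool = new ForkJoinPool();
        System.out.println("new ForkJoinPool():   " + defaultPool.getParallelism());
        defaultPool.shutdown();

        // The shared pool used by parallel streams
        System.out.println("common pool:          " + ForkJoinPool.getCommonPoolParallelism());

        // A pool with an explicitly requested size
        System.out.println("new ForkJoinPool(4):  " + explicitParallelism(4));
    }
}
```

Passing the size to the constructor only affects that one pool, which is why it is a safer knob than the global system property when you submit tasks to your own ForkJoinPool.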


parallelStream

Java 8 goes a step further and offers a more convenient way to do this: parallel streams (parallelStream). Under the hood, parallel streams are implemented with the Fork/Join framework.

  • Common usage

1. Convert a sequential stream into a parallel stream

LongStream.rangeClosed(1, 100).parallel().forEach(System.out::println);

2. Create a parallel stream directly

List<Integer> values = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
    values.add(i);
}
values.parallelStream()
        .forEach(System.out::println);

  • Using parallelStream correctly
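
One thing to be aware of with the snippets above: forEach on a parallel stream makes no promise about the order in which elements are handled. The sketch below contrasts it with forEachOrdered, which keeps the encounter order. The class OrderedParallelSketch and the method inEncounterOrder are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class OrderedParallelSketch {

    // Collects the elements in encounter order even though the stream is parallel.
    static List<Integer> inEncounterOrder(List<Integer> values) {
        List<Integer> seen = new ArrayList<>();
        // forEachOrdered runs the action one element at a time, in encounter order,
        // so adding to a plain ArrayList is safe here.
        values.parallelStream().forEachOrdered(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> values = IntStream.range(0, 10).boxed().collect(Collectors.toList());

        System.out.print("forEach:        ");
        values.parallelStream().forEach(v -> System.out.print(v + " ")); // order may vary
        System.out.println();

        System.out.println("forEachOrdered: " + inEncounterOrder(values));
    }
}
```

Keeping the order costs some of the parallel speed-up, so forEachOrdered is best reserved for cases where the order actually matters.
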

Let's implement the summation example above with a parallel stream. A first attempt looks like this:

public static void main(String[] args) {
    Summer summer = new Summer();
    LongStream.rangeClosed(1, 100000000)
            .parallel()
            .forEach(summer::add);
    System.out.println("result: " + summer.sum);
}

static class Summer {
    public long sum = 0;

    public void add(long value) {
        sum += value;
    }
}

Running this code shows that the result is incorrect, and that it differs from run to run. Why? A parallel stream executes the forEach action on several threads at once, and sum += value is not an atomic operation: it reads sum, adds value, and writes the result back, so updates from different threads can overwrite each other. You should therefore avoid modifying shared variables when using parallel streams.

The example above can be corrected to use a parallel stream properly as follows:

long result = LongStream.rangeClosed(1, 100000000)
        .parallel()
        .reduce(0, Long::sum);
System.out.println("result:" + result);
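
If a shared accumulator really cannot be avoided, a thread-safe one is needed. The following minimal sketch uses java.util.concurrent.atomic.LongAdder as one possible option; it is an alternative not taken from the original example, and the class SharedCounterSketch and the method sumWithLongAdder are hypothetical names.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.LongStream;

class SharedCounterSketch {

    // Sums 1..n from a parallel stream into a thread-safe accumulator.
    static long sumWithLongAdder(long n) {
        LongAdder total = new LongAdder();
        LongStream.rangeClosed(1, n)
                .parallel()
                .forEach(total::add); // LongAdder.add is safe under concurrent updates
        return total.sum();
    }

    public static void main(String[] args) {
        long n = 1_000_000L;
        System.out.println("LongAdder: " + sumWithLongAdder(n)); // 500000500000
        System.out.println("formula:   " + n * (n + 1) / 2);     // 500000500000
    }
}
```

The reduce version is still the better fit for a plain sum, since it needs no shared state at all.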

As described earlier, a Fork/Join computation consists of three steps: split, compute, and merge. Parallel streams are built on the Fork/Join framework, so they need the same steps. In the code above, however, Long::sum does the computation and reduce merges the results, but nothing splits the task. The splitting must therefore already be implemented by the parallel stream itself, which brings us to Spliterator.

Spliterator is an interface added in Java 8 for traversing and partitioning the elements of a source, which is what allows them to be processed in parallel.

public interface Spliterator<T> {
    boolean tryAdvance(Consumer<? super T> action);

    Spliterator<T> trySplit();

    long estimateSize();

    int characteristics();
}

tryAdvance: if an element remains, performs the given action on it and returns true; otherwise returns false.

trySplit: splits off part of the elements into a new Spliterator and returns it, or returns null if the elements cannot be split.

estimateSize: returns an estimate of the number of elements that remain to be traversed.

characteristics: returns the set of characteristics of this Spliterator, encoded as an int.
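
The four methods can be tried out directly on the Spliterator of an ordinary list. This is a small sketch for illustration only; the class SpliteratorSketch and the method sumInTwoParts are hypothetical names, and how a source divides its elements on trySplit depends on the collection.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

class SpliteratorSketch {

    // Splits the source once (if possible) and sums both parts separately.
    static long sumInTwoParts(List<Integer> values) {
        Spliterator<Integer> rest = values.spliterator();
        Spliterator<Integer> firstPart = rest.trySplit(); // null if the source cannot be split

        long[] total = {0};
        if (firstPart != null) {
            // tryAdvance handles one element per call and returns false when none are left
            while (firstPart.tryAdvance(v -> total[0] += v)) { }
        }
        // forEachRemaining traverses whatever the original Spliterator still covers
        rest.forEachRemaining(v -> total[0] += v);
        return total[0];
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

        Spliterator<Integer> rest = values.spliterator();
        System.out.println("before split: " + rest.estimateSize());

        Spliterator<Integer> firstPart = rest.trySplit();
        if (firstPart != null) {
            System.out.println("first part:   " + firstPart.estimateSize());
        }
        System.out.println("remaining:    " + rest.estimateSize());
        System.out.println("is SIZED:     " + rest.hasCharacteristics(Spliterator.SIZED));
        System.out.println("sum:          " + sumInTwoParts(values));
    }
}
```

A parallel stream does essentially this repeatedly: it keeps calling trySplit until the parts are small enough, processes each part in a Fork/Join task, and merges the partial results.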


Conclusion

  1. Whether parallel processing is faster than sequential processing can only be established by measurement, not by guessing. (The summation example in this article was run many times on several computers, and it did not show parallel processing to be much faster than sequential processing. Test repeatedly, and expect the results to vary with the environment.)
  2. When the amount of data is small and the calculation is simple, parallel streams are generally not recommended.
  3. Consider how much time the stream's operations themselves take.
  4. In some cases you need to implement the splitting logic yourself for parallel streams to be efficient.
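
A rough way to measure the difference yourself is sketched below. It is a crude wall-clock comparison under simple assumptions (no JIT warm-up control, best of a few runs), not a rigorous benchmark; a dedicated harness such as JMH gives more trustworthy numbers. The class SumTimingSketch and its methods are hypothetical names.

```java
import java.util.function.LongSupplier;
import java.util.stream.LongStream;

class SumTimingSketch {

    static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).reduce(0, Long::sum);
    }

    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().reduce(0, Long::sum);
    }

    // Runs the task several times and returns the best wall-clock time in milliseconds.
    static long bestMillis(LongSupplier task, int runs) {
        long best = Long.MAX_VALUE;
        for (int i = 0; i < runs; i++) {
            long start = System.nanoTime();
            task.getAsLong();
            best = Math.min(best, (System.nanoTime() - start) / 1_000_000);
        }
        return best;
    }

    public static void main(String[] args) {
        long n = 100_000_000L;
        System.out.println("sequential: " + bestMillis(() -> sequentialSum(n), 5) + " ms");
        System.out.println("parallel:   " + bestMillis(() -> parallelSum(n), 5) + " ms");
    }
}
```

The timings differ between machines and JVM versions, which is exactly why point 1 insists on testing in your own environment.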

Thank you for your patience.

The article may well contain shortcomings or mistakes; suggestions and comments are welcome. Finally, if you found it useful, please like, comment, and follow, since that is what motivates me to keep sharing 🙏