(Mobile phone landscape view source more convenient)

Note: The Java source analysis section is based on the Java 8 version unless otherwise noted.

Note: This article is based on the ForkJoinPool divide-and-conquer thread pool class.

Introduction to the

With the development and wide use of multi-core processors in hardware, concurrent programming has become a technology that programmers must master, and concurrent knowledge is often examined in interviews.

Today, we look at an interview question:

How to make the best use of a multi-core CPU to compute the sum of all integers in a large array?

Analyze the

  • Single thread addition?

The easiest thing to think of is a single thread adding, a for loop.

  • Thread pool addition?

With further refinement, it would be natural to use a thread pool to add segments, and finally add the results of each segment.

  • The other?

Yes, ForkJoinPool, but how can it be implemented? I don’t think I’ve used it very much

Three kinds of implementation

OK, the analysis is done, let’s look at the three implementations directly, no ink, direct serving.

Public class ForkJoinPoolTest01 {public static void main(String[] args) throws ExecutionException, InterruptedException {// Construct data int length = 100000000; long[] arr = new long[length]; for (int i = 0; i < length; i++) { arr[i] = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE); } // singleThreadSum(arr); // ThreadPoolExecutor thread pool multiThreadSum(arr); ForkJoinPool Thread pool forkJoinSum(ARR); } private static void singleThreadSum(long[] arr) { long start = System.currentTimeMillis(); long sum = 0; for (int i = 0; i < arr.length; I++) {/ / simulation time consuming, in this paper, by the male number from the original sum "red elder brother read source" + = (arr [I] / 3/3 3/3 3/3 3/3 3 * * * * * 3); } System.out.println("sum: " + sum); System.out.println("single thread elapse: " + (System.currentTimeMillis() - start)); } private static void multiThreadSum(long[] arr) throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); int count = 8; ExecutorService threadPool = Executors.newFixedThreadPool(count); List<Future<Long>> list = new ArrayList<>(); for (int i = 0; i < count; i++) { int num = i; Future<Long> Future = threadpool.submit (() -> {Long sum = 0; for (int j = arr.length / count * num; j < (arr.length / count * (num + 1)); J++) {try {/ / simulation takes sum + = (arr [j] / 3/3 3/3 3/3 3/3 3 * * * * * 3); } catch (Exception e) { e.printStackTrace(); } } return sum; }); list.add(future); } long sum = 0; for (Future<Long> future : list) { sum += future.get(); } System.out.println("sum: " + sum); System.out.println("multi thread elapse: " + (System.currentTimeMillis() - start)); } private static void forkJoinSum(long[] arr) throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); ForkJoinTask<Long> ForkJoinTask = ForkJoinPool.submit (new SumTask(arr, 0, arr.length)); Long sum = forkJointask.get (); forkJoinPool.shutdown(); System.out.println("sum: " + sum); System.out.println("fork join elapse: " + (System.currentTimeMillis() - start)); } private static class SumTask extends RecursiveTask<Long> { private long[] arr; private int from; private int to; public SumTask(long[] arr, int from, int to) { this.arr = arr; this.from = from; this.to = to; } @override protected Long compute() {if (to-from <= 1000) {Long compute = 0; for (int i = from; i < to; I++) {/ / simulation takes sum + = (arr [I] / 3/3 3/3 3/3 3/3 3 * * * * * 3); } return sum; } int middle = (from + to) / 2; SumTask left = new SumTask(arr, from, middle); SumTask right = new SumTask(arr, middle, to); // Submit the left task left.fork(); Long rightResult = right.compute(); Long leftResult = left.join(); Return leftResult + rightResult; }}}Copy the code

In fact, a single thread is the fastest way to add 100 million integers. My computer is around 100ms. Using a thread pool is actually slower.

So, in order to illustrate the wonders of ForkJoinPool, I simulated the time taken by running every number 3/3 by 3/3 by 3/3 by 3/3.

The results:

sum: 107352457433800662
single thread elapse: 789
sum: 107352457433800662
multi thread elapse: 228
sum: 107352457433800662
fork join elapse: 189Copy the code

As you can see, ForkJoinPool is an improvement over a normal thread pool.

Question: Can a common thread pool use ForkJoinPool computing, where large tasks are divided into middle tasks, middle tasks into small tasks, and then aggregated?

You can try it (-is my face)

OK, now we are officially entering ForkJoinPool parsing.

Divide and conquer method

  • The basic idea

A large scale problem is divided into smaller sub-problems, then divide and conquer them, and finally combine the solutions of the sub-problems to obtain the solution of the original problem.

  • steps

(1) The original segmentation problem:

(2) Solving sub-problems:

(3) Merge the solution of subproblems into the solution of the original problem.

In divide-and-conquer, subproblems are generally independent of each other, so algorithms are often recursively called to solve the subproblems.

  • Typical Application Scenarios

(1) Binary search

(2) Multiplication of large integers

(3) Strassen matrix multiplication

(4) checkerboard coverage

(5) Merge sort

(6) Quicksort

(7) Linear time selection

(8) Hannott Tower

ForkJoinPool inheritance system

ForkJoinPool is a new thread pool class in Java 7 that has the following inheritance:

ForkJoinPool and ThreadPoolExecutor both inherit from the AbstractExecutorService class, so there is little difference between the use of ForkJoinPool and that of ThreadPoolExecutor, except that the task becomes ForkJoinTask.

Here again, an important design principle is applied — the open and closed principle — closed to modification and open to extension.

It can be seen that the interface design of the whole thread pool system is very good from the beginning. Adding a thread pool class will not interfere with the original code, but also take advantage of the original features.

ForkJoinTask

Two main methods

  • fork()

The fork() method is similar to Thread’s thread.start () method, but instead of actually starting a Thread, it puts the task into a work queue.

  • join()

The join() method is similar to Thread’s thread.join () method, but instead of simply blocking the Thread, it uses the worker Thread to run other tasks. When the join() method is called in a worker thread, it processes other tasks until it notices that the target subtask has completed.

Three subclasses

  • RecursiveAction

No return value task.

  • RecursiveTask

There are return value tasks.

  • CountedCompleter

A task with no return value can trigger a callback upon completion of the task.

Internal principles of ForkJoinPool

ForkJoinPool uses a job-stealing algorithm internally.

(1) each worker thread has its own WorkQueue;

(2) This is a double-endian queue, which is thread private;

(3) A subtask of a ForkJoinTask is placed in the queue head of the worker thread that runs the task. The worker thread processes the tasks in the queue in LIFO order;

(4) To maximize CPU utilization, idle threads will “steal” tasks from other threads’ queues to execute them;

(5) Steal tasks from the tail of the work queue to reduce competition;

(6) The operation of the double-ended queue: push()/pop() is called only in its owner worker thread, poll() is called when other threads steal the task;

(7) When there is only one task left, there is still competition, which is achieved through CAS;

ForkJoinPool Best practices

(1) the most suitable for computation-intensive tasks, this article by the public from the number “Tong Elder brother read source code” original;

(2) Use ManagedBlocker when you need to block a worker thread;

(3) Do not use forkJoinpool.invoke ()/invokeAll() inside RecursiveTask;

conclusion

(1) ForkJoinPool is particularly well suited to the implementation of divide-and-rule algorithms;

(2) ForkJoinPool and ThreadPoolExecutor are complementary and do not replace each other.

(3) ForkJoinTask has two core methods, fork() and join(), and three important subclasses, RecursiveAction, RecursiveTask, and CountedCompleter;

(4) Internal ForkjoinPool implementation based on “job stealing” algorithm;

(5) Each thread has its own work queue, which is a double-ended queue. It accesses tasks from the queue head and other threads steal tasks from the tail.

ForkJoinPool is best for computationally-intensive tasks, but ManagedBlocker can also be used for blocking tasks.

(7) RecursiveTask can call fork() once less, using the current thread processing, this is a skill;

eggs

How do I use ManagedBlocker?

A: The ManagedBlocker explicitly tells the ForkJoinPool that it is blocked, and the ForkJoinPool starts another thread to run the task in order to maximize CPU utilization.

Look at the following examples and figure it out for yourself.

Public class Fibonacci {public static void main(String[] args) {public static void main(String[] args) { long time = System.currentTimeMillis(); Fibonacci fib = new Fibonacci(); int result = fib.f(1_000).bitCount(); time = System.currentTimeMillis() - time; System.out.println("result "=" + result "); System.out.println("test1_000() time = " + time); } public BigInteger f(int n) { Map<Integer, BigInteger> cache = new ConcurrentHashMap<>(); cache.put(0, BigInteger.ZERO); cache.put(1, BigInteger.ONE); return f(n, cache); } private final BigInteger RESERVED = BigInteger.valueOf(-1000); public BigInteger f(int n, Map<Integer, BigInteger> cache) { BigInteger result = cache.putIfAbsent(n, RESERVED); if (result == null) { int half = (n + 1) / 2; RecursiveTask<BigInteger> f0_task = new RecursiveTask<BigInteger>() { @Override protected BigInteger compute() { return f(half - 1, cache); }}; f0_task.fork(); BigInteger f1 = f(half, cache); BigInteger f0 = f0_task.join(); long time = n > 10_000 ? System.currentTimeMillis() : 0; try { if (n % 2 == 1) { result = f0.multiply(f0).add(f1.multiply(f1)); } else { result = f0.shiftLeft(1).add(f1).multiply(f1); } synchronized (RESERVED) { cache.put(n, result); RESERVED.notifyAll(); } } finally { time = n > 10_000 ? System.currentTimeMillis() - time : 0; if (time > 50) System.out.printf("f(%d) took %d%n", n, time); } } else if (result == RESERVED) { try { ReservedFibonacciBlocker blocker = new ReservedFibonacciBlocker(n, cache); ForkJoinPool.managedBlock(blocker); result = blocker.result; } catch (InterruptedException e) { throw new CancellationException("interrupted"); } } return result; // return f(n - 1).add(f(n - 2)); } private class ReservedFibonacciBlocker implements ForkJoinPool.ManagedBlocker { private BigInteger result; private final int n; private final Map<Integer, BigInteger> cache; public ReservedFibonacciBlocker(int n, Map<Integer, BigInteger> cache) { this.n = n; this.cache = cache; } @Override public boolean block() throws InterruptedException { synchronized (RESERVED) { while (! isReleasable()) { RESERVED.wait(); } } return true; } @Override public boolean isReleasable() { return (result = cache.get(n)) ! = RESERVED; }}}Copy the code

Welcome to pay attention to my public number “Tong Elder brother read source code”, view more source code series articles, with Tong elder brother tour the ocean of source code.