Abstract: A ForkJoin thread pool splits a task into subtasks. If a subtask is still too large, it is split again, and so on, until every task is small enough to be computed directly.

This article is from ForkJoin Thread Pool Learning and Thinking by breakDraw.

ForkJoin thread pools are rarely covered in general Java books, probably because they are a relatively recent addition: the framework arrived in Java 7 and only became widely used with Java 8.

Let's begin with a brief explanation of how ForkJoin works. It is essentially a divide-and-conquer scheme, a bit like a merge computation:

1. It forks a large submitted task into smaller ones according to certain rules.

2. Once a task is small enough, the calculation is performed.

3. When execution completes, the result is joined with those of the other small tasks, gradually combining all the small results into one big result.

To run a ForkJoin task concurrently, you must extend RecursiveTask and submit an instance of your subclass to the ForkJoin pool:

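    import java.util.concurrent.RecursiveTask;

    // Skeleton only: the condition and the splitting logic depend on your task.
    public class ReckonTask extends RecursiveTask<Long> {
        public ReckonTask(/* task parameters */) { }

        @Override
        protected Long compute() {
            if (/* the task is small enough, judged from the task parameters */) {
                // calculate directly and return the result
            } else {
                // split into subtask1 and subtask2
                subtask1.fork();
                subtask2.fork();
                Long result1 = subtask1.join();
                Long result2 = subtask2.join();
                return result1 + result2;
            }
        }
    }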
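To make the skeleton concrete, here is a minimal sketch that fills it in with an array sum. The class name ArraySumTask, the THRESHOLD value and the pool size of 3 are assumptions chosen for illustration; they do not come from the original article.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example task: sums the slice [from, to) of a long[] array.
class ArraySumTask extends RecursiveTask<Long> {
    // Assumed cutoff: slices of this size or smaller are summed directly.
    private static final int THRESHOLD = 1_000;

    private final long[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    ArraySumTask(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: calculate and return.
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        // Too large: split into two subtasks, fork both, then join both.
        int mid = (from + to) >>> 1;
        ArraySumTask subtask1 = new ArraySumTask(data, from, mid);
        ArraySumTask subtask2 = new ArraySumTask(data, mid, to);
        subtask1.fork();
        subtask2.fork();
        long result1 = subtask1.join();
        long result2 = subtask2.join();
        return result1 + result2;
    }

    // Submits the root task to a pool with three worker threads and waits for the result.
    static long sum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool(3);
        try {
            return pool.invoke(new ArraySumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(sum(data)); // 1 + 2 + ... + 10000 = 50005000
    }
}
```

The threshold controls how deep the splitting goes: a smaller value produces more, smaller tasks, while a larger value produces fewer tasks with more work each.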

There is a lot of detail behind ForkJoin, so I asked myself a few questions to understand how it works.

Q: How does a forkJoin thread acquire small tasks?

A: Through work stealing. (Work stealing is mentioned in the book on Java concurrency.)

  • If we configure the ForkJoin pool with three worker threads, there will be three work queues. Note that each queue is a double-ended queue (deque).

  • Each time a task is executed, if it does not yet meet the condition for a small task, it forks two subtasks and pushes them into the executing thread's own work queue.

  • Each worker thread keeps taking tasks from the head of its own queue.

  • Key point: if a thread's own queue is empty, it takes a task from the tail of another thread's queue.
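The queue rules above can be sketched with a deliberately simplified, single-threaded model. This is not the real ForkJoinPool implementation: the class WorkStealingSketch and its methods are hypothetical, tasks are represented by plain strings, and thread safety is ignored entirely so that the head/tail behavior is easy to follow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified, single-threaded model of per-worker deques (hypothetical; not the JDK code).
class WorkStealingSketch {
    private final List<Deque<String>> queues = new ArrayList<>();

    WorkStealingSketch(int workers) {
        // One double-ended queue per worker thread.
        for (int i = 0; i < workers; i++) {
            queues.add(new ArrayDeque<>());
        }
    }

    // A worker pushes a forked subtask onto the head of its own queue.
    void push(int worker, String task) {
        queues.get(worker).addFirst(task);
    }

    // A worker takes from the head of its own queue; if that queue is empty,
    // it steals from the tail of another worker's queue. Returns null if no work is left.
    String next(int worker) {
        String own = queues.get(worker).pollFirst();
        if (own != null) {
            return own;
        }
        for (int i = 0; i < queues.size(); i++) {
            if (i == worker) {
                continue;
            }
            String stolen = queues.get(i).pollLast();
            if (stolen != null) {
                return stolen;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        WorkStealingSketch pool = new WorkStealingSketch(3);
        pool.push(0, "A");
        pool.push(0, "B");
        pool.push(0, "C");
        System.out.println(pool.next(0)); // C: worker 0 takes from the head of its own queue
        System.out.println(pool.next(1)); // A: worker 1 is idle and steals from the tail of queue 0
        System.out.println(pool.next(2)); // B: worker 2 steals the remaining task
    }
}
```

Because the owner and the thieves work at opposite ends of the deque, they mostly stay out of each other's way, and a thief tends to pick up the oldest task in the queue.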

Q: What exactly happens at fork?

A: fork is an asynchronous operation: it adds the forked task to the current thread's queue and returns as soon as the task has been enqueued. Note that by default a forked task goes into the forking thread's own queue first. While it is still waiting there unexecuted, it can be taken away by another thread!
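Because fork returns immediately, the forking thread is free to keep working while the forked task waits in the queue. A minimal sketch of one common way to use this: fork one half, compute the other half directly in the current thread, then join. The class RangeSumTask and its THRESHOLD are hypothetical names chosen for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example task: sums the integers in [from, to).
class RangeSumTask extends RecursiveTask<Long> {
    private static final long THRESHOLD = 10_000; // assumed cutoff for direct computation

    private final long from; // inclusive
    private final long to;   // exclusive

    RangeSumTask(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (long i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
        long mid = from + (to - from) / 2;
        RangeSumTask left = new RangeSumTask(from, mid);
        RangeSumTask right = new RangeSumTask(mid, to);
        left.fork();                     // enqueued asynchronously; this call returns right away
        long rightSum = right.compute(); // the current thread works on the other half meanwhile
        long leftSum = left.join();      // collect the forked half (it may have been stolen and run elsewhere)
        return leftSum + rightSum;
    }

    // Sums 0 + 1 + ... + (n - 1) on the common pool.
    static long sumBelow(long n) {
        return ForkJoinPool.commonPool().invoke(new RangeSumTask(0, n));
    }

    public static void main(String[] args) {
        System.out.println(sumBelow(1_000_000)); // n * (n - 1) / 2 = 499999500000
    }
}
```

Compared with forking both halves, this variant puts one task fewer into the queue at each level, since the current thread runs the second half itself.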

Q: What does join do, and when is it called?

A: Look at the compute() method in the task code shown earlier.

As it shows, after the subtasks are forked, join is called to obtain the result of each subtask. Once the results are available, they are merged and the combined result is returned.

Q: How does join wait for a result? If the thread were simply suspended, it would do no useful work.

A: First of all, the new subtask was already put into the queue when it was forked earlier.

Each subtask has a task state. When join is called on a subtask, its state is checked in a loop.

If the subtask is not yet complete, the thread fetches another task from its own queue or from someone else's queue and executes it, entering exec() one level deeper. Once the subtask's status is found to be complete (whether it was finished by this thread or by another one, since the task state is synchronized and visible across threads), the result is returned to the caller. In essence, join is a recursive process: while the awaited task is unfinished, the thread keeps executing other tasks recursively.
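One way to observe this from the outside: if join merely parked the calling thread, a pool with a single worker could never finish a task that joins its own forked children. The sketch below runs such a task on a pool with parallelism 1. The class names are hypothetical, and exactly how the pool keeps making progress internally is an implementation detail that varies between JDK versions.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class JoinDoesNotStallDemo {
    // Hypothetical task: counts the leaves of a complete binary tree of the given depth.
    static class CountLeaves extends RecursiveTask<Integer> {
        private final int depth;

        CountLeaves(int depth) {
            this.depth = depth;
        }

        @Override
        protected Integer compute() {
            if (depth == 0) {
                return 1; // a leaf
            }
            CountLeaves a = new CountLeaves(depth - 1);
            CountLeaves b = new CountLeaves(depth - 1);
            a.fork();
            b.fork();
            // While waiting in join, the worker runs pending subtasks instead of idling.
            return a.join() + b.join();
        }
    }

    // Runs the whole computation on a pool configured with a single worker thread.
    static int countWithSingleWorker(int depth) {
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            return pool.invoke(new CountLeaves(depth));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(countWithSingleWorker(10)); // 2^10 = 1024 leaves
    }
}
```

With an ordinary fixed-size thread pool of one thread, a task that blocks on its own subtasks would deadlock, which is why this kind of recursive decomposition is specific to ForkJoin.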

See the fork+join process for more details

Q: How can ForkJoin store and take tasks without concurrency problems, for example when two threads take from the tail of the same queue at the same time?

A:

  • N worker threads are stored in an array (i.e. an array of worker threads)

  • ForkJoinPool relies on the sun.misc.Unsafe class, which performs atomic operations directly at the hardware level. This is one of the main reasons ForkJoinPool is so efficient, and the same programming idea shows up throughout the classes of the java.util.concurrent package.
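The idea behind such atomic operations can be sketched with a compare-and-set loop. The example below uses AtomicInteger from java.util.concurrent.atomic as a stand-in for Unsafe, and the class CasClaimSketch is hypothetical; the real ForkJoinPool queue code is considerably more involved. It shows only the core point: when several threads race for the same slot, exactly one compare-and-set succeeds and the losers retry on the next slot.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical sketch: threads claim task slots 0..size-1 without locks.
class CasClaimSketch {
    private final int size;
    private final AtomicInteger next = new AtomicInteger(0); // index of the next unclaimed slot

    CasClaimSketch(int size) {
        this.size = size;
    }

    // Returns the index of the claimed slot, or -1 if every slot has been claimed.
    int claimNext() {
        while (true) {
            int i = next.get();
            if (i >= size) {
                return -1;
            }
            // Only one thread can move the index from i to i + 1; the others loop and retry.
            if (next.compareAndSet(i, i + 1)) {
                return i;
            }
        }
    }

    // Lets several threads race on claimNext() and records how often each slot was claimed.
    static AtomicIntegerArray race(int slots, int threadCount) {
        CasClaimSketch queue = new CasClaimSketch(slots);
        AtomicIntegerArray claims = new AtomicIntegerArray(slots);
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            threads[t] = new Thread(() -> {
                int slot;
                while ((slot = queue.claimNext()) != -1) {
                    claims.incrementAndGet(slot);
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return claims;
    }

    public static void main(String[] args) {
        AtomicIntegerArray claims = race(1_000, 4);
        boolean eachOnce = true;
        for (int i = 0; i < claims.length(); i++) {
            eachOnce &= claims.get(i) == 1;
        }
        System.out.println(eachOnce); // true: no slot is claimed twice or skipped
    }
}
```

No thread ever blocks here: a thread that loses the race simply re-reads the index and tries again, which is the property that makes this style attractive for heavily contended queues.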

Q: Where is ForkJoin used?

A: The parallel mode of Java 8 Streams is based on ForkJoin. A parallel stream uses ForkJoin to split the work into tasks and execute them. By default it runs on a ForkJoin thread pool whose size is derived from the number of CPUs in the machine. To limit the number of threads, you can submit the stream operation to a pool of your own: new ForkJoinPool(n).submit(() -> list.stream().parallel().map()…);
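A minimal sketch of that last technique follows. The class ParallelStreamInCustomPool and the method sumOfSquares are hypothetical names. Note also that running a parallel stream inside the pool that started it is observed behavior of current JDKs rather than something the Stream API documentation promises, so treat this as an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;

class ParallelStreamInCustomPool {
    // Runs a parallel stream from inside a dedicated pool with the given parallelism.
    static int sumOfSquares(List<Integer> numbers, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // The stream pipeline is started by a thread of our pool, not by the caller.
            Callable<Integer> job =
                    () -> numbers.stream().parallel().mapToInt(x -> x * x).sum();
            return pool.submit(job).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(Arrays.asList(1, 2, 3, 4), 2)); // 1 + 4 + 9 + 16 = 30
    }
}
```

A dedicated pool also keeps a slow stream from occupying the shared default pool that other parallel streams in the same JVM rely on.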
