JDK 1.7 introduced the Fork/Join framework for executing tasks in parallel. Its main purpose is to split a large task into several smaller tasks and then combine the results of those smaller tasks. This style of development is also called divide-and-conquer programming; it can make much fuller use of CPU resources and improve task execution efficiency.

Fork/Join divide and conquer programming

The Fork/Join parallel execution framework in the JDK uses a “work-stealing” algorithm, in which a thread that has run out of work steals tasks from other threads' queues and executes them.

Suppose a large task can be split into thousands of mutually independent subtasks. To make them easier to manage, the subtasks are placed in different queues, each served by its own thread. A thread that finishes the tasks in its own queue does not sit idle waiting; instead it helps the other threads by stealing tasks from their queues and executing them. This is what is called the work-stealing algorithm.

Work stealing

A major difference between ForkJoinPool and ThreadPoolExecutor is that ForkJoinPool introduces a work-stealing design, which is one of the keys to its performance. Work stealing allows idle threads to steal tasks from the double-ended queues (deques) of busy threads. By default, a worker thread takes tasks from the head of its own deque. When its own deque is empty, the thread takes a task from the tail of another busy thread's deque. This approach minimizes the chance that threads compete for the same task.

Most ForkJoinPool operations take place in work-stealing queues, which are implemented by the internal WorkQueue class. It is a special form of deque that supports only three operations: push, pop, and poll (also known as steal). Access to the queue is strictly restricted: push and pop may only be called by the thread that owns the queue, while poll may be called by other threads.
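The access rules above can be illustrated with a small sketch. This is not the JDK's WorkQueue, which is an array-based structure tuned for low contention; it is a simplified stand-in built on ConcurrentLinkedDeque, and the class name StealableQueue is a hypothetical one chosen for illustration. The owner-only rule for push and pop is a convention here and is not enforced by the code.

```java
import java.util.concurrent.ConcurrentLinkedDeque;

/** Hypothetical, simplified stand-in for a work-stealing queue; not the JDK's WorkQueue. */
final class StealableQueue<T> {
    private final ConcurrentLinkedDeque<T> deque = new ConcurrentLinkedDeque<>();

    /** Owner thread only (by convention): add a new task at the head. */
    void push(T task) {
        deque.addFirst(task);
    }

    /** Owner thread only (by convention): take the newest task from the head, or null if empty. */
    T pop() {
        return deque.pollFirst();
    }

    /** Other threads: steal the oldest task from the tail, or null if empty. */
    T poll() {
        return deque.pollLast();
    }
}

final class StealableQueueDemo {
    public static void main(String[] args) {
        StealableQueue<String> queue = new StealableQueue<>();
        queue.push("task-1"); // oldest
        queue.push("task-2");
        queue.push("task-3"); // newest
        System.out.println("owner pops: " + queue.pop());    // prints "owner pops: task-3"
        System.out.println("thief steals: " + queue.poll()); // prints "thief steals: task-1"
    }
}
```

Because the owner and the thief work at opposite ends, they only touch the same element when a single task is left in the queue.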

The work-stealing process is shown in the figure below:

[Figure: the work-stealing process]

  • The advantage of the work-stealing algorithm is that it makes full use of threads for parallel computation and reduces contention between threads.
  • The disadvantage of the work-stealing algorithm is that contention still occurs in some cases, for example when only one task is left in a deque. It also consumes more system resources, because it creates multiple threads and multiple deques.

Why do worker threads take tasks from the head of the queue while stealers take them from the tail?

The main reason is performance. By always taking the most recently submitted task from the head, the worker thread increases the chance that the task's data is still in the CPU cache, so the CPU can process it faster. The stealer takes tasks from the tail to reduce contention between threads; if every thread took tasks from the same end, contention would be far more likely. There is one more consideration behind this design. Because tasks are divisible, the older tasks in the queue are likely to be coarser-grained, since they may not have been split yet, and an idle thread has comparatively more “energy” to spare for these larger tasks.

Divide and conquer algorithm

The basic idea of a divide-and-conquer algorithm is to decompose a problem of size N into K smaller subproblems that are independent of each other and have the same form as the original problem. The solution to the original problem is then obtained from the solutions to the subproblems. In other words, the goal is completed piece by piece; simple problems can be solved by bisection, that is, by repeatedly splitting them in two.

General steps of a divide-and-conquer solution:

(1) Decompose: divide the problem to be solved into a number of smaller problems of the same kind.

(2) Solve: once a subproblem is small enough, solve it directly with a simpler method.

(3) Merge: following the requirements of the original problem, merge the solutions of the subproblems layer by layer to form the solution to the original problem.

The ForkJoinPoolTest example later in this article applies these steps to a cumulative sum; comments in the code mark where each step begins.
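Before any threads are involved, the three steps can be sketched as plain sequential recursion. The class name DivideAndConquerSum and the threshold of 4 are illustrative assumptions, not something taken from the JDK or from the example later in this article.

```java
/** Sequential divide-and-conquer sum; class name and threshold are illustrative assumptions. */
final class DivideAndConquerSum {
    // Hypothetical cut-off below which a range counts as "small enough"
    private static final int THRESHOLD = 4;

    /** Sums the integers in the inclusive range [start, end]. */
    static long sum(int start, int end) {
        // (2) Solve: a small range is summed directly with a loop
        if (end - start < THRESHOLD) {
            long total = 0;
            for (int i = start; i <= end; i++) {
                total += i;
            }
            return total;
        }
        // (1) Decompose: split the range into two halves of the same kind
        int mid = start + (end - start) / 2;
        long left = sum(start, mid);
        long right = sum(mid + 1, end);
        // (3) Merge: combine the two partial sums
        return left + right;
    }

    public static void main(String[] args) {
        System.out.println(sum(0, 1000)); // prints 500500
    }
}
```

The Fork/Join version keeps this shape and only changes how the two halves are executed.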

ForkJoinPool and ForkJoinTask

ForkJoinPool

ForkJoinPool is a pool that executes ForkJoinTask tasks. Unlike a traditional thread pool, which consists of workers and a single shared queue, it maintains an array of queues (WorkQueue[]), which significantly reduces collisions both when tasks are submitted and when worker threads fetch tasks.

The constructor code is as follows:

public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}


private ForkJoinPool(int parallelism,
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler,
                     int mode,
                     String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

Meanings of parameters:

  • parallelism: the parallelism level. ForkJoinPool determines the number of worker threads from this setting. If it is not set, Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()) is used, which in practice is the number of logical CPU cores.
  • factory: the factory that ForkJoinPool uses to create worker threads. A custom factory must implement the ForkJoinWorkerThreadFactory interface; DefaultForkJoinWorkerThreadFactory is used by default.
  • handler: the exception handler. It is invoked when a worker thread terminates because of an unrecoverable error encountered while executing a task.
  • mode: the queue working mode, either FIFO_QUEUE or LIFO_QUEUE. The public constructor derives it from asyncMode: true selects FIFO_QUEUE, false selects LIFO_QUEUE.
  • workerNamePrefix: the prefix of the worker thread names.
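As a rough sketch of how these parameters are supplied in practice, the snippet below builds one pool with the no-argument constructor and one with the public four-argument constructor shown above. The class name, the helper method newCustomPool, and the argument values are illustrative assumptions.

```java
import java.util.concurrent.ForkJoinPool;

/** Illustrative only: the class name, helper method, and argument values are assumptions. */
final class PoolConstructionDemo {

    /** Builds a pool through the public four-argument constructor. */
    static ForkJoinPool newCustomPool(int parallelism) {
        return new ForkJoinPool(
                parallelism,                                     // parallelism level
                ForkJoinPool.defaultForkJoinWorkerThreadFactory, // default worker thread factory
                (thread, error) ->                               // handler for worker threads that die
                        System.err.println(thread.getName() + " failed: " + error),
                true);                                           // asyncMode = true selects FIFO_QUEUE
    }

    public static void main(String[] args) {
        // No-argument constructor: parallelism defaults to the number of available processors
        ForkJoinPool defaults = new ForkJoinPool();
        ForkJoinPool custom = newCustomPool(2);

        System.out.println("default parallelism: " + defaults.getParallelism());
        System.out.println("custom parallelism: " + custom.getParallelism());
        System.out.println("custom async mode: " + custom.getAsyncMode());

        defaults.shutdown();
        custom.shutdown();
    }
}
```

The private five-argument constructor cannot be called from application code, so mode and workerNamePrefix are only reachable indirectly through asyncMode and the pool's own naming.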

ForkJoinTask

ForkJoinTask is one of the core parts of ForkJoinPool. It is the concrete carrier of a task and defines both the logic that runs when the task executes and the logic for splitting it.

ForkJoinTask implements the Future interface and can be regarded as a lightweight Future.

ForkJoinTask is an abstract class with many methods. At its core are fork() and join(), which coordinate tasks: one submits a task and the other retrieves its result.

  • fork() submits the task: the fork() method submits the task to the pool in which the current task is running. If the current thread is a ForkJoinWorkerThread, the task is placed in that thread's work queue; otherwise it is placed in a work queue of the common pool (ForkJoinPool.commonPool()).
  • join() gets the result of the task: the join() method retrieves the execution result of the task. When join() is called, the calling thread waits until the corresponding subtask has finished and then returns its result.
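A common way to combine the two methods is to fork one half, compute the other half in the current thread, and then join the forked half. The sketch below does this for the maximum of an array; the class name MaxTask, the helper method max, and the threshold are hypothetical choices made for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/** Hypothetical task: finds the maximum of the slice [from, to) of an int array. */
final class MaxTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 1_000; // illustrative cut-off, not a tuned value
    private final int[] data;
    private final int from;
    private final int to;

    MaxTask(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Integer compute() {
        if (to - from <= THRESHOLD) {
            // Small slice: scan it directly
            int max = Integer.MIN_VALUE;
            for (int i = from; i < to; i++) {
                max = Math.max(max, data[i]);
            }
            return max;
        }
        int mid = from + (to - from) / 2;
        MaxTask left = new MaxTask(data, from, mid);
        MaxTask right = new MaxTask(data, mid, to);
        left.fork();                    // queue the left half for asynchronous execution
        int rightMax = right.compute(); // run the right half in the current thread
        int leftMax = left.join();      // wait for the left half and fetch its result
        return Math.max(leftMax, rightMax);
    }

    /** Runs the task in the common pool and returns the maximum (Integer.MIN_VALUE if empty). */
    static int max(int[] data) {
        return ForkJoinPool.commonPool().invoke(new MaxTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = (i * 37) % 10_007;
        }
        System.out.println(max(data));
    }
}
```

Computing one half directly keeps the current thread busy instead of leaving it to wait on two forked subtasks.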

Normally we do not need to extend ForkJoinTask directly; extending one of its subclasses is enough. The Fork/Join framework provides three subclasses of ForkJoinTask:

  • RecursiveAction: a task that executes recursively and does not return a result
  • RecursiveTask: a task that executes recursively and returns a result
  • CountedCompleter: a task that triggers a custom hook when it completes

ForkJoin is best suited to purely computational tasks, that is, purely functional calculations that run independently of each other with no external data or logic dependencies. Tasks submitted to a ForkJoinPool should avoid performing blocking I/O.
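To contrast with RecursiveTask, here is a minimal RecursiveAction sketch that modifies an array in place and returns nothing. The class name DoubleAction, the helper method doubleAll, and the threshold are hypothetical; invokeAll is the static ForkJoinTask method that forks the given tasks and waits for all of them.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

/** Hypothetical action: doubles every element of the slice [from, to) in place. */
final class DoubleAction extends RecursiveAction {
    private static final int THRESHOLD = 1_000; // illustrative cut-off, not a tuned value
    private final long[] data;
    private final int from;
    private final int to;

    DoubleAction(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            // Small slice: update it directly
            for (int i = from; i < to; i++) {
                data[i] *= 2;
            }
            return;
        }
        int mid = from + (to - from) / 2;
        // Run both halves and wait for both; there is no result to merge
        invokeAll(new DoubleAction(data, from, mid), new DoubleAction(data, mid, to));
    }

    /** Doubles all elements of the array using the common pool. */
    static void doubleAll(long[] data) {
        ForkJoinPool.commonPool().invoke(new DoubleAction(data, 0, data.length));
    }

    public static void main(String[] args) {
        long[] data = {1, 2, 3};
        doubleAll(data);
        System.out.println(Arrays.toString(data)); // prints [2, 4, 6]
    }
}
```

The two halves write to disjoint index ranges, so they need no synchronization between them.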

Implementation example

Implement a RecursiveTask that sums the integers from a to b:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinPoolTest {

    static class MyForkJoinTask extends RecursiveTask<Integer> {
        // A range is summed directly once endValue - startValue drops below MAX
        private static final int MAX = 200;

        private final int startValue;
        private final int endValue;

        public MyForkJoinTask(int startValue, int endValue) {
            this.startValue = startValue;
            this.endValue = endValue;
        }

        @Override
        protected Integer compute() {
            // (2) Solve: the subproblem is small enough, so sum it directly
            if (endValue - startValue < MAX) {
                System.out.println(Thread.currentThread().getName()
                        + ": startValue = " + startValue + "; endValue = " + endValue);
                int totalValue = 0;
                for (int index = startValue; index <= endValue; index++) {
                    totalValue += index;
                }
                return totalValue;
            }
            // (1) Decompose: divide the problem into smaller problems of the same kind
            int middle = (startValue + endValue) / 2;
            MyForkJoinTask subTask1 = new MyForkJoinTask(startValue, middle);
            subTask1.fork();
            MyForkJoinTask subTask2 = new MyForkJoinTask(middle + 1, endValue);
            subTask2.fork();
            // (3) Merge: combine the solutions of the subproblems layer by layer
            return subTask1.join() + subTask2.join();
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // Sum the integers from 0 to 1000
        ForkJoinTask<Integer> task = forkJoinPool.submit(new MyForkJoinTask(0, 1000));
        Integer result = task.get();
        // Print the result
        System.out.println(result);
    }
}

A sample run produces the following output (the order of the lines and the thread that handles each range vary between runs):

ForkJoinPool-1-worker-3: startValue = 501; endValue = 625
ForkJoinPool-1-worker-2: startValue = 0; endValue = 125
ForkJoinPool-1-worker-0: startValue = 751; endValue = 875
ForkJoinPool-1-worker-3: startValue = 626; endValue = 750
ForkJoinPool-1-worker-0: startValue = 876; endValue = 1000
ForkJoinPool-1-worker-2: startValue = 126; endValue = 250
ForkJoinPool-1-worker-0: startValue = 251; endValue = 375
ForkJoinPool-1-worker-2: startValue = 376; endValue = 500
500500

Conclusion: as the thread names show, the pool ran with all default parameters and started four worker threads on my machine. Each leaf task summed a range of about 125 values.
