Preface

A friend recently asked me about a problem in a real project that went roughly like this: generate an MD5 signature for every file under a directory. His first thought was a recursive approach, which is fine, but there is no need to reinvent the wheel. Java already provides a mature tool for this: the Fork/Join parallel execution framework. Taking this opportunity, this article records my understanding of how the Fork/Join framework works.
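
For the directory-MD5 problem above, a Fork/Join version might look like the sketch below. It assumes that "signature" means the MD5 digest of each file's contents. Each subdirectory becomes a forked subtask, and each task hashes its own directory's files directly. The class name DirectoryMd5Task is hypothetical, and Files.readAllBytes loads each file fully into memory, which is fine for a sketch but not for very large files.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigInteger;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Stream;

// Hypothetical task: maps each regular file under `dir` to the MD5 hex digest of its contents.
class DirectoryMd5Task extends RecursiveTask<Map<Path, String>> {
    private final Path dir;

    DirectoryMd5Task(Path dir) {
        this.dir = dir;
    }

    @Override
    protected Map<Path, String> compute() {
        Map<Path, String> result = new HashMap<>();
        List<DirectoryMd5Task> subtasks = new ArrayList<>();
        try (Stream<Path> entries = Files.list(dir)) {
            for (Path p : (Iterable<Path>) entries::iterator) {
                if (Files.isDirectory(p, LinkOption.NOFOLLOW_LINKS)) {
                    // Fork: each subdirectory becomes an independent subtask
                    DirectoryMd5Task sub = new DirectoryMd5Task(p);
                    sub.fork();
                    subtasks.add(sub);
                } else if (Files.isRegularFile(p)) {
                    // Files directly in this directory are hashed by the current task
                    result.put(p, md5Hex(Files.readAllBytes(p)));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Join: merge every subdirectory's results into this task's map
        for (DirectoryMd5Task sub : subtasks) {
            result.putAll(sub.join());
        }
        return result;
    }

    // MessageDigest is not thread-safe, so each call creates its own instance
    static String md5Hex(byte[] data) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(data);
            return String.format("%032x", new BigInteger(1, digest)); // zero-padded, 32 hex chars
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    public static void main(String[] args) {
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        Map<Path, String> sums = ForkJoinPool.commonPool().invoke(new DirectoryMd5Task(root));
        sums.forEach((path, md5) -> System.out.println(md5 + "  " + path));
    }
}
```

Symbolic links to directories are deliberately not followed, which keeps a link cycle from making the recursion run forever.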

JDK 1.7 added a new member to the J.U.C (java.util.concurrent) package: the Fork/Join parallel task execution framework. At its core, the Fork/Join framework is an implementation of divide and conquer.

What is the Fork/Join framework?

The two words Fork and Join explain the idea. Fork means splitting a large task into several subtasks and executing them in parallel. The subtasks are independent: they do not affect one another, yet each is part of the larger task. Join means collecting and combining the subtasks' results, which yields the result of the large task. A flowchart of a Fork/Join operation illustrates this split-then-merge flow.
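
To make the Fork and Join steps concrete, here is a minimal sketch (not from the original article) that finds the maximum of an int array. The class name MaxTask and the cutoff of 1,000 elements are illustrative assumptions. Each task splits its range into two independent halves, forks both with the static ForkJoinTask.invokeAll(t1, t2), and then joins their partial answers into one.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative task: maximum of data[from, to), found by divide and conquer.
class MaxTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 1_000; // assumed cutoff; tune for real workloads
    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    MaxTask(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Integer compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: solve directly
            int max = Integer.MIN_VALUE;
            for (int i = from; i < to; i++) {
                max = Math.max(max, data[i]);
            }
            return max;
        }
        int mid = (from + to) >>> 1;
        MaxTask left = new MaxTask(data, from, mid);
        MaxTask right = new MaxTask(data, mid, to);
        // Fork: run the two independent halves in parallel and wait until both are done
        invokeAll(left, right);
        // Join: combine the two partial answers into the answer for this range
        return Math.max(left.join(), right.join());
    }

    static int max(int[] data) {
        if (data.length == 0) {
            throw new IllegalArgumentException("empty array");
        }
        return ForkJoinPool.commonPool().invoke(new MaxTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[100_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = (i * 31) % 99_991; // arbitrary sample values
        }
        System.out.println(max(data));
    }
}
```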

Design idea of the Fork/Join framework

Based on the above, if you had to implement a framework with similar functionality, how would you design it? Working through this question reveals the design idea behind the Fork/Join framework.

Splitting tasks: first, we need a fork class that splits a large task into subtasks. The subtasks may still be large, so we keep splitting them until each task is small enough.

Executing tasks and merging results: the split subtasks are placed in double-ended queues (deques), and several threads are started that take tasks from the deques and execute them. The results go into another queue, and one thread merges them.

The Fork/Join framework provides two classes to accomplish these two steps.
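
Before looking at those two classes, here is how the hand-rolled design just described might look. This is a hypothetical sketch built on an ordinary ExecutorService, not the framework's implementation: ranges are split recursively into one shared LinkedBlockingDeque, a fixed set of threads drains it, partial sums go into a second queue, and the calling thread merges them. The names NaiveSplitMergeSum, split, and sum are made up for illustration.

```java
import java.util.Deque;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical hand-rolled split/execute/merge design; NOT how ForkJoinPool works internally.
class NaiveSplitMergeSum {

    // Step 1: keep splitting [lo, hi] in half until a range is small enough
    static void split(int lo, int hi, int threshold, Deque<int[]> out) {
        if (hi - lo <= threshold) {
            out.addLast(new int[] {lo, hi});
            return;
        }
        int mid = (lo + hi) / 2;
        split(lo, mid, threshold, out);
        split(mid + 1, hi, threshold, out);
    }

    static long sum(int start, int end, int threshold, int threads) throws InterruptedException {
        BlockingDeque<int[]> tasks = new LinkedBlockingDeque<>();
        split(start, end, threshold, tasks);

        // Step 2: worker threads take ranges from the shared deque; partial sums go to a second queue
        BlockingQueue<Long> results = new LinkedBlockingQueue<>();
        ExecutorService workers = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            workers.execute(() -> {
                int[] range;
                while ((range = tasks.pollFirst()) != null) {
                    long partial = 0;
                    for (int i = range[0]; i <= range[1]; i++) {
                        partial += i;
                    }
                    results.add(partial);
                }
            });
        }
        workers.shutdown();
        if (!workers.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("workers did not finish in time");
        }

        // Step 3: a single thread (here, the caller) merges the partial results
        long total = 0;
        for (long partial : results) {
            total += partial;
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sum(1, 100, 20, 2)); // 5050
    }
}
```

Every worker here competes for the same deque. The Fork/Join framework avoids that bottleneck by giving each worker thread its own deque and letting idle workers steal from others.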

  • ForkJoinTask: To use the Fork/Join framework, you first create a ForkJoin task. ForkJoinTask provides this capability, including the fork() and join() mechanisms. Normally we do not extend ForkJoinTask directly; instead we extend one of its two commonly used subclasses:
      RecursiveAction: for tasks that do not return a result
      RecursiveTask<V>: for tasks that return a result
  • ForkJoinPool: ForkJoinTasks are executed by a ForkJoinPool. Subtasks split off by a worker thread are pushed onto the deque maintained by that worker thread. When a worker thread's own deque runs out of work, it steals a task from the opposite end of another worker thread's deque and executes it.
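
As a sketch of the RecursiveAction side (the article's own example later uses RecursiveTask), the hypothetical SquareAction below squares every element of an array in place and returns nothing. It uses the static ForkJoinTask.invokeAll(t1, t2), which forks both subtasks and returns once both are done. The tiny threshold of 4 is chosen only so that the demo actually splits.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// RecursiveAction: no return value; each task squares its slice data[from, to) in place.
class SquareAction extends RecursiveAction {
    private static final int THRESHOLD = 4; // deliberately tiny so the demo splits
    private final long[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    SquareAction(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            for (int i = from; i < to; i++) {
                data[i] = data[i] * data[i];
            }
            return;
        }
        int mid = (from + to) >>> 1;
        // Fork both halves and wait for them; there is no result to merge
        invokeAll(new SquareAction(data, from, mid), new SquareAction(data, mid, to));
    }

    public static void main(String[] args) {
        long[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        ForkJoinPool.commonPool().invoke(new SquareAction(data, 0, data.length));
        System.out.println(Arrays.toString(data)); // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
    }
}
```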

Using the Fork/Join framework

Next, we use the Fork/Join framework to calculate 1 + 2 + 3 + … + 100.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Create a ForkJoinPool with a parallelism level of 2 (two worker threads)
        ForkJoinPool forkJoinPool = new ForkJoinPool(2);
        // Create the target task: sum 1..100, splitting until end - start <= 20
        MyAddTask myAddTask = new MyAddTask(1, 100, 20);
        // Submit the target task to the ForkJoinPool
        ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myAddTask);
        // Block until the task completes and get its result
        Integer result = forkJoinTask.get();
        System.out.println("Calculation result: " + result);
        forkJoinPool.shutdown();
    }
}

class MyAddTask extends RecursiveTask<Integer> {

    // Threshold: ranges with end - start <= threshold are summed directly
    private final int threshold;
    private final int start;
    private final int end;

    public MyAddTask(int start, int end, int threshold) {
        this.start = start;
        this.end = end;
        this.threshold = threshold;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        // If the granularity is small enough, calculate directly
        if (end - start <= threshold) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // Otherwise split the range in half
            int middle = (start + end) / 2;
            MyAddTask left = new MyAddTask(start, middle, threshold);
            MyAddTask right = new MyAddTask(middle + 1, end, threshold);
            // fork() queues each subtask for asynchronous execution
            ForkJoinTask<Integer> leftTask = left.fork();
            ForkJoinTask<Integer> rightTask = right.fork();
            // join() waits for each subtask to finish and returns its result
            Integer leftResult = leftTask.join();
            Integer rightResult = rightTask.join();
            sum = leftResult + rightResult;
            System.out.println("Subtask merge result: sum=" + sum + " start=" + start + " end=" + end);
        }
        return sum;
    }
}

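Running it prints seven "Subtask merge result" lines, one per merge, in an order that depends on thread scheduling (the merge for the full range 1 to 100 always comes last), followed by the final result, 5050.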

How the Fork/Join framework works

ForkJoinPool

The example above creates its pool directly with new ForkJoinPool(2). However, this is not the approach recommended by ForkJoinPool's author, Doug Lea. The ForkJoinPool class documentation contains this passage:

A static commonPool() is available and appropriate for most applications. The common pool is used by any ForkJoinTask that is not explicitly submitted to a specified pool. Using the common pool normally reduces resource usage (its threads are slowly reclaimed during periods of non-use, and reinstated upon subsequent use).

ForkJoinPool has a static method, commonPool() (added in JDK 1.8). The instance it returns is shared by the entire application and is suitable for most scenarios. Using the common pool lets the various tasks in an application that split and merge computations share the same computing resources, so those resources are used to best effect (the common pool's worker threads are slowly reclaimed when idle and recreated when needed later). This is how Doug Lea recommends obtaining a ForkJoinPool instance:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
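
A minimal sketch of using the common pool, assuming a small hypothetical RangeSum task built on the same idea as MyAddTask. Application code never shuts the shared pool down. Calling invoke() directly on a task from an ordinary thread runs compute() in that thread, and any subtasks it forks go to the common pool.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class CommonPoolDemo {
    // Hypothetical range-sum task, same idea as MyAddTask: sums start..end inclusive
    static class RangeSum extends RecursiveTask<Long> {
        private final int start;
        private final int end;

        RangeSum(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start <= 20) {
                long sum = 0;
                for (int i = start; i <= end; i++) {
                    sum += i;
                }
                return sum;
            }
            int middle = (start + end) >>> 1;
            RangeSum left = new RangeSum(start, middle);
            left.fork();
            long right = new RangeSum(middle + 1, end).compute();
            return right + left.join();
        }
    }

    public static void main(String[] args) {
        // Shared, JVM-wide pool: obtain it and use it, but never shut it down yourself
        ForkJoinPool pool = ForkJoinPool.commonPool();
        System.out.println("common pool parallelism = " + pool.getParallelism()); // machine-dependent

        long viaPool = pool.invoke(new RangeSum(1, 100));
        // invoke() on the task itself runs compute() in this thread; forked subtasks go to the common pool
        long viaTask = new RangeSum(1, 100).invoke();
        System.out.println(viaPool + " " + viaTask); // 5050 5050
    }
}
```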

Submitting tasks to a ForkJoinPool

ForkJoinPool offers three methods for submitting a ForkJoinTask:
  • invoke(ForkJoinTask<T> task): submits the task, blocks until it completes, and returns its merged result.
  • execute(ForkJoinTask<?> task): arranges for the task to be executed asynchronously; it returns nothing.
  • submit(ForkJoinTask<T> task): arranges for the task to be executed asynchronously and returns the task itself; call get() on it to obtain the merged result.
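
The three submission styles side by side, as a sketch using two trivial hypothetical tasks (Answer and Increment) on a dedicated two-thread pool. With execute() nothing comes back, so the sketch keeps its own reference to the task in order to wait on it.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;

class SubmitStylesDemo {
    // Trivial task with a result (hypothetical, for illustration)
    static class Answer extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            return 42;
        }
    }

    // Trivial action with only a side effect (hypothetical, for illustration)
    static class Increment extends RecursiveAction {
        private final AtomicInteger counter;

        Increment(AtomicInteger counter) {
            this.counter = counter;
        }

        @Override
        protected void compute() {
            counter.incrementAndGet();
        }
    }

    static String run() throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            // invoke: blocks until the task completes and returns its result
            int a = pool.invoke(new Answer());

            // submit: returns the task itself, which also acts as a Future
            ForkJoinTask<Integer> future = pool.submit(new Answer());
            int b = future.get();

            // execute: returns nothing, so keep a reference if you need to wait
            AtomicInteger counter = new AtomicInteger();
            Increment action = new Increment(counter);
            pool.execute(action);
            action.join(); // waits for completion; a RecursiveAction's join() returns null

            return a + " " + b + " " + counter.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // 42 42 1
    }
}
```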

ForkJoinTask

ForkJoinTask provides two main methods, fork() and join(), for splitting off subtasks and combining their results. Their source code looks difficult because it is full of arithmetic, bitwise, and logical operations, but once you comb through it, the overall flow is easy to grasp. The main job of fork() is to push the forked subtask onto a task queue: the current worker thread's queue, or the common pool if fork() is called outside any ForkJoinPool. The main job of join() is to wait for the subtask to finish and return its result so that it can be merged.
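
The classic illustration of fork() and join() is the Fibonacci example in the RecursiveTask Javadoc, reproduced here with comments. It is a deliberately inefficient way to compute Fibonacci numbers, but it shows the usual pattern: fork() one subtask so it can run asynchronously, compute() the other in the current thread, then join() the forked one. Because fork() returns the task itself, an assignment such as ForkJoinTask<Integer> leftTask = left.fork() refers to the same object as left.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Adapted from the Fibonacci example in the RecursiveTask Javadoc (deliberately inefficient)
class Fibonacci extends RecursiveTask<Integer> {
    final int n;

    Fibonacci(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n <= 1) {
            return n;
        }
        Fibonacci f1 = new Fibonacci(n - 1);
        // fork(): queue f1 for asynchronous execution (fork() returns f1 itself)
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        // Compute f2 in the current thread, then join(): wait for f1's result and combine
        return f2.compute() + f1.join();
    }

    public static void main(String[] args) {
        System.out.println(ForkJoinPool.commonPool().invoke(new Fibonacci(20))); // 6765
    }
}
```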

Worker threads and work queues

Besides ForkJoinTasks and their subtasks, a ForkJoinPool can also execute Runnable and Callable tasks. Here we focus on how ForkJoinTasks are executed. Worker threads in a ForkJoinPool are instances of ForkJoinWorkerThread, a subclass of Thread, which holds two critical fields:

    final ForkJoinPool pool;                // the pool this thread works in
    final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

pool is the ForkJoinPool instance that the worker thread belongs to. workQueue is the queue of subtasks waiting to be executed by this worker thread. WorkQueue is an inner class of ForkJoinPool. Unlike most Java queues, it does not implement BlockingQueue (or any Queue interface) with the usual offer/poll contract; instead it is a standalone double-ended queue (deque) operated through its own methods such as push() and pop(), and its elements are the split subtasks. Because it supports work stealing, it must be thread-safe, so its implementation relies heavily on Unsafe for low-level CAS operations (sun.misc.Unsafe in JDK 8).
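
A simplified, lock-based model of the deque behaviour described above, for illustration only: the real WorkQueue is array-based and uses CAS operations, and SimpleWorkDeque with its push/pop/steal methods is hypothetical. The owner thread pushes and pops at one end (LIFO), while other threads steal from the opposite end (FIFO). Owner and thieves therefore mostly touch different ends, and thieves tend to receive the older tasks, which in a recursive split are typically the larger ones.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, lock-based model of a work-stealing deque (not ForkJoinPool's WorkQueue).
class SimpleWorkDeque<T> {
    private final Deque<T> deque = new ArrayDeque<>();

    // Owner thread: push a newly forked subtask onto its own end
    synchronized void push(T task) {
        deque.addLast(task);
    }

    // Owner thread: pop the most recently pushed task (LIFO); null if empty
    synchronized T pop() {
        return deque.pollLast();
    }

    // Thief thread: take the oldest task from the opposite end (FIFO); null if empty
    synchronized T steal() {
        return deque.pollFirst();
    }

    synchronized int size() {
        return deque.size();
    }

    public static void main(String[] args) {
        SimpleWorkDeque<String> q = new SimpleWorkDeque<>();
        q.push("t1");
        q.push("t2");
        q.push("t3");
        System.out.println(q.pop());   // t3: the owner works on its newest task
        System.out.println(q.steal()); // t1: a thief takes the oldest task
        System.out.println(q.size());  // 1
    }
}
```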

Work stealing

To reduce contention between threads, the Fork/Join framework places subtasks in different queues and assigns a separate thread to each queue to execute its tasks, so threads and queues correspond one to one. For example, thread A executes the tasks in queue 1 and thread B executes the tasks in queue 2. Some threads inevitably empty their own queues early; to improve overall throughput, these threads “steal” tasks from other threads’ queues and execute them. This is the “work stealing” feature of the Fork/Join framework.
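
To watch work stealing on a real pool, the sketch below runs a range sum on a four-thread ForkJoinPool, records which ForkJoinWorkerThreads executed leaf tasks, and then reads ForkJoinPool.getStealCount(), which returns an estimate of how many tasks were stolen. StealObservationDemo and Sum are hypothetical names, and both the thread set and the steal count vary from run to run.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveTask;

class StealObservationDemo {
    // Names of the worker threads that executed at least one leaf task
    static final Set<String> LEAF_WORKERS = ConcurrentHashMap.newKeySet();

    static class Sum extends RecursiveTask<Long> {
        private final long start; // inclusive
        private final long end;   // inclusive

        Sum(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start <= 1_000) {
                Thread current = Thread.currentThread();
                if (current instanceof ForkJoinWorkerThread) {
                    LEAF_WORKERS.add(current.getName());
                }
                long s = 0;
                for (long i = start; i <= end; i++) {
                    s += i;
                }
                return s;
            }
            long mid = (start + end) >>> 1;
            Sum left = new Sum(start, mid);
            left.fork(); // queued for asynchronous execution; an idle worker may steal it
            long right = new Sum(mid + 1, end).compute();
            return right + left.join();
        }
    }

    static long sum(ForkJoinPool pool, long n) {
        return pool.invoke(new Sum(1, n));
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            System.out.println("result = " + sum(pool, 1_000_000)); // 500000500000
            // The next two lines vary from run to run
            System.out.println("workers that ran leaves = " + LEAF_WORKERS);
            System.out.println("steal count = " + pool.getStealCount());
        } finally {
            pool.shutdown();
        }
    }
}
```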

Exception handling

A ForkJoinTask may throw an exception during execution. The exception is not propagated to the submitting thread at the moment it occurs; it surfaces only when that thread calls join(), invoke(), or get() on the task (get() wraps it in an ExecutionException), and a task started with execute() that is never joined can fail silently. To inspect a failure without triggering the rethrow, ForkJoinTask provides isCompletedAbnormally(), which reports whether the task threw an exception or was cancelled. If it did, getException() retrieves the Throwable. getException() returns a CancellationException if the task was cancelled, and null if the task has not completed or completed without an exception.
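
A sketch of these options, using a hypothetical Divide task that fails with ArithmeticException on division by zero. quietlyJoin() waits for completion without rethrowing, after which isCompletedAbnormally() and getException() can be inspected; get() reports the same failure wrapped in an ExecutionException. Depending on the JDK, getException() may return a new exception of the same type (with the original as its cause) rather than the original instance, so the sketch only relies on the type.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class ExceptionHandlingDemo {
    // Hypothetical task that fails with ArithmeticException when b == 0
    static class Divide extends RecursiveTask<Integer> {
        private final int a;
        private final int b;

        Divide(int a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        protected Integer compute() {
            return a / b;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            ForkJoinTask<Integer> task = pool.submit(new Divide(1, 0));
            task.quietlyJoin(); // wait for completion without rethrowing
            if (task.isCompletedAbnormally()) {
                Throwable error = task.getException();
                System.out.println("failed with " + error.getClass().getSimpleName()); // failed with ArithmeticException
            }
            try {
                task.get(); // join() would rethrow the ArithmeticException itself instead
            } catch (ExecutionException e) {
                System.out.println("get() wrapped " + e.getCause().getClass().getSimpleName()); // get() wrapped ArithmeticException
            }

            // A cancelled task reports a CancellationException...
            Divide cancelled = new Divide(1, 1);
            cancelled.cancel(false);
            System.out.println(cancelled.getException() instanceof CancellationException); // true
            // ...and a task that has not completed reports null
            System.out.println(new Divide(1, 1).getException()); // null
        } finally {
            pool.shutdown();
        }
    }
}
```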

Conclusion

This article introduced the concept of the Fork/Join framework, how to use it, and the principles behind its implementation. It did not analyze the source code in detail, but the source code is essentially the realization of the ideas described here. Fork/Join's divide-and-conquer approach is similar to MapReduce in big data, except that Fork/Join runs on a single machine, inside one JVM, while MapReduce distributes work across a cluster.