What is the Fork/Join framework

The Fork/Join framework is a thread-pool-based framework for parallel execution, introduced in JDK 7. It breaks a large task into several small tasks that run in parallel, and then combines the results of the small tasks into the result of the large task. As the name suggests, the work has two stages: Fork splits a large task into multiple subtasks that run in parallel, and Join merges the results of those subtasks to produce the result of the large task.

The main flow follows from this: first check whether a task is small enough. If it is, compute it directly; otherwise, split it into several smaller tasks and compute each of them separately. The splitting can repeat until only small tasks remain. The Fork/Join framework is therefore based on divide and conquer: a large task is split into multiple independent small tasks, the small tasks run in parallel, and their results are merged into the final result, so that parallel computation improves efficiency.

Examples of using the Fork/Join framework

To see how the Fork/Join framework works, let’s look at an example that calculates the sum of all the elements in a list. The general idea is to divide the list into sublists, sum the elements of each sublist, and then add these partial sums together to get the sum of the original list. The Fork/Join framework defines ForkJoinTask to represent a Fork/Join task; it provides fork(), join(), and other operations. In general, we do not need to extend the ForkJoinTask class directly. Instead, we extend one of the two ForkJoinTask subclasses provided by the framework:

  • RecursiveAction represents a Fork/Join task that returns no result.
  • RecursiveTask represents a Fork/Join task that returns a result.

Since this example needs to return a result, we define a SumTask class that extends RecursiveTask as follows:

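import java.util.List;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;

/**
 * @author mghio
 * @since 2021-07-25
 */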
public class SumTask extends RecursiveTask<Long> {

  private static final int SEQUENTIAL_THRESHOLD = 50;

  private final List<Long> data;

  public SumTask(List<Long> data) {
    this.data = data;
  }

  @Override
  protected Long compute() {
    if (data.size() <= SEQUENTIAL_THRESHOLD) {
      long sum = computeSumDirectly();
      System.out.format("Sum of %s: %d\n", data.toString(), sum);
      return sum;
    } else {
      int mid = data.size() / 2;
      SumTask firstSubtask = new SumTask(data.subList(0, mid));
      SumTask secondSubtask = new SumTask(data.subList(mid, data.size()));
      // Execute the subtask
      firstSubtask.fork();
      secondSubtask.fork();
      // Wait for the subtask to complete and get the result
      long firstSubTaskResult = firstSubtask.join();
      long secondSubTaskResult = secondSubtask.join();
      return firstSubTaskResult + secondSubTaskResult;
    }
  }

  private long computeSumDirectly() {
    long sum = 0;
    for (Long l : data) {
      sum += l;
    }
    return sum;
  }

  public static void main(String[] args) {
    Random random = new Random();

    List<Long> data = random
        .longs(1_000, 1, 100)
        .boxed()
        .collect(Collectors.toList());

    ForkJoinPool pool = new ForkJoinPool();
    SumTask task = new SumTask(data);
    // Invoke the task once; invoke() blocks until the result is available
    long sum = pool.invoke(task);

    System.out.println("Sum: " + sum);
  }
}

Here, when the size of the list is no larger than SEQUENTIAL_THRESHOLD (the threshold), the task is regarded as a small task and the sum of the list elements is calculated directly. Otherwise, the list is split into smaller tasks again. When run, the program prints the partial sum of each small sublist, followed by the total.

The main difference between a ForkJoinTask in the Fork/Join framework and an ordinary task is that a ForkJoinTask must implement the abstract method compute() to define its computation logic. The general template for this method is to check whether the current task is a small task. If so, the task is executed directly. If not, the task is divided into two subtasks. When fork() is called on a subtask, that subtask enters compute() in turn and checks whether it needs to be split further: if it is already a small task, it runs and returns its result; otherwise, it continues to split. Finally, join() is called to wait for the subtasks to complete and to get their results. The pseudocode is as follows:

if (problem is small) {
  directly solve problem.
} else {
  Step 1. split problem into independent parts.
  Step 2. fork new subtasks to solve each part.
  Step 3. join all subtasks.
  Step 4. compose result from subresults.
}
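The same template also applies to tasks that return nothing. The following is a minimal sketch, not part of the original example: the class name SquareAction and its threshold value are assumptions made for illustration. It extends RecursiveAction and squares the elements of an array in place, using the static ForkJoinTask.invokeAll() helper to run both halves and wait for them.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example class: squares every element of an array in place.
public class SquareAction extends RecursiveAction {

  // Assumed threshold, for illustration only; a real value should be measured.
  private static final int SEQUENTIAL_THRESHOLD = 4;

  private final long[] data;
  private final int from; // inclusive
  private final int to;   // exclusive

  public SquareAction(long[] data, int from, int to) {
    this.data = data;
    this.from = from;
    this.to = to;
  }

  @Override
  protected void compute() {
    if (to - from <= SEQUENTIAL_THRESHOLD) {
      // Small enough: solve the problem directly.
      for (int i = from; i < to; i++) {
        data[i] = data[i] * data[i];
      }
    } else {
      // Split into two independent halves, run both, and wait until both are done.
      int mid = (from + to) >>> 1;
      invokeAll(new SquareAction(data, from, mid), new SquareAction(data, mid, to));
      // There is no result to compose: a RecursiveAction returns nothing.
    }
  }

  public static void main(String[] args) {
    long[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    new ForkJoinPool().invoke(new SquareAction(data, 0, data.length));
    System.out.println(Arrays.toString(data)); // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
  }
}
```

Because the two halves cover disjoint index ranges, they can write to the shared array without further synchronization.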

Design of Fork/Join framework

How would you design a Fork/Join framework that breaks a large task down into smaller tasks and then combines the results of the small tasks into the result of the large task? As the name suggests, the whole process is divided into two steps:

  1. Split the large task. A class is needed to split a large task into subtasks. A subtask may still be too large after one split, so it is split repeatedly until it meets our definition of a small task.
  2. Execute the tasks and merge the results. The subtasks produced in the first step are stored in double-ended queues (see below for why double-ended queues are used), and a thread is started for each queue to take tasks from it and execute them. The results of the subtasks are placed in a shared queue, and another thread is started to take the data from that queue and merge it.

The Fork/Join framework uses the following two classes to accomplish the above two steps:

  • ForkJoinTask: as mentioned in the example above, this class represents a Fork/Join task, which must be defined before the framework can be used. Usually it is enough to extend one of its subclasses, RecursiveAction (no result returned) or RecursiveTask (result returned).
  • ForkJoinPool: as you might guess from the name, this is the thread pool that executes ForkJoinTasks. Subtasks split off from a larger task are added to the head of the current thread's deque.
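As a rough illustration of how the two classes fit together, the sketch below hands the same kind of task to a pool in three ways: invoke() blocks for the result, submit() returns the task as a future, and ForkJoinPool.commonPool() (available from JDK 8) reuses a shared pool. The names PoolUsageDemo and RangeSum, the parallelism of 4, and the threshold of 1,000 are assumptions made for this example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class PoolUsageDemo {

  // Hypothetical task: sums the integers in [from, to) by recursive halving.
  static class RangeSum extends RecursiveTask<Long> {
    private final int from;
    private final int to;

    RangeSum(int from, int to) {
      this.from = from;
      this.to = to;
    }

    @Override
    protected Long compute() {
      if (to - from <= 1_000) { // assumed threshold
        long sum = 0;
        for (int i = from; i < to; i++) {
          sum += i;
        }
        return sum;
      }
      int mid = (from + to) >>> 1;
      RangeSum left = new RangeSum(from, mid);
      RangeSum right = new RangeSum(mid, to);
      left.fork();                        // queue the left half so another worker can take it
      long rightResult = right.compute(); // run the right half in the current thread
      return left.join() + rightResult;
    }
  }

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    ForkJoinPool pool = new ForkJoinPool(4); // assumed parallelism
    try {
      // 1. invoke(): run the task and block until its result is ready.
      long viaInvoke = pool.invoke(new RangeSum(0, 100_000));

      // 2. submit(): returns immediately; get() waits for the result later.
      ForkJoinTask<Long> future = pool.submit(new RangeSum(0, 100_000));
      long viaSubmit = future.get();

      // 3. The shared common pool (JDK 8+), no pool of our own needed.
      long viaCommonPool = ForkJoinPool.commonPool().invoke(new RangeSum(0, 100_000));

      System.out.println(viaInvoke + " " + viaSubmit + " " + viaCommonPool);
    } finally {
      pool.shutdown();
    }
  }
}
```

All three calls compute the same sum, 4999950000; they differ only in which pool runs the task and whether the caller blocks immediately.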

You may already be picturing a scenario like this: to complete a large task, we first split it into multiple independent subtasks, put these subtasks into separate queues, and create one thread per queue to execute the tasks in it, so threads and queues are in a one-to-one relationship. The problem is that some threads may finish their own queue while others have not, leaving the finished threads idle.

Since we are going concurrent anyway, we want to squeeze as much performance out of the machine as possible. For this scenario, concurrency master Doug Lea applied the work-stealing algorithm: a thread that has finished the tasks in its own queue “steals” tasks from the queues of other threads and executes them, so that when one thread is overloaded, the others come to help. Fork/Join uses a double-ended queue to reduce contention between the thread that steals tasks and the thread whose tasks are stolen. Tasks are taken according to this rule: the thread whose tasks are stolen always takes tasks from the head of its deque, while the stealing thread always takes tasks from the tail. In most cases this algorithm makes full use of multiple threads for parallel computation, but some contention remains in extreme cases, for example when only one task is left in a deque.
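The head/tail rule can be modelled with an ordinary deque. The sketch below is a toy model only: the names WorkStealingSketch and WorkerQueue are hypothetical, tasks are plain strings, and the real ForkJoinPool uses its own internal work queues rather than ConcurrentLinkedDeque.

```java
import java.util.concurrent.ConcurrentLinkedDeque;

// Toy model of one worker's deque; not how ForkJoinPool is actually implemented.
public class WorkStealingSketch {

  // Hypothetical per-worker queue holding task names.
  static class WorkerQueue {
    private final ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<>();

    // The owning thread adds new subtasks at the head.
    void push(String task) {
      deque.addFirst(task);
    }

    // The owning thread takes its next task from the head.
    String take() {
      return deque.pollFirst();
    }

    // An idle thread steals from the tail, the opposite end from the owner.
    String steal() {
      return deque.pollLast();
    }
  }

  public static void main(String[] args) {
    WorkerQueue busy = new WorkerQueue();
    busy.push("task-1");
    busy.push("task-2");
    busy.push("task-3");

    System.out.println("owner runs " + busy.take());  // owner runs task-3
    System.out.println("thief runs " + busy.steal()); // thief runs task-1
    System.out.println("owner runs " + busy.take());  // owner runs task-2
    System.out.println("left over: " + busy.take());  // left over: null
  }
}
```

Owner and thief only meet at the same element when a single task remains, which is the contention case mentioned above.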

Implementation principle of Fork/Join framework

At the heart of the implementation of the Fork/Join framework is the ForkJoinPool class, which consists of an array of ForkJoinTasks and an array of ForkJoinWorkerThreads. The ForkJoinTask array holds the tasks that the framework user submits to the ForkJoinPool, and the ForkJoinWorkerThread array executes these tasks. A task can be in one of the following four states:

  • NORMAL: the task has completed
  • CANCELLED: the task was cancelled
  • SIGNAL: other threads are waiting on the task and must be signalled when it completes
  • EXCEPTIONAL: the task ended with an exception
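These state constants are internal to ForkJoinTask, but the outcome of a task can be observed through its public query methods. The sketch below is an illustration under assumptions: the names TaskStateDemo, Answer, and describe() are hypothetical, and the mapping from query methods to the internal states is this example's reading, not something the source spells out.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class TaskStateDemo {

  // Hypothetical task that either returns 42 or fails, depending on a flag.
  static class Answer extends RecursiveTask<Integer> {
    private final boolean fail;

    Answer(boolean fail) {
      this.fail = fail;
    }

    @Override
    protected Integer compute() {
      if (fail) {
        throw new IllegalStateException("boom");
      }
      return 42;
    }
  }

  // Describes a task using only public ForkJoinTask query methods.
  static String describe(ForkJoinTask<?> task) {
    if (!task.isDone()) {
      return "not done";
    }
    if (task.isCompletedNormally()) {
      return "normal";
    }
    if (task.isCancelled()) {
      return "cancelled";
    }
    return "exceptional: " + task.getException().getClass().getSimpleName();
  }

  public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool();

    Answer fresh = new Answer(false);     // never submitted

    Answer ok = new Answer(false);
    pool.invoke(ok);                      // completes normally

    Answer cancelled = new Answer(false);
    cancelled.cancel(false);              // cancelled before it ever runs

    Answer failed = new Answer(true);
    try {
      pool.invoke(failed);                // invoke() rethrows the task's exception
    } catch (RuntimeException expected) {
      // ignored: only the recorded state matters here
    }

    System.out.println(describe(fresh));
    System.out.println(describe(ok));
    System.out.println(describe(cancelled));
    System.out.println(describe(failed));
    pool.shutdown();
  }
}
```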

The fork() method of ForkJoinTask

When fork() is called on a thread of type ForkJoinWorkerThread, it calls the push() method of that thread's workQueue to schedule the task asynchronously, and then returns immediately.

The push() method

The push() method adds the current task to the ForkJoinTask task queue array, and then calls the signalWork() method of the ForkJoinPool to create or wake up a worker thread to execute the task.

The join() method of ForkJoinTask

The join() method first calls the doJoin() method, which returns the status of the current task, and then does different things depending on the returned status:

  1. If the task has completed normally, the result is returned directly.
  2. If the task was cancelled, a CancellationException is thrown.
  3. If the task ended with an exception, that exception is thrown.
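The three outcomes of join() listed above can be observed from calling code. The sketch below is a minimal illustration: the names JoinOutcomeDemo, Answer, and joinOutcome() are hypothetical, and calling fork() from the main thread assumes JDK 8 or later, where a task forked outside a pool runs in the common pool.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class JoinOutcomeDemo {

  // Hypothetical task that either returns 42 or fails, depending on a flag.
  static class Answer extends RecursiveTask<Integer> {
    private final boolean fail;

    Answer(boolean fail) {
      this.fail = fail;
    }

    @Override
    protected Integer compute() {
      if (fail) {
        throw new IllegalStateException("boom");
      }
      return 42;
    }
  }

  // Calls join() and reports which of the three outcomes occurred.
  static String joinOutcome(ForkJoinTask<Integer> task) {
    try {
      return "result " + task.join();
    } catch (CancellationException e) {
      // Must be caught first: CancellationException is itself a RuntimeException.
      return "cancelled";
    } catch (RuntimeException e) {
      return "failed with " + e.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    Answer ok = new Answer(false);
    ok.fork();                 // assumption: JDK 8+, runs in the common pool
    System.out.println(joinOutcome(ok));

    Answer cancelled = new Answer(false);
    cancelled.cancel(false);   // cancelled before it was ever scheduled
    System.out.println(joinOutcome(cancelled));

    Answer failed = new Answer(true);
    failed.fork();
    System.out.println(joinOutcome(failed));
  }
}
```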

Looking further into doJoin(): the method first checks whether the current task has already completed and, if so, returns the task status directly. If the task has not completed, it is taken from the task array (workQueue) and executed. When the task completes normally, its state is set to NORMAL. If an exception occurs, the exception is recorded and the state of the task is set to EXCEPTIONAL (in the doExec() method).

Conclusion

This article introduced the basic principles of the Fork/Join framework in the Java concurrency library, its use of the work-stealing algorithm, its design, and part of its implementation. The Fork/Join framework is also used inside the JDK standard library. For example, Arrays.parallelSort(), available since JDK 1.8, sorts in parallel: internally, large arrays are sorted through the Fork/Join framework, which can improve sorting speed. The parallelStream() method of Collection is likewise built on the Fork/Join framework. Finally, the threshold that defines a small task usually has to be determined through testing, so that the program achieves its best performance.
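As a small illustration of the two standard-library entry points mentioned above, the sketch below calls Arrays.parallelSort() and Collection.parallelStream(). The class and method names (StandardLibraryDemo, sortedCopy, parallelSum) are assumptions made for this example. Note that for an array this small, parallelSort() falls back to a sequential sort; the parallel path is only used for large arrays.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class StandardLibraryDemo {

  // Returns a sorted copy; parallelSort may use Fork/Join internally for large arrays.
  static long[] sortedCopy(long[] input) {
    long[] copy = Arrays.copyOf(input, input.length);
    Arrays.parallelSort(copy);
    return copy;
  }

  // Sums a list through a parallel stream, which runs on the Fork/Join common pool.
  static long parallelSum(List<Long> data) {
    return data.parallelStream().mapToLong(Long::longValue).sum();
  }

  public static void main(String[] args) {
    long[] numbers = {5, 3, 9, 1, 7};
    System.out.println(Arrays.toString(sortedCopy(numbers))); // [1, 3, 5, 7, 9]

    List<Long> data = LongStream.rangeClosed(1, 1_000)
        .boxed()
        .collect(Collectors.toList());
    System.out.println(parallelSum(data)); // 500500
  }
}
```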