1. Introduction

In my previous article, “Ten Times Programmer” Uses Parallel Streams ten times Faster, I briefly covered the use of parallelStream in Java, which in most business scenarios can perform streaming operations in parallel by calling the cores of more processors, with extremely simple API calls.

Parallel flows in Java use the Fork/Join framework and its common worker thread pool.

Today we’ll explore the implementation and the knowledge of the Fork/Join framework.

2. The Fork/Join framework

The Fork/Join framework first appeared in Java 7. It provides tools to help speed up parallel processing by trying to use all available processor cores — this is done through a divide-and-conquer approach.

The idea of divide and rule is realized in two steps in practice:

  1. Forks — Recursively breaking tasks into smaller, independent sub-tasks until they are simple enough to be executed asynchronously.
  2. Post-join (that is, join) – The results of all subtasks are recursively joined into a single result, or in the case of a task that returns void, the program simply waits until each subtask is executed.

2.1 split source

The Fork/Join framework is responsible for splitting the source data between worker threads and handling callbacks when the task completes.

Let’s take a look at an example of computing the sum of integers in parallel.

public class ParallelStreamDemo {
    public static void main(String[] args) {
        List<Integer> listOfNumbers = Arrays.asList(1.2.3.4);
        / / the serial stream
        int sum1 = listOfNumbers.stream().reduce(5, Integer::sum);
        / / parallel flows
        int sum2 = listOfNumbers.parallelStream().reduce(5, Integer::sum);
        System.out.println("Serial stream execution result:" + sum1);
        System.out.println("Parallel stream execution result:"+ sum2); }}Copy the code

Execution Result:

The serial stream performs as expected, so why is the parallel stream performing twice as well as expected?

This is because the Reduce operation is processed in parallel, so the number 5 actually adds up in each worker thread:

The actual results may vary depending on the number of threads used in the common Fork/Join thread pool.

To solve this problem, the number 5 should be added to the parallel stream:

int sum3 = listOfNumbers.parallelStream().reduce(0, Integer::sum) + 5;
Copy the code

When using parallel flows, you must be careful which operations can be run in parallel.

To provide efficient parallel execution, the Fork/Join framework uses a thread pool called ForkJoinPool that manages worker threads of type ForkJoinWorkerThread.

3. ForkJoinPool

ForkJoinPool is the heart of the Fork/Join framework. It is an implementation of ExecutorService that manages worker threads and gives us tools to obtain information about thread pool status and performance.

A worker thread can only perform one task at a time, but ForkJoinPool does not create a separate thread for each subtask. Instead, the ForkJoinPool stores tasks in a double-ended queue (deque) owned by each thread in the pool.

This thread pool design is critical to balancing thread workloads with the help of a work-stealing Algorithm.

3.1 the Work – Stealing Algorithm

The work stealing algorithm simply says: idle threads try to “steal” work from a two-ended queue of busy threads.

By default, a worker thread retrieves a task from its own deque’s header task list. When the list of tasks in the worker thread’s own deQUE is empty, the thread retrieves a task from the tail of another busy worker thread’s deQUE or from the global entry queue.

This algorithm can minimize the possibility of threads competing for tasks. It also reduces the number of times a thread has to go looking for task work, because it deals with the largest chunk of work available first.

3.2 Instantiation of ForkJoinPool

In Java 8, the easiest way to access a ForkJoinPool instance is to use its static method, commonPool(). As the name implies, it provides a reference to the common thread pool, which is the default thread pool for every ForkJoinTask.

Using a pre-defined common thread pool reduces resource consumption because we no longer need to create a separate thread pool for each task.

ForkJoinPool commonPool = ForkJoinPool.commonPool();
Copy the code

From the source code, we can see that the creation of the public pool is instantiated in the static constructor of the ForkJoinPool class.

Returns the static variable common of a ForkJoinPool class

In the ForkJoinPool static{} static method, makeCommonPool() is called

The final instantiation is through the ForkJoinPool constructor.

In Java 7, we can create forkJoinPools by assigning a public static field to a utility class.

// Create a thread pool that uses 2 processor cores, meaning 2 threads working in parallel
public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);
Copy the code

Take a look at the default ForkJoinPool constructor from the source code. It contains four method parameters: parallelism, thread factory, exception handler, and asynchronous mode (FIFO/LIFO).

Now we can easily access the thread pool:

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;
Copy the code

4. ForkJoinTask

4.1 Explanation of source code

ForkJoinTask is the basic type of task performed by a ForkJoinPool.

ForkJoinTask is an abstract class that has two abstract word classes: a RecursiveAction with no return value and a RecursiveTask

with a return value. They all have an abstract method compute() that defines the execution logic of the task.

RecursiveAction abstract class

RecursiveTask

4.2 the customRecursiveActionCode demo

public class CustomRecursiveAction extends RecursiveAction {

    private String workload;

    private static final int THRESHOLD = 4;

    public String getWorkload(a) {
        return workload;
    }

    public void setWorkload(String workload) {
        this.workload = workload;
    }

    public CustomRecursiveAction(String workload) {
        this.workload = workload;
    }

    /** * To demonstrate ForkJoin's forking behavior, use createSubTask() to split tasks when the workload of a string variable is greater than the specified THRESHOLD
    @Override
    protected void compute(a) {
        if (workload.length() > THRESHOLD) {
            // Submit the task list to ForkJoinTask
            ForkJoinTask.invokeAll(createSubTask());
        } else{ print(workload); }}/** * Recursively creates a subtask that is submitted to ForkJoinTask by the ForkJoinTask calling overridden compute() *@return* /
    private List<CustomRecursiveAction> createSubTask(a) {
        List<CustomRecursiveAction> subTasks = new ArrayList<>();

        String partOne = workload.substring(0, workload.length() / 2);
        String partTwo = workload.substring(workload.length() / 2);

        subTasks.add(new CustomRecursiveAction(partOne));
        subTasks.add(new CustomRecursiveAction(partTwo));

        return subTasks;
    }

    /** * Prints the ForkJoinTask execution result *@param work
     */
    private void print(String work) {
        System.out.println("This result - (" + work + ") - was processed by "
                + Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        // Workload initialization takes only 4 characters. No branching logic is performed
        CustomRecursiveAction action = new CustomRecursiveAction("abcd");
        action.compute();

        // The second execution uses 5 characters to perform the bifurcation logic
        action.setWorkload("abcde"); action.compute(); }}Copy the code

Executing the main() method results in the following:

As expected, the first execution of compute() does not fork, and the second execution, the workload variable forks.

This pattern can be used to develop your own RecursiveAction class. This class needs to: create an object that represents the amount of work, select appropriate thresholds, define ways to divide the work, and define ways to complete the work.

4.3 the customRecursiveTask<V>Code demo

public class CustomRecursiveTask extends RecursiveTask<String> {

    private String workload;

    private static final int THRESHOLD = 4;

    public String getWorkload(a) {
        return workload;
    }

    public void setWorkload(String workload) {
        this.workload = workload;
    }

    public CustomRecursiveTask(String workload) {
        this.workload = workload;
    }

    /** * Demonstrates the ForkJoin framework's forking behavior. When the workload of a string variable exceeds a specified THRESHOLD, * splits tasks using createSubTask(), which triggers execution by calling join(). * Summarize the results of subtask execution using the Java Stream Api */
    @Override
    protected String compute(a) {
        String result;
        if (workload.length() > THRESHOLD) {
            The ForkJoinTask submits the list of tasks to the public pool. The invokeAll() method submits the subtasks to the public pool and returns a Future list. Join () is invoked to trigger the execution of the join operation
            result = ForkJoinTask.invokeAll(createSubTask()).stream().map(ForkJoinTask::join).collect(Collectors.joining(""));
        } else {
            result = process(workload);
        }
        System.out.println(result);
        return result;
    }

    /** * Recursively creates a subtask that is submitted to ForkJoinTask by the ForkJoinTask calling overridden compute() **@return* /
    private List<CustomRecursiveTask> createSubTask(a) {
        List<CustomRecursiveTask> subTasks = new ArrayList<>();

        String partOne = workload.substring(0, workload.length() / 2);
        String partTwo = workload.substring(workload.length() / 2);

        subTasks.add(new CustomRecursiveTask(partOne));
        subTasks.add(new CustomRecursiveTask(partTwo));

        return subTasks;
    }

    /** * ForkJoinTask execution result **@param work
     */
    private String process(String work) {
        return work;
    }

    public static void main(String[] args) {
        // Workload initialization takes only 4 characters. No branching logic is performed
        CustomRecursiveTask action = new CustomRecursiveTask("abcd");
        action.compute();

        // The second execution uses 5 characters to perform the bifurcation logic
        action.setWorkload("abcde"); action.compute(); }}Copy the code

Executing the main() method results in the following:

As expected, the first execution of compute() does not fork, and the second execution, the workload variable forks, and returns the result of the execution and the result after join(), respectively.

5. Submit a task to ForkJoinPool

You can use the submit() and execute() methods to submit tasks to the thread pool.

public class ForkJoinDemo {
    public static void main(String[] args) {
        // Instantiate ForkJoinPool
        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
        // Instantiate a custom RecursiveTask that returns a value
        CustomRecursiveTask customRecursiveTask = new CustomRecursiveTask("abcde");
        // Call ForkJoinPool's execute method to submit the task to the thread pool
        forkJoinPool.execute(customRecursiveTask);
        // Call ForkJoinTask's join() method to trigger the join operation and get the return value
        String result = customRecursiveTask.join();
        System.out.println("CustomRecursiveTask result = " + result);

        Instantiate a custom RecursiveAction with no return value
        CustomRecursiveAction customRecursiveAction = new CustomRecursiveAction("abcde");
        // Call ForkJoinPool submit to submit the task to the thread pool
        forkJoinPool.submit(customRecursiveAction);
        ForkJoinTask calls Join () to trigger the join operation with no return valuecustomRecursiveAction.join(); }}Copy the code

You can also use the invoke() method to fork a task and wait for the result of execution, without manually calling the Join () method to join.

// Instantiate a custom RecursiveTask that returns a value
CustomRecursiveTask customRecursiveTask1 = new CustomRecursiveTask("L La Ramie.");
// Call ForkJoinPool invoke to submit the task to the thread pool and execute it automatically
String result1 = forkJoinPool.invoke(customRecursiveTask1);
System.out.println("CustomRecursiveTask1 result = " + result1);
Copy the code

The invokeAll() method is the most convenient way to submit a ForkJoinTasks collection to a ForkJoinPool. It takes the task set as an argument, and fork returns a collection of Future objects in the order they were generated.

You can also use fork() and join() methods separately. The fork() method submits a task to the thread pool, but it cannot trigger the execution of the join operation. The join() method is used to trigger the join operation. The join() method for RecursiveAction returns no value; The join() method of RecursiveTask returns the result of the task execution.

Instantiate a custom RecursiveAction with no return value
CustomRecursiveAction customRecursiveAction1 = new CustomRecursiveAction("L La Ramie.");
// Call ForkJoinTask to submit the task to the thread pool
customRecursiveAction1.fork();
ForkJoinTask calls Join () to trigger the join operation with no return value
customRecursiveAction1.join();
Copy the code

It is best to submit multiple tasks to ForkJoinPool using the invokeAll() method.

6. Summary

Using the Fork/Join framework can speed up processing of large tasks, but to achieve this result, follow some guidelines:

  • Use as few thread pools as possible, and in most cases, the best decision is to use one thread pool per application or system
  • Use the default common thread pool unless you need special tuning
  • A ForkJoinTask is broken into subtasks using a reasonable threshold
  • Avoid any blocking in ForkJoinTasks

Demo source on GitHub

It is not easy to create, thank you for your easy like 👏👏

7. Good recommendations

⭐ “2021 Year-end Summary” 10 years deep Floating, 3 cars, 3 apartments ⭐

⭐ “Super Architect” DevOps Beginner’s Guide ⭐

⭐ “Super Architect” 1.5W tutorial on distributed systems ⭐

⭐ “Super Architect” on those design patterns in microservices ⭐

⭐ “Super Architect” is probably the best Redis distributed lock implementation ⭐