The Fork Join framework in Java

The Fork Join framework was introduced in Java 7 to improve parallel computing capabilities.

Fork Join consists of two main steps. The first step is fork, which recursively divides a large task into many small subtasks. The second step is join, which combines the results of the subtasks to produce the final result. If the subtasks return no value, join simply waits until all of them have finished.

Recall the basic structure of thread pools from the previous article. The Fork Join framework has the same three parts:

  1. ExecutorService: ForkJoinPool is used to execute tasks.
  2. WorkerThread: ForkJoinWorkerThread is the worker thread that performs the tasks.
  3. Task: ForkJoinTask defines a task to be executed.

Let’s take a closer look at the Fork Join framework from these three perspectives.

ForkJoinPool

ForkJoinPool is an implementation of ExecutorService that manages a pool of worker threads.

public class ForkJoinPool extends AbstractExecutorService 

A worker thread can execute only one task at a time, but ForkJoinPool does not create a separate thread for each task. Instead, each worker thread has its own double-ended queue (deque) of tasks. This structure is what makes work-stealing possible.

What is work-stealing?

By default, a worker thread takes tasks from the head of its own deque. When that deque is empty, the worker thread takes a task from the tail of another busy thread's deque or from the global submission queue. This design keeps idle threads busy and reduces contention, because the owner and the thieves work at opposite ends of a deque.
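The head/tail rule can be illustrated with a deliberately simplified, single-threaded model. This is only a sketch: the `Worker` class and its method names are hypothetical, and the real ForkJoinPool uses its own concurrent queue implementation rather than `ArrayDeque`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class WorkStealingSketch {

    // Hypothetical toy worker: each one owns a deque of task names.
    static final class Worker {
        final Deque<String> deque = new ArrayDeque<>();

        // The owner takes work from the head of its own deque.
        String takeOwn() {
            return deque.pollFirst();
        }

        // A thief takes work from the tail of another worker's deque.
        String stealFrom(Worker victim) {
            return victim.deque.pollLast();
        }

        // Prefer local work; steal only when the local deque is empty.
        String nextTask(Worker victim) {
            String task = takeOwn();
            return task != null ? task : stealFrom(victim);
        }
    }

    public static void main(String[] args) {
        Worker busy = new Worker();
        Worker idle = new Worker();
        busy.deque.addFirst("A");
        busy.deque.addFirst("B");
        busy.deque.addFirst("C"); // head -> C, B, A <- tail

        System.out.println(busy.nextTask(idle)); // prints C (own head)
        System.out.println(idle.nextTask(busy)); // prints A (stolen from busy's tail)
    }
}
```

Because the owner and the thief touch opposite ends of the deque, they rarely compete for the same task.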

Now let’s see how to create a ForkJoinPool.

The most common way is to call ForkJoinPool.commonPool(), which returns a shared default thread pool that any ForkJoinTask can use.

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

Another way is to use the constructor:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);

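The argument is the parallelism level, that is, the target number of worker threads running at the same time. A value of 2 means that the pool aims to keep up to 2 threads (and therefore at most 2 processor cores) busy.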
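As a quick check, the parallelism level can be read back with getParallelism(). The sketch below is illustrative only; the class name `PoolParallelismSketch` and the helper `parallelismOf` are made up for this example, and the common pool's value depends on the machine and JVM settings, so no output is shown for it.

```java
import java.util.concurrent.ForkJoinPool;

public class PoolParallelismSketch {

    // Creates a pool with the requested parallelism and returns what the pool reports.
    static int parallelismOf(int requested) {
        ForkJoinPool pool = new ForkJoinPool(requested);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown(); // a custom pool should be shut down when no longer needed
        }
    }

    public static void main(String[] args) {
        System.out.println("custom pool: " + parallelismOf(2)); // prints "custom pool: 2"

        // Machine-dependent values, printed for comparison only.
        System.out.println("common pool: " + ForkJoinPool.commonPool().getParallelism());
        System.out.println("processors:  " + Runtime.getRuntime().availableProcessors());
    }
}
```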

ForkJoinWorkerThread

A ForkJoinWorkerThread is a worker thread that is used in a ForkJoinPool.

public class ForkJoinWorkerThread extends Thread {
}

Unlike a plain Thread, it defines two additional fields:

    final ForkJoinPool pool;                // the pool this thread works in
    final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

The first is the ForkJoinPool that the worker thread belongs to. The second is the work queue that supports work-stealing.

Now look at its run() method:

    public void run() {
        if (workQueue.array == null) { // only run once
            Throwable exception = null;
            try {
                onStart();
                pool.runWorker(workQueue);
            } catch (Throwable ex) {
                exception = ex;
            } finally {
                try {
                    onTermination(exception);
                } catch (Throwable ex) {
                    if (exception == null)
                        exception = ex;
                } finally {
                    pool.deregisterWorker(this, exception);
                }
            }
        }
    }

Put simply, the worker thread calls pool.runWorker(workQueue), which takes tasks from the queue and executes them.

ForkJoinTask

ForkJoinTask is the type of task that can be run in a ForkJoinPool. Typically, we use one of its two subclasses, RecursiveAction or RecursiveTask.

Both define a compute() method, which we implement to provide the actual business logic. The difference is that a RecursiveAction only executes a task and returns nothing, whereas a RecursiveTask returns a value.

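As the names suggest, both classes are recursive, so the implementation logic is recursive too: compute() either does the work directly or splits it into subtasks of the same type. Here is an example that uses RecursiveAction to upper-case and log a string: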

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.logging.Logger;

public class CustomRecursiveAction extends RecursiveAction {

    private String workload = "";
    private static final int THRESHOLD = 4;

    private static Logger logger =
            Logger.getAnonymousLogger();

    public CustomRecursiveAction(String workload) {
        this.workload = workload;
    }

    @Override
    protected void compute() {
        // Split while the workload is longer than THRESHOLD; otherwise process it directly.
        if (workload.length() > THRESHOLD) {
            ForkJoinTask.invokeAll(createSubtasks());
        } else {
            processing(workload);
        }
    }

    // Splits the workload into two halves, one subtask per half.
    private List<CustomRecursiveAction> createSubtasks() {
        List<CustomRecursiveAction> subtasks = new ArrayList<>();

        String partOne = workload.substring(0, workload.length() / 2);
        String partTwo = workload.substring(workload.length() / 2, workload.length());

        subtasks.add(new CustomRecursiveAction(partOne));
        subtasks.add(new CustomRecursiveAction(partTwo));

        return subtasks;
    }

    // Upper-cases one piece and logs which thread handled it.
    private void processing(String work) {
        String result = work.toUpperCase();
        logger.info("This result - (" + result + ") - was processed by "
                + Thread.currentThread().getName());
    }
}

The example above repeatedly splits the string in half until each piece is no longer than THRESHOLD characters, then upper-cases and logs each piece.

Let’s look at another RecursiveTask example:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class CustomRecursiveTask extends RecursiveTask<Integer> {
    private int[] arr;

    private static final int THRESHOLD = 20;

    public CustomRecursiveTask(int[] arr) {
        this.arr = arr;
    }

    @Override
    protected Integer compute() {
        // Split large arrays and add up the subtask results; process small arrays directly.
        if (arr.length > THRESHOLD) {
            return ForkJoinTask.invokeAll(createSubtasks())
                    .stream()
                    .mapToInt(ForkJoinTask::join)
                    .sum();
        } else {
            return processing(arr);
        }
    }

    // Splits the array into two halves, one subtask per half.
    private Collection<CustomRecursiveTask> createSubtasks() {
        List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
        dividedTasks.add(new CustomRecursiveTask(
                Arrays.copyOfRange(arr, 0, arr.length / 2)));
        dividedTasks.add(new CustomRecursiveTask(
                Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
        return dividedTasks;
    }

    // Keeps values strictly between 10 and 27, multiplies each by 10, and sums them.
    private Integer processing(int[] arr) {
        return Arrays.stream(arr)
                .filter(a -> a > 10 && a < 27)
                .map(a -> a * 10)
                .sum();
    }
}

This is very similar to the previous example, except that compute() returns a value here.

Submit a Task in ForkJoinPool

With the two tasks above defined, we can submit them to a ForkJoinPool:

int[] intArray = {12, 12, 13, 14, 15};
CustomRecursiveTask customRecursiveTask = new CustomRecursiveTask(intArray);

int result = forkJoinPool.invoke(customRecursiveTask);
System.out.println(result); // prints 660

In the example above, we use invoke() to submit the task. invoke() blocks until the task has completed and then returns its result.
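For comparison, ForkJoinPool also offers submit(), which returns the task immediately so that the result can be collected later. The sketch below contrasts the two calls; `RangeSum`, `viaInvoke`, and `viaSubmit` are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class SubmitVersusInvokeSketch {

    // Hypothetical task: sums the integers in the range [from, to).
    static final class RangeSum extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000;
        private final int from;
        private final int to;

        RangeSum(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += i;
                }
                return sum;
            }
            int mid = from + (to - from) / 2;
            RangeSum left = new RangeSum(from, mid);
            RangeSum right = new RangeSum(mid, to);
            invokeAll(left, right); // runs both halves and waits for them
            return left.join() + right.join();
        }
    }

    // invoke() blocks the caller until the result is ready.
    static long viaInvoke(ForkJoinPool pool, int n) {
        return pool.invoke(new RangeSum(0, n));
    }

    // submit() returns at once; join() is called when the result is needed.
    static long viaSubmit(ForkJoinPool pool, int n) {
        ForkJoinTask<Long> pending = pool.submit(new RangeSum(0, n));
        // ... the caller could do other work here ...
        return pending.join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            System.out.println(viaInvoke(pool, 10_000)); // prints 49995000
            System.out.println(viaSubmit(pool, 10_000)); // prints 49995000
        } finally {
            pool.shutdown();
        }
    }
}
```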

Instead of invoke(), we can also call fork() and join() on the task:

customRecursiveTask.fork();
int result2 = customRecursiveTask.join();
System.out.println(result2);

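fork() schedules the task for asynchronous execution (in the pool the current task is running in, or in the common pool when called from outside a ForkJoinPool) and returns immediately. join() waits for the task to complete and returns its result.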
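Inside compute(), a common way to combine the two calls is to fork one half, compute the other half in the current thread, and then join the forked half. The following is a minimal sketch of that pattern; `MaxTask` and the helper `max` are hypothetical names used only for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkThenJoinSketch {

    // Hypothetical task: finds the maximum of arr[from..to).
    static final class MaxTask extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 4;
        private final int[] arr;
        private final int from;
        private final int to;

        MaxTask(int[] arr, int from, int to) {
            this.arr = arr;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Integer compute() {
            if (to - from <= THRESHOLD) {
                int max = Integer.MIN_VALUE;
                for (int i = from; i < to; i++) {
                    max = Math.max(max, arr[i]);
                }
                return max;
            }
            int mid = (from + to) >>> 1;
            MaxTask left = new MaxTask(arr, from, mid);
            MaxTask right = new MaxTask(arr, mid, to);

            left.fork();                    // schedule the left half asynchronously
            int rightMax = right.compute(); // work on the right half in this thread
            int leftMax = left.join();      // wait for the left half's result
            return Math.max(leftMax, rightMax);
        }
    }

    // Runs the task in the common pool and returns the maximum value.
    static int max(int[] values) {
        return ForkJoinPool.commonPool().invoke(new MaxTask(values, 0, values.length));
    }

    public static void main(String[] args) {
        int[] values = {3, 9, 1, 7, 12, 5, 8, 2, 6};
        System.out.println(max(values)); // prints 12
    }
}
```

Computing one half directly avoids leaving the current thread idle while it waits for both subtasks.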

Examples of this article can be found at github.com/ddean2009/l…

See flydean’s blog for more tutorials