ForkJoinPool and ThreadPoolExecutor

Before we dive into ForkJoinPool, let’s look at how it differs from ThreadPoolExecutor. Why use ForkJoinPool at all, and what does it give us that the more common ThreadPoolExecutor does not? With those questions in mind, let’s get started.

Similarities and differences

1. Like ThreadPoolExecutor, ForkJoinPool extends AbstractExecutorService, but it is not intended to replace ThreadPoolExecutor. Instead, it complements ThreadPoolExecutor in both usage scenarios and functionality.

public class ForkJoinPool extends AbstractExecutorService
public class ThreadPoolExecutor extends AbstractExecutorService

2. Different constructors

ThreadPoolExecutor is not the focus of this article, so we won’t go into its constructor in detail. Let’s focus on the ForkJoinPool constructor.

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix)
  1. parallelism: the parallelism level. By default it equals the number of processors available to the JVM, which Runtime.getRuntime().availableProcessors() returns.
  2. factory: the factory used to create new worker threads. The default is ForkJoinPool.defaultForkJoinWorkerThreadFactory.
  3. handler: the handler for uncaught exceptions (Thread.UncaughtExceptionHandler). When a worker thread is terminated by an unforeseen error while executing a task, this handler is invoked to deal with it. The default is null.
  4. mode (asyncMode in the public constructors): in a ForkJoinPool, each worker thread has its own task queue. This parameter determines how the tasks in a worker's queue are scheduled: first-in, first-out (FIFO) or last-in, first-out (LIFO). If asyncMode is true, worker threads process their queues in FIFO order. The default is false, which means LIFO.
  5. workerNamePrefix: as the name implies, the prefix for worker thread names. The default is "ForkJoinPool-" + nextPoolId() + "-worker-".
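The private constructor above cannot be called from application code, so a pool is normally configured through one of the public constructors. Below is a minimal sketch, assuming the public four-argument constructor that takes a boolean asyncMode; the class name PoolConfigDemo and the helper newPool are made up for illustration.

```java
import java.util.concurrent.ForkJoinPool;

class PoolConfigDemo {

    // Builds a pool with explicit values for the parameters discussed above.
    static ForkJoinPool newPool(int parallelism, boolean asyncMode) {
        return new ForkJoinPool(
                parallelism,                                      // target number of worker threads
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,  // default worker thread factory
                null,                                             // no UncaughtExceptionHandler (the default)
                asyncMode);                                       // false = LIFO, true = FIFO
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        ForkJoinPool pool = newPool(cores, false);
        try {
            System.out.println("parallelism = " + pool.getParallelism());
            System.out.println("asyncMode   = " + pool.getAsyncMode());
        } finally {
            pool.shutdown();
        }
    }
}
```

The worker name prefix is not configurable through this constructor; it is filled in by the pool itself.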

3. Different working modes. In ForkJoinPool each worker thread has its own work queue, whereas in ThreadPoolExecutor all threads share a single work queue. That is, the relationship between threads and work queues changes from many-to-one to one-to-one.

The ThreadPoolExecutor thread pool model is as follows: [figure]

The ForkJoinPool thread-to-work-queue model is as follows: [figure]

The ForkJoinPool workflow is quite different from the ThreadPoolExecutor workflow. I will explain only a few core concepts around Fork/Join:

Work stealing

For ForkJoinPool, tasks are submitted in two ways: external tasks, which are submitted directly through the ForkJoinPool, and worker tasks, which are forked internally by tasks already running in the pool.

These correspond to the following two methods:

forkJoinPool.submit(forkJoinTask);
forkJoinTask.fork();

// ForkJoinPool: external submission
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task;
}

// ForkJoinTask: internal fork
public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}
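To see both submission paths from the caller's side, here is a small sketch. The class names SubmitVsForkDemo, Parent and OnWorkerThread are invented for this example, and the comments about which internal method is reached refer to the JDK 8 source quoted above.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveTask;

class SubmitVsForkDemo {

    // Reports whether the executing thread is a pool worker.
    static class OnWorkerThread extends RecursiveTask<Boolean> {
        @Override
        protected Boolean compute() {
            return Thread.currentThread() instanceof ForkJoinWorkerThread;
        }
    }

    // Runs inside the pool and forks a child from a worker thread.
    static class Parent extends RecursiveTask<Boolean> {
        @Override
        protected Boolean compute() {
            OnWorkerThread child = new OnWorkerThread();
            child.fork();        // internal path: pushed onto the current worker's own queue
            return child.join();
        }
    }

    static boolean runExternally() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            // external path: the calling thread is not a worker, so the task is pushed from outside
            return pool.submit(new Parent()).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("child ran on a worker thread: " + runExternally());
    }
}
```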

There are clearly two ways to enqueue a task:

// Internal: push directly onto the queue bound to the current worker thread
((ForkJoinWorkerThread)t).workQueue.push(this);
// External: submitted from outside the pool
externalPush(task);
// Implementation
final void externalPush(ForkJoinTask<?> task) {
    WorkQueue[] ws; WorkQueue q; int m;
    int r = ThreadLocalRandom.getProbe();
    int rs = runState;
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
        U.compareAndSwapInt(q, QLOCK, 0, 1)) {
        ForkJoinTask<?>[] a; int am, n, s;
        if ((a = q.array) != null &&
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            int j = ((am & s) << ASHIFT) + ABASE;
            U.putOrderedObject(a, j, task);
            U.putOrderedInt(q, QTOP, s + 1);
            U.putIntVolatile(q, QLOCK, 0);
            if (n <= 1)
                signalWork(ws, q);
            return;
        }
        U.compareAndSwapInt(q, QLOCK, 1, 0);
    }
    externalSubmit(task);
}

Note that ForkJoinPool maintains an array of WorkQueue objects (the workQueues field). The idea is somewhat similar to the bucket array that underlies a HashMap.

WorkQueue q;
int r = ThreadLocalRandom.getProbe();
int m = ws.length - 1;
q = ws[m & r & SQMASK];

So an externally submitted task goes into the WorkQueue at index m & r & SQMASK of the ws array.
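The index arithmetic can be tried out in isolation. The sketch below is not JDK code: ThreadLocalRandom.getProbe() is not accessible from application code, so the probe values here are made-up integers, and the array length of 16 is hypothetical. The SQMASK value 0x007e is the one used in the JDK 8 source; because its lowest bit is zero, the resulting slot is always even.

```java
class ExternalQueueIndexDemo {

    // Same value as SQMASK in the JDK 8 ForkJoinPool source: bit 0 is cleared, so results are even.
    static final int SQMASK = 0x007e;

    // Mirrors ws[m & r & SQMASK], where m = ws.length - 1 and r is the caller's probe.
    static int externalIndex(int arrayLength, int probe) {
        int m = arrayLength - 1;
        return m & probe & SQMASK;
    }

    public static void main(String[] args) {
        int length = 16; // hypothetical workQueues length (a power of two)
        for (int probe : new int[] {0x9e3779b9, 0x12345678, 7, 42}) {
            System.out.println("probe " + Integer.toHexString(probe)
                    + " -> slot " + externalIndex(length, probe));
        }
    }
}
```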

As for int r = ThreadLocalRandom.getProbe();, here is just a quick explanation; I won’t go into detail.

ThreadLocalRandom.getProbe() returns the probe hash value of the current thread.

Here the probe hash is used to spread threads across the array, so that each thread tends to land on an element no other thread is using and contention on a single element is avoided. The probe hash differs from a map key hash in one respect: when a thread does run into contention on an array element, its probe can be changed so that the thread moves to another element, whereas a key's hash in a map must never change because it is needed to locate the value.

Now back to the main topic: work stealing.

The algorithm refers to a thread stealing tasks from other threads' queues and executing them.

Each worker thread in the ForkJoinPool maintains a WorkQueue, which is a double-ended queue (deque) holding ForkJoinTask objects.

When a worker thread generates a new task while running (usually through a call to fork()), the task is placed at the tail of its own work queue, and by default the worker also takes its next task from the tail.

While processing its own work queue, each worker thread also tries to steal tasks, either tasks that were just submitted to the pool or tasks from other threads' work queues. Stolen tasks are taken from the head of the queue, the opposite end from where the owner takes its own tasks, which reduces contention. When join() is encountered and the task to be joined is not yet complete, the worker processes other tasks first and then waits for it to complete.

A worker goes to sleep when it has neither tasks of its own nor tasks to steal.
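The two ends of the deque can be illustrated with a single-threaded toy model. This is only a sketch of the access pattern: it uses java.util.ArrayDeque, which is not thread-safe, while the real WorkQueue is a specialized array-based structure. The class and method names are invented for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class WorkStealingSketch {

    // Owner side: in the default LIFO mode, the owner takes its newest task from the tail.
    static String ownerTake(Deque<String> queue) {
        return queue.pollLast();
    }

    // Thief side: a stealing worker takes the oldest task from the head.
    static String steal(Deque<String> victimQueue) {
        return victimQueue.pollFirst();
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>();
        queue.addLast("task-1"); // oldest
        queue.addLast("task-2");
        queue.addLast("task-3"); // newest

        System.out.println("owner runs   : " + ownerTake(queue)); // task-3
        System.out.println("thief steals : " + steal(queue));     // task-1
        System.out.println("left in queue: " + queue);            // [task-2]
    }
}
```

Because owner and thief work at opposite ends, they only meet when the queue is nearly empty.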

ForkJoinTask

ForkJoinTask is essential if the fork/join framework is to be put to real use.

public abstract class ForkJoinTask<V> implements Future<V>, Serializable

Since ForkJoinTask is an abstract class, we have to implement it, but in general we extend one of its subclasses instead. The fork/join framework provides three of them:

  1. RecursiveAction: for tasks that do not return a result.
  2. RecursiveTask: for tasks that return a result.
  3. CountedCompleter: triggers a custom hook function once the task has completed.

We can choose the appropriate Task and define its implementation based on our business scenario.
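As an illustration of the first type, here is a minimal RecursiveAction sketch that squares an array in place. The class name SquareInPlace and the threshold of 1,000 are arbitrary choices for the example; a RecursiveTask looks the same except that compute() returns a value.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class SquareInPlace extends RecursiveAction {
    private static final int THRESHOLD = 1_000; // illustrative cut-off, not a tuned value
    private final long[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    SquareInPlace(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: do the work directly, no result to return.
            for (int i = from; i < to; i++) {
                data[i] *= data[i];
            }
            return;
        }
        int mid = (from + to) >>> 1;
        // invokeAll forks the subtasks and waits until both have finished.
        invokeAll(new SquareInPlace(data, from, mid), new SquareInPlace(data, mid, to));
    }

    static long[] squareAll(long[] data) {
        ForkJoinPool.commonPool().invoke(new SquareInPlace(data, 0, data.length));
        return data;
    }

    public static void main(String[] args) {
        long[] data = {1, 2, 3, 4};
        squareAll(data);
        System.out.println(java.util.Arrays.toString(data)); // [1, 4, 9, 16]
    }
}
```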

fork & join

fork() pushes the current task onto the work queue of the current worker thread. If the caller is not a worker thread, the task is submitted to the common pool instead.

    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }
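From the caller's side, fork() and join() are usually paired as in the sketch below. Fibonacci is used only because it is short; the class name Fib is made up, and one task per recursive call is far too fine-grained for real work.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class Fib extends RecursiveTask<Long> {
    private final int n;

    Fib(int n) {
        this.n = n;
    }

    @Override
    protected Long compute() {
        if (n <= 1) {
            return (long) n;
        }
        Fib left = new Fib(n - 1);
        left.fork();                            // queue one half for later, or for a thief
        long right = new Fib(n - 2).compute();  // run the other half directly in this thread
        return right + left.join();             // get the forked half's result, waiting if needed
    }

    static long fib(int n) {
        return ForkJoinPool.commonPool().invoke(new Fib(n));
    }

    public static void main(String[] args) {
        System.out.println(fib(20)); // 6765
    }
}
```

Forking one half and computing the other directly keeps the current thread busy instead of parking it immediately in join().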

The overall process of join is as follows: [figure]

How to use

The fork/join framework is used through ForkJoinPool and ForkJoinTask. In practice, we typically use them as follows:

  1. Declare a ForkJoinPool.
  2. Extend the ForkJoinTask abstract class, or one of its subclasses, and implement your business logic in the method it defines.
  3. Inside the task logic, fork subtasks at the appropriate points.
  4. Inside the task logic, join the subtasks and combine their results at the appropriate points.
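The four steps above can be sketched in one small program that sums a range of integers. The class name RangeSum and the threshold of 10,000 are assumptions made for the example, not recommended values.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class RangeSum extends RecursiveTask<Long> {      // step 2: extend a ForkJoinTask subclass
    private static final long THRESHOLD = 10_000; // illustrative; tune for the real workload
    private final long from; // inclusive
    private final long to;   // inclusive

    RangeSum(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from < THRESHOLD) {
            // Small enough: compute directly.
            long sum = 0;
            for (long i = from; i <= to; i++) {
                sum += i;
            }
            return sum;
        }
        long mid = from + (to - from) / 2;
        RangeSum left = new RangeSum(from, mid);
        RangeSum right = new RangeSum(mid + 1, to);
        left.fork();                              // step 3: fork a subtask
        long rightSum = right.compute();
        return left.join() + rightSum;            // step 4: join and combine
    }

    static long sum(long from, long to) {
        ForkJoinPool pool = new ForkJoinPool();   // step 1: declare the pool
        try {
            return pool.invoke(new RangeSum(from, to));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 1_000_000)); // 500000500000
    }
}
```

In a long-running application the pool would normally be created once and reused rather than created and shut down per call.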

Conclusion

  • In general, ForkJoinPool is a complement to ThreadPoolExecutor.
  • ForkJoinPool has its own distinctive mechanisms: a work queue bound to each thread, task splitting, and work stealing.
  • ForkJoinPool + ForkJoinTask together implement the Fork/Join framework.
  • It suits scenarios where a task can be broken down into smaller subtasks (much like recursion), in particular computationally intensive tasks that can take full advantage of a multi-core CPU.