1 introduction

Java7 provides ForkJoinPool support for splitting a task into multiple “small tasks” for parallel computation and combining the results of multiple “small tasks” into a total result.

ForkJoinPool is not intended as a replacement for ExecutorService, but rather as a complement to it, providing better performance than ExecutorService in certain application scenarios. Using the idea of divide and conquer + work steal algorithm, a thread pool; Best suited for computation-intensive tasks, use ManagedBlocker in conjunction with I/O, inter-thread synchronization, sleep(), etc., which cause a thread to block for a long time.

The following will be introduced from the following aspects

  1. use
  2. The data structure
  3. Flow and Logic

2 use

The following steps are required to use it

  1. Define the task

    • Common tasks: Runnable interface, Callable interface and other implementation classes
    • ForkJoinTask subclasses include CountedCompleter, RecursiveAction, and RecursiveTask. Users can select the above classes to achieve their own
  2. Submit a task

    • ForkJoinPool must be used to submit common tasks
    • ForkJoinTask Is a type of task that can be forked
  3. To get the results

    • ForkJoinTask Handle to a task that is processed by the Join method
    • The thread pool invoke method is committed and executed

Normal tasks are not shown here, and are no different from ThreadPoolExecutor; Here is an example of a ForkJoinTask

2.1 Defining Tasks

class Task(private val num : Int) : RecursiveTask<Long>() {
    override fun compute(): Long {
        if (num < 2) return 1L
        val t1 = Task(num - 1)
        val t2 = Task(num - 2)
        t1.fork()
        t2.fork()
        return t1.join() + t2.join()
    }
}
Copy the code

A RecursiveTask is a task that computes a result. A RecursiveAction is a task that computes no result. CountedCompleter will be covered separately later

2.2 Task Submission and Result obtaining

    val task = Task(20)
    task.fork()
    print(task.join())
Copy the code

Or thread pool commit

    print(ForkJoinPool(10).submit(Task(20)).join())
    print(ForkJoinPool(10).invoke(Task(20)))
    print(ForkJoinPool.commonPool().invoke(Task(20)))
Copy the code

ForkJoinPool.commonPool() is a common, provided instance of ForkJoinPool. Note that the join method is a blocking method; Note also that the fork method commits the task, but the task can be stolen, so the join method can get the results immediately. Therefore, results need to be obtained in reasonable places; You can also get the submit task handle and get the value where necessary

Isn’t it easy to use, but I said the above task returns the result

    return t1.join() + t2.join()
Copy the code

Replace with

    return t2.join() + t1.join()
Copy the code

Would you believe it would be a little more efficient? This has to do with the logic in the JOIN method that if the task is the last to join, it can be executed first, without waiting

2.3 CountedCompleter task

Complex and flexible to use; It can convert itself into a RecursiveTask or RecursiveAction task by internal logic, and can be used more flexibly. The biggest difference is that only one task needs to join and the tasks do not block the call within the thread pool. The connection between tasks needs to be triggered by the corresponding callback. It merges the results of its dependencies by completing the callback method; The following two member variables are added internally

final CountedCompleter<? > completer; volatile int pending;Copy the code

Completer: The node that depends on the current task; It’s like a linked list, but it’s not, it’s more like a tree; The initial task is the root node, which depends on its child node

Pending: The number of dependent nodes of the current node, or the number of its children. Class provides a list of method operations that are not covered; Its internal method calls are always compared before 0 and then reduced by 1, so this number +1 is the number of dependencies when the internal method ends the task

In general, we do not need to operate pending directly. We can use some methods already provided by pending to achieve the effect. There are several methods:

  • TryComplete: Starts at the current point and loops through the dependent node, ending when

    1. If pending is 0 and the dependent node is empty: When pending is 0, the onCompletion processing method is called back. If the dependent node is empty, call the quietlyComplete method to set the execution state to done
    2. Processing the pending value -1 of the current node succeeded
  • The propagateCompletion method, in contrast to the tryComplete method, has no onCompletion method to call back, meaning that there is no concern for each intermediate task

  • QuietlyCompleteRoot: Follow the pointer field to find the root dependent node and set the normal end state for it; The more violent end of the task state, this is suitable for finding a certain result to stop

OnCompletion callback method

This method only notifies the current task that all dependencies have completed for task merge operations, but only knows about the last completed dependency in this method;

Why does the CountedCompleter set the normal end state when ForkJoinTask is executing method logic

    final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }
Copy the code

That is, the existing ForkJoinTask exec method returns true, and only CountedCompleter returns false, so the task must be set to the normal end state before the task is declared finished and blocked when the task is called, for example, by fork JoinTask. It doesn’t matter if you’re just adding a task to it

Similar to a RecursiveAction effect

class Task(private val num : Int,private val end : Int, completer: Task? = null) : CountedCompleter<Void>(completer) {
    override fun compute() {
        if (end == num) {
            if (end % 2 == 0) println("odd $end")
            propagateCompletion()
            return
        }
        addToPendingCount(1)
        val middle = (num + end) / 2
        Task(num, middle, this).fork()
        Task(middle + 1, end,this).fork()
    }
}
Copy the code

Recursivetask-like effect

class Task(val num : Int,val end : Int, completer: Task? = null) : CountedCompleter<Int>(completer) { @Volatile public var mResult = 0 private var t1 : Task? = null private var t2 : Task? = null override fun compute() { if (end == num) { mResult = end tryComplete() return } addToPendingCount(1) val middle =  (num + end) / 2 t1 = Task(num, middle, this).fork() as Task t2 = Task(middle + 1, end,this).fork() as Task } override fun onCompletion(caller: CountedCompleter<*>?) { if (this ! = caller && caller is Task) { mResult = (t1? .mResult ? : 0) + (t2? .mResult ? : 0) } } override fun getRawResult(): Int { return mResult } override fun setRawResult(t: Int?) { mResult = t ? : 0}}Copy the code

If the result is not obtained by root task’s join method, but by some other method of data exchange (Rxjava launch, LiveData, etc.), then you do not need to override the GET /setRawResult method

Look for a particular result

class Task(val num : Int,val end : Int, completer: Task? = null) : CountedCompleter<Int>(completer) {
    @Volatile public var mResult = 0
    override fun compute() {
        if (end % 7 == 0 && end % 5 == 0) {
            (root as Task).mResult = end
            quietlyCompleteRoot()
            return
        } else if (num == end) {
            return
        }
        addToPendingCount(1)
        val middle = (num + end) / 2
        Task(num, middle, this).fork()
        Task(middle + 1, end,this).fork()
    }

    override fun getRawResult(): Int {
        return mResult
    }

    override fun setRawResult(t: Int?) {
        mResult = t ?: 0
    }
}
Copy the code

There may be other scenarios, but these scenarios are handled based on pending values and their references to determine whether the end state is set;

  • The setting of the atomic operations: addToPendingCount, compareAndSetPendingCount method, etc
  • Handle propagateCompletion, tryComplete, quietlyCompleteRoot, etc

3 Principle Implementation

ForkJoinPool thread pool. The threads that execute tasks are subclasses of ForkJoinWorkerThread. The tasks are wrapped as subclasses of ForkJoinTask

3.1 ForkJoinWorkerThread class

Thread subclass, wherein the main content is: Thread queue creation, destruction, execution

3.1.1 Thread queue Creation

Passed in the constructor ForkJoinPool. RegisterWorker method for the current thread associated queue, queue for thread pool queue array odd location

3.1.2 Thread destruction

Through ForkJoinPool deregisterWorker method for destruction

3.1.4 Running of threads

Run method for its main logic, do not post code; The thread execution needs to be performed after the thread queue is established and before the holding data is applied for space, otherwise no processing is done

The callback method onStart indicates that the thread has started execution. The ForkJoinPool. RunWorker method is used to perform tasks. The onTermination callback method receives exception handling;

3.2 ForkJoinTask class

Abstract class, Future, Serializable interface implementation; Its main contents: task exception collection, fork-join execution process (JOIN can also be invoke, GET, etc., but it is explained here based on JOIN)

The task has the following states

    volatile int status;
    static final int DONE_MASK   = 0xf0000000;
    static final int NORMAL      = 0xf0000000;
    static final int CANCELLED   = 0xc0000000;
    static final int EXCEPTIONAL = 0x80000000;
    static final int SIGNAL      = 0x00010000;
    static final int SMASK       = 0x0000ffff;
Copy the code
  • NORMAL: Indicates the end state, which is NORMAL and negative
  • CANCELLED: end status, user CANCELLED, negative
  • EXCEPTIONAL: EXCEPTIONAL (negative) : ending state
  • SIGNAL: indicates the status of waiting for notification. The value is positive
  • 0: indicates the start state

3.2.1 Collecting Exceptions

Abnormal data collection is processed according to weak reference mechanism. The weak reference task node structure is as follows:

static final class ExceptionNode extends WeakReference<ForkJoinTask<? >> { final Throwable ex; ExceptionNode next; final long thrower; final int hashCode; ExceptionNode(ForkJoinTask<? > task, Throwable ex, ExceptionNode next, ReferenceQueue<Object> exceptionTableRefQueue) { super(task, exceptionTableRefQueue); this.ex = ex; // This. Next = next; This.thrower = thread.currentThread ().getid (); // Thread identifier this.hashCode = system.identityHashCode (task); // Hash corresponding to the object address}}Copy the code

Weak reference node related data structures

private static final ExceptionNode[] exceptionTable; Private static Final ReentrantLock exceptionTableLock; Private static Final ReferenceQueue<Object> exceptionTableRefQueue; // Weak reference reclaim queueCopy the code

Array storage, and use hash mapping, single linked list for conflict resolution; In addition, when exceptions need to be handled, the destroyed task node exceptions can be removed in real time. Common operations are as follows:

  • Record abnormal: recordExceptionalCompletion method, in not completed missions will record
  • Remove the current node exception: clearExceptionalCompletion method
  • Get exception: getThrowableException, a non-current thread exception that needs to be wrapped
  • Clear exceptions associated with invalid tasks: expungeStaleExceptions Static method: Clears all exceptions associated with tasks in the recycle queue

3.2.2 the fork – join logic

The fork method is used to save tasks to a queue; The even-numbered task queue is not thread dependent and the odd-numbered task queue is thread private

   public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }
Copy the code
  1. Currently executed in a ForkJoinWorkerThread, the ForkJoinWorkerThread is queued by calling the workqueue. push method
  2. Put into a queue at an even position in the queue array in the thread pool

The join method is used to block fetching results

public final V join() { int s; if ((s = doJoin() & DONE_MASK) ! = NORMAL) reportException(s); return getRawResult(); } private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); }Copy the code

Again, it depends on the thread type

  1. If the status is less than 0, that is, the task is finished, the system returns an exception or throws an exception
  2. When not executed, the ForkJoinWorkerThread is not executed, but waits with the current task instance as the lock object. (The more specific logic is analyzed in the externalAwaitDone method.)
  3. When not executed, ForkJoinWorkerThread is executed within the thread. Returns if the task is at the top of the current thread queue (that is, the last one submitted) and in the finished state after execution
  4. AwaitJoin waits in the thread pool (while other task queues may be stolen for execution)

ExternalAwaitDone method

Execution is first attempted, and the doExec method is executed (the exec() method is called for concrete execution) if the following conditions are met

  1. CountedCompleter task type, the common thread pool method externalHelpComplete returns true
  2. For other task types, the common thread pool tryExternalUnpush method returns true

If not, wait through staUS atomic operation +synchronized lock

3.2 ForkJoinPool class

There are some constants meaning, queue structure, execution flow, stealing thread ideas;

3.2.1 State member variables

    volatile long ctl;                
    volatile int runState;
    final int config;
Copy the code

Ct1, 64-bit, is divided into four segments, each consisting of 16 contiguous bits

  • High 16 bits, number of threads processing tasks; Initialize to a negative value of the number of parallel threads (the number of parallel threads in the constructor, generally the maximum number of threads that can be created)
  • Next 16 bits, total number of threads, initialized to a negative value of the number of parallelism
  • The next-lowest 16 bits, the thread state, is less than 0 and you need to add a new thread, or the 48-bit position is 1 and you need to add a new thread
  • 16 bits lower, the idle thread corresponds to the task queue in the index position of the queue array

RunState, which has the following states, defaults to 0

    private static final int  STARTED    = 1;
    private static final int  STOP       = 1 << 1;
    private static final int  TERMINATED = 1 << 2;
    private static final int  SHUTDOWN   = 1 << 31;
Copy the code

Config: Low 16 bits represent parallelism, high 16 bits represent queue mode, default last in first out

3.2.3 Thread queue

volatile WorkQueue[] workQueues
Copy the code

Array structure, divided into thread queue and non-thread queue, randomly find the location to create and search; To achieve uniform WorkQueue processing to reduce WorkQueue synchronization overhead

volatile int scanState; // Negative: inactive, non-negative: active, where the odd number represents scanning int stackPred; // sp = (int) CTL, the identification information of the previous queue stack, including the version number, active or not, and the queue index int Nsteals; Int hint; // a random number used to help task theft, int config is used in the helpXXXX() method; // Configuration: the lower 16 bits represent the index in queue[]. The higher 16 bits: mode Optional FIFO_QUEUE (1 << 16) and LIFO_QUEUE (1 << 31). The default is LIFO_QUEUE volatile int qlock. // Lock the identifier: 1: locked, < 0: terminate; else 0 volatile int base; // index of next slot for poll int top; // index of next slot for push ForkJoinTask<? >[] array; // Task listCopy the code

The data structure body of the WorkQueue: task array, task queue header, and task queue tail; And thread operation synchronization flags, using atomic operation +volatile, with -1 indicating that the operation is not allowed, 0 indicating that the operation is allowed, and 1 indicating that the operation is normal

Therefore, its methods can be divided into thread-safe methods and non-thread-safe methods. Thread-safe methods are used for stealing and non-thread-safe methods are used for executing in-thread tasks

  • Push method: add data to the end of queue, not thread safe
  • GrowArray method: Array expansion, 2 is expanded, not thread safe
  • Pop method: Fetching data from the tail, atomic operation is thread-safe but not successful
  • PollAt method: Fetching data from the header, atomic operations are thread-safe but not successful
  • Poll: Pull data from the head, atomic manipulation + spin, ensure thread safety
  • NextLocalTask: According to the policy, data is extracted (processed according to CongFig), which is thread safe
  • Peek: Returns the header or tail element based on exit mode, but does not fetch it. It is not thread-safe
  • TryUnpush: Attempts to determine whether it is a tail-team task, thread-safe, but the results may not be accurate
  • SharedPush: Shared queue (even position WorkQueue instance), adding data to the end of the queue method, using qlock atomic operations to achieve thread safety but not guarantee accuracy, where queue expansion is handled by growAndSharedPush method and adding data
  • TrySharedUnpush: Checks whether tasks are at the bottom of the queue. Atomic operations are thread safe but not accurate
  • CancelAll: Cancels all tasks
  • LocalPopAndExec: The task is executed from the end of the queue. Atomic operation + spin ensures thread safety. If there is thread contention, the task exits without processing
  • LocalPollAndExec: Executes the task from the head of the queue. Atomic operation + spin ensures thread safety. If there is thread contention, the queue exits and no processing is performed
  • RunTask: Performs the steal task and calls localPopAndExec or localPollAndExec to continue the thread queue task processing based on the queue
  • TryRemoveAndExec: spin + atomic operation, as far as possible to execute tasks in the thread private queue; Non-queue data, atomic operation is EmptyTask
  • PopCC: Pull the CountedCompleter task at the end of the queue, atomic operation + spin for thread safety
  • PollAndExecCC: Fetch the team head CountedCompleter task and execute it, atomic operation + spin for thread safety

3.2.4 Call Process

There are three main processes: task submission process, thread execution process and result obtaining process

Submit a task

From the class point of view

  1. The thread pool submits the task
  2. The fork ForkJoinTask class

From a functional point of view

  1. Fork The thread submits tasks internally
  2. The non-forked thread submits the task, and the first task must be this way

External submission task

Internally submit the task, calling the thread-private WorkQueue object directly and adding the push method to the queue

Threads execute

Join Obtains the task result

The processing process can be roughly known from the above three processes, but the specific logic of stealing is not clear. Here are some methods to study carefully to grasp the essence of your thoughts

  1. Scan method: the forked thread steals the task. The first task of the forked thread is stolen
  2. AwaitJoin: stealing your own task while waiting in the thread pool and not being processed
  3. The difference between CountedCompleter tasks and other task processing is that CountedCompleter tasks do not block each other
  4. Lock wait mechanism: There may be errors in the diagram; When a thread is idle, the thread will be suspended or enabled. The suspended wait of a task is the wait method of Object, and notifyAll will wake up all after its execution
  5. Bit operations, and judgment processing between states, and some of these performance pursuits

In terms of specific method analysis, I am not very clear about some points, but if I do not write relevant stealing algorithm or some transfer ideas, it is ok to be unclear about some parts

Technology changes quickly, but the basic technology, the theoretical knowledge is always the same; The author hopes to share the basic knowledge of common technical points in the rest of his life. If you think the article is well written, please pay attention and like it. If there are mistakes in the article, please give me more advice!