
1. Introduction

CountDownLatch is a simple synchronizer that allows one or more threads to wait for other threads to complete before performing subsequent operations.

CountDownLatch is similar to Thread.join() in that it waits for other threads to complete their tasks.

CountDownLatch works like a counter that is initialized with a value. Each call to countDown() decrements the counter by one. A thread that calls await() blocks for as long as the counter is above zero; once enough countDown() calls have reduced the counter to zero, the waiting thread is released and goes on to its next task.
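The counter behavior described above can be observed with a timed await(). The following is a minimal sketch; the class name LatchCounterSketch and the helper opensAfter are made up for illustration, and the latch size of 2 is arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchCounterSketch {
    // Hypothetical helper: creates a latch of 2, calls countDown() `calls` times,
    // and reports whether a short timed await() got through.
    public static boolean opensAfter(int calls) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        for (int i = 0; i < calls; i++) {
            latch.countDown();
        }
        // Timed await: true if the counter reached 0, false if the timeout elapsed first
        return latch.await(100, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(opensAfter(1)); // false: the counter is still 1
        System.out.println(opensAfter(2)); // true: the counter reached 0
    }
}
```

The timed variant is used here only so that the "counter not yet zero" case returns instead of blocking forever.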

2. Simple use

Example 1: Have a thread wait for multiple threads to finish executing

// Make the main thread wait for other threads to finish executing
package CountDownLatch;

import java.util.concurrent.*;

public class Demo01 {
    static int n = 5;
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(n);  //CountDownLatch counter, initialized to 5
        ExecutorService executors = Executors.newFixedThreadPool(n);	// Create a pool of 5 threads

        for(int i = 1; i <= n; i++){
            executors.execute(new RunnableImpl(i, latch));	// Submit the task to the thread pool
        }
        long start = System.currentTimeMillis();
        latch.await();						// Let the main thread wait until other threads finish executing (counter 0)
        System.out.println("The main thread waits for all child threads to complete.");
        long end = System.currentTimeMillis();
        System.out.println("Waiting Time :" + (end - start));
        executors.shutdown();				// Close the thread pool
    }
    static class RunnableImpl implements Runnable{
        CountDownLatch latch;
        int id;
        public RunnableImpl(int id, CountDownLatch latch){
            this.id = id;
            this.latch = latch;
        }
        public void run(){
            try {
                System.out.println("Thread" + id + "Start the mission.");
                TimeUnit.SECONDS.sleep(2);
                System.out.println("Thread" + id + "Mission accomplished.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                latch.countDown();			// Each worker decrements the counter when it finishes; the thread blocked in await() resumes once the counter reaches 0
            }
        }
    }
}

Execution Result:

Example 2: Let multiple threads start at the same time

package CountDownLatch;

import java.util.concurrent.*;


public class Demo02 {
    private static final int TASK_COUNT = 8;
    private static final int THREAD_CORE_SIZE = 10;

    static int n = 10;
    static CountDownLatch latch1 = new CountDownLatch(1);
    static CountDownLatch latch10 = new CountDownLatch(10);
    public static void main(String[] args) throws InterruptedException {



        for(int i = 1; i <= n; i++){
            final int tmp = i;
            new Thread(new RunnableImpl(tmp, latch1, latch10)).start();
        }
        // Give the workers time to block on latch1.await(), then release them all at once with countDown()
        TimeUnit.SECONDS.sleep(5);
        latch1.countDown();
        long mainWaitStartTimeMillis = System.currentTimeMillis();
        latch10.await();
        long mainWaitEndTimeMillis = System.currentTimeMillis();
        System.out.println("All child threads execute task complete");
        System.out.println("Main thread wait duration:" + (mainWaitEndTimeMillis - mainWaitStartTimeMillis));
    }

    /**
     * Worker thread
     */
    static class RunnableImpl implements Runnable {
        /** Task ID */
        private int taskId;

        /** CountDownLatch synchronization counters */
        private CountDownLatch latch1;
        private CountDownLatch latch10;

        @Override
        public void run() {

            try {
                latch1.await();				// Wait until all threads are ready, then main releases the gate
                System.out.println("Thread" + taskId + "Start the mission." + "Execution start time" + System.currentTimeMillis());
                TimeUnit.SECONDS.sleep(1);
                System.out.println("Thread" + taskId + "Mission accomplished.");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();	// Restore the interrupt flag instead of swallowing it
            } finally {
                latch10.countDown();
            }
        }

        public RunnableImpl(int taskId, CountDownLatch latch1, CountDownLatch latch10) {
            this.taskId = taskId;
            this.latch1 = latch1;
            this.latch10 = latch10;
        }
    }
}

Results:

CountDownLatch acts as a gate. Picture a wall with n hit points: each call to countDown() fires an arrow that removes one hit point, and only when the wall's hit points drop to 0 can the threads that called await() on the CountDownLatch continue to execute.

3. CountDownLatch analysis

3.1 Structure

CountDownLatch contains only one internal class, Sync, and it has no fair or nonfair variants. Sync extends AbstractQueuedSynchronizer (AQS) and uses the AQS shared mode, so the lock inside CountDownLatch is a shared lock: multiple threads can acquire it at the same time once the counter reaches zero.
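To make the "inner Sync class on top of AQS shared mode" structure concrete, here is a stripped-down one-shot latch, modeled on the BooleanLatch example in the AQS class documentation. It is a sketch, not CountDownLatch itself: the class name OneShotLatchSketch and its methods are hypothetical, and unlike CountDownLatch it encodes state as 0 for closed and 1 for open.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class OneShotLatchSketch {
    // Same shape as CountDownLatch.Sync: a private class that extends AQS
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isOpen() {
            return getState() != 0;
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            // Non-negative means the acquire succeeds; negative means the caller must queue
            return isOpen() ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);   // open the latch
            return true;   // tell AQS to wake up queued threads
        }
    }

    private final Sync sync = new Sync();

    public boolean isOpen() { return sync.isOpen(); }

    public void open() { sync.releaseShared(1); }

    public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

    // Hypothetical demo: closed at first, then opened, after which await() returns at once
    public static boolean demo() throws InterruptedException {
        OneShotLatchSketch latch = new OneShotLatchSketch();
        boolean closedAtFirst = !latch.isOpen();
        latch.open();
        latch.await();
        return closedAtFirst && latch.isOpen();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // true
    }
}
```

Because the synchronizer only overrides tryAcquireShared and tryReleaseShared, all queueing and parking is left to AQS, which is the same division of labor CountDownLatch relies on.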

3.2 Source Code Analysis

package java.util.concurrent;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class CountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
		// This count is the initial value passed in to initialize CountDownLatch
        Sync(int count) {
            // Assign the value of count to state, which is used to control locking and unlocking
            setState(count);
        }
		// Return the current count by reading state through getState()
        int getCount() {
            return getState();
        }
		// Try to acquire in shared mode: returns 1 (success) when state is 0, otherwise -1 (the caller must queue)
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
		// Try to release the shared lock
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                // If state is 0, there is no lock to release, return false
                if (c == 0)
                    return false;
                int nextc = c-1;
                // If the CAS succeeds, return true only when state has just dropped to 0 (the latch is now open);
                // if the CAS fails, loop and retry
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    // Instance of the internal Sync class, which extends AQS
    private final Sync sync;
	
    // constructor
    public CountDownLatch(int count) {
        // count<0 throws an exception
        if (count < 0) throw new IllegalArgumentException("count < 0");
        // Initialize count, essentially initialize state
        this.sync = new Sync(count);
    }
	
    // Block the calling thread until the count reaches zero
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
	// Wait for at most the specified time
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

	// Decrement the count by calling the AQS releaseShared method
    public void countDown() {
        sync.releaseShared(1);
    }
	// Get the current count (the value of state)
    public long getCount() {
        return sync.getCount();
    }
	// Override the toString method
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

3.2.1 await() method

public void await() throws InterruptedException {
    // Calls the AQS acquireSharedInterruptibly method
    sync.acquireSharedInterruptibly(1);
}

Invokes the AQS acquireSharedInterruptibly method

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // tryAcquireShared returns 1 when state is 0: the latch is already open, so the caller does not block.
    // It returns -1 when state is greater than 0: the acquire fails, and
    // doAcquireSharedInterruptibly wraps the thread in a node, queues it, and blocks it.
    if (tryAcquireShared(arg) < 0)
        // Use shared interrupt mode
        doAcquireSharedInterruptibly(arg);
}
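The interrupt check at the top of acquireSharedInterruptibly is visible from the outside: a thread whose interrupt flag is already set gets an InterruptedException from await() instead of blocking, and the flag is cleared in the process. A minimal sketch, with a hypothetical class and method name:

```java
import java.util.concurrent.CountDownLatch;

public class InterruptedAwaitSketch {
    // Hypothetical helper: sets the interrupt flag, then calls await() on a closed latch.
    // Returns true if await() threw InterruptedException.
    public static boolean throwsWhenInterruptedOnEntry() {
        CountDownLatch latch = new CountDownLatch(1); // never counted down in this sketch
        Thread.currentThread().interrupt();           // set the flag before entering await()
        try {
            latch.await();
            return false;
        } catch (InterruptedException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(throwsWhenInterruptedOnEntry());           // true
        System.out.println(Thread.currentThread().isInterrupted());   // false: the flag was cleared
    }
}
```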

doAcquireSharedInterruptibly(int arg) method

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // wrap the thread calling await as a Node and add it to the AQS blocking queue
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // Get the predecessor of the current node
            final Node p = node.predecessor();
            // If the predecessor is head, the current node is first in line and may try to acquire
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // shouldParkAfterFailedAcquire finds a predecessor that has not been cancelled
            // and sets that predecessor's waitStatus to SIGNAL (-1),
            // which means the successor node must be woken up later
            if (shouldParkAfterFailedAcquire(p, node) &&
                // parkAndCheckInterrupt() suspends the current thread
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
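The effect of parking can be seen through Thread.getState(): a thread blocked in await() reports WAITING until the latch opens. The sketch below polls for that state; the class and method names are hypothetical and the 10 ms polling interval is arbitrary.

```java
import java.util.concurrent.CountDownLatch;

public class ParkedWaiterSketch {
    // Hypothetical helper: returns the state observed once the waiter has parked in await()
    public static Thread.State stateWhileWaiting() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        Thread waiter = new Thread(() -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();

        // Poll until the waiter has been suspended inside await()
        Thread.State state;
        do {
            Thread.sleep(10);
            state = waiter.getState();
        } while (state != Thread.State.WAITING);

        latch.countDown(); // open the latch so the waiter can finish
        waiter.join();
        return state;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(stateWhileWaiting()); // WAITING
    }
}
```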

3.2.2 countDown() method

public void countDown() {
    // Calls the AQS releaseShared method
    sync.releaseShared(1);
}

releaseShared method

public final boolean releaseShared(int arg) {
    // Calls CountDownLatch's tryReleaseShared, which decrements state and returns true only if state has just reached 0
    if (tryReleaseShared(arg)) {
        // state is now 0, so the threads blocked in await() must be woken up, starting with the head node's successor
        doReleaseShared();
        return true;
    }
    return false;
}
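One consequence of tryReleaseShared returning false when state is already 0 is that extra countDown() calls are harmless: the count never goes negative. A small sketch (the class name and the countAfter helper are hypothetical):

```java
import java.util.concurrent.CountDownLatch;

public class ExtraCountDownSketch {
    // Hypothetical helper: the value of getCount() after `calls` countDown() calls
    public static long countAfter(int initial, int calls) {
        CountDownLatch latch = new CountDownLatch(initial);
        for (int i = 0; i < calls; i++) {
            latch.countDown();
        }
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println(countAfter(2, 1)); // 1
        System.out.println(countAfter(2, 2)); // 0
        System.out.println(countAfter(2, 5)); // 0: calls made after zero have no effect
    }
}
```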

doReleaseShared() method

private void doReleaseShared() {

    for (;;) {
        // Get the head node
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // If the head's status is SIGNAL, its successor needs to be woken up
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
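Seen from the caller's side, the wake-up chain means that a single countDown() that brings the count to zero releases every waiting thread, not just the first one in the queue. A rough sketch under that reading; the class name, the helper, and the 100 ms pause are illustrative choices only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class ReleaseAllWaitersSketch {
    // Hypothetical helper: starts `waiters` threads that block on one latch,
    // opens the latch once, and returns how many threads got past await().
    public static int releasedWaiters(int waiters) throws InterruptedException {
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger released = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    gate.await();
                    released.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        Thread.sleep(100);  // best effort: give the waiters time to queue up and park
        gate.countDown();   // one call takes state from 1 to 0
        for (Thread t : threads) {
            t.join();
        }
        return released.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(releasedWaiters(4)); // 4
    }
}
```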

4. Summary

  1. CountDownLatch allows one or more threads to wait for other threads to complete before performing subsequent tasks.

  2. CountDownLatch is implemented using a shared lock mechanism.

  3. Each call to await() tries to acquire the shared lock, which succeeds only when state equals 0; otherwise the calling thread is queued and parked.

  4. Each call to countDown() tries to release the shared lock; when state reaches 0, the head node of the wait queue is fetched and its successor is woken up.