Usage scenarios

  1. CountDownLatch can be used when a task is carried out by multiple threads and those threads need to start executing at the same time

Nature

  1. Although the CountDownLatch class does not extend AbstractQueuedSynchronizer directly, it holds a final field, sync, whose class (Sync) extends AbstractQueuedSynchronizer. So it is essentially built on the shared mode of AQS
  2. The effect of this class is to set up a barrier after the threads are started. This barrier blocks every waiting thread and only disappears once all threads are ready (that is, once the count has reached zero)

The illustration

Source code analysis

Sync

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
// In fact, this count is equivalent to state in AQS
    Sync(int count) {
        setState(count);
    }
// Get the state of state
    int getCount(a) {
        return getState();
    }
Copy the code

tryAcquireShared

    // Shared-mode acquire attempt for the latch.
    // Succeeds (returns 1) only when the AQS state, i.e. the remaining count,
    // is 0; otherwise it fails (returns -1). The argument is not used.
    protected int tryAcquireShared(int acquires) {
        if (getState() == 0) {
            return 1;
        }
        return -1;
    }
Copy the code

tryReleaseShared

// Spin state minus 1
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
  Nextc = 0; nextc = 0;
  // Change state to 1 to break the loop and return true
            return nextc == 0; }}Copy the code

await

public void await(a) throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
Copy the code

acquireSharedInterruptibly

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
     // Check whether the current thread is interrupted
    if (Thread.interrupted())
        throw new InterruptedException();
    The tryAcquireShared method is the aQS overridden method inherited from sync above
    If the thread status is 0, return 1, otherwise return -1
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
Copy the code

doAcquireSharedInterruptibly

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // Add the current node to the blocking queue
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
        // Returns the last node to help the GC
            final Node p = node.predecessor();
            if (p == head) {
            // This method returns -1 as long as state is not equal to 0
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return; }}// Delete the unwaited node, and set the state of the previous node to
            -1
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw newInterruptedException(); }}finally {
        if(failed) cancelAcquire(node); }}Copy the code

predecessor

final Node predecessor(a) throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}
Copy the code

countDown

public void countDown(a) {
    sync.releaseShared(1);
}
Copy the code

releaseShared

public final boolean releaseShared(int arg) {
    // For CountDownLatch, tryReleaseShared is true only for the decrement that
    // brings state down to 0; every other call is finished at this point and
    // countDown() simply returns.
    boolean reachedZero = tryReleaseShared(arg);
    if (!reachedZero) {
        return false;
    }
    // State has just dropped to 0. This is the involved part: start waking
    // the threads that are waiting in the queue.
    doReleaseShared();
    return true;
}
Copy the code

doReleaseShared

private void doReleaseShared(a) {
    for (;;) {
        Node h = head;
        if(h ! =null&& h ! = tail) {int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
  // Compare substitutions, expecting -1, wanting to change it to 0
                if(! compareAndSetWaitStatus(h, Node.SIGNAL,0))
                    continue;            
        // Wake up the head's successor node, which is the first node in the blocking queue
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break; }}Copy the code

unparkSuccessor

private void unparkSuccessor(Node node) {
// Get the status of the node
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
                s = t;
    }
    if(s ! =null)
        LockSupport.unpark(s.thread);
}
Copy the code

setHeadAndPropagate

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
  // Set node as a header
    setHead(node);
The propagate argument is generally 1 because the propagate from await is 1
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
  // If s is empty, or if s is also a shared lock, it will wake up later
        if (s == null|| s.isShared()) doReleaseShared(); }}Copy the code

Reference link: "A line-by-line source code analysis of AbstractQueuedSynchronizer (3)"