Function introduction

A latch is a synchronizer that can delay the progress of threads until it reaches its terminal state [CPJ 3.4.2]. A latch acts like a gate: until the latch reaches the terminal state the gate is closed and no thread can pass, and in the terminal state the gate opens to let all threads through. Once the latch reaches the terminal state it never changes state again, so the gate stays open forever. Latches can be used to ensure that certain activities do not proceed until other one-time activities have completed, such as:

  • Ensure that a computation does not proceed until all the resources it needs have been initialized. A binary (two-state) latch can be used to indicate that “resource R has been initialized”, and any operation that requires R first waits on this latch.
  • Ensure that a service does not start until all the services it depends on have started. Each service has an associated binary latch. Starting service S means first waiting on the latches of the services S depends on, and then releasing the latch of S once startup completes, so that services depending on S can proceed.
  • Wait until all participants in an activity (for example, all players in a multi-player game) are ready before proceeding. In this case, the latch reaches the terminal state once all players are ready.
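The first scenario can be sketched as follows. This is a minimal illustration rather than a prescribed pattern; the class name ResourceGate, its methods, and the String standing in for resource R are hypothetical names chosen for the example.

```java
import java.util.concurrent.CountDownLatch;

public class ResourceGate {
    // Binary latch: count 1 means "not initialized", count 0 means "initialized"
    private final CountDownLatch initialized = new CountDownLatch(1);
    private volatile String resource; // hypothetical stand-in for resource R

    // Called by the thread that initializes R
    public void initialize(String value) {
        resource = value;
        initialized.countDown(); // open the gate
    }

    // Every operation that needs R waits on the latch first
    public String use() throws InterruptedException {
        initialized.await();
        return resource;
    }

    public static void main(String[] args) throws InterruptedException {
        ResourceGate gate = new ResourceGate();
        new Thread(() -> gate.initialize("ready")).start();
        System.out.println(gate.use()); // blocks until initialize has run, then prints "ready"
    }
}
```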

CountDownLatch is a flexible latch implementation that can be used in any of these situations; it lets one or more threads wait for a set of events to occur. The latch state consists of a counter initialized to a positive number that represents the number of events to wait for. The countDown method decrements the counter, indicating that an event has occurred, and the await method waits for the counter to reach zero, which happens when all the events have occurred. If the counter is non-zero on entry, await blocks until the counter reaches zero, the waiting thread is interrupted, or the wait times out.
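The counter behaviour described above can be observed with the timed form of await, which returns false when the wait times out. The following is a small sketch; the class name AwaitTimeoutDemo, the helper waitBriefly, and the 50 ms timeout are arbitrary choices for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AwaitTimeoutDemo {
    // Returns true if the latch opened within the timeout, false if the wait timed out
    static boolean waitBriefly(CountDownLatch latch) throws InterruptedException {
        return latch.await(50, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2); // wait for two events

        latch.countDown();                        // first event
        System.out.println(latch.getCount());     // 1
        System.out.println(waitBriefly(latch));   // false: one event is still pending

        latch.countDown();                        // second event
        System.out.println(latch.getCount());     // 0
        System.out.println(waitBriefly(latch));   // true: the count is already zero
    }
}
```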

Use case

TestHarness illustrates two common uses of latches. It creates a number of threads that run a given task concurrently, and it uses two latches, a “starting gate” and an “ending gate”. The starting gate is initialized with a count of 1, and the ending gate with a count equal to the number of worker threads. The first thing each worker thread does is wait on the starting gate, which ensures that none of them starts working until all of them are ready. The last thing each worker does is call countDown on the ending gate, decrementing its count by one. This lets the main thread wait efficiently until all worker threads have finished, so it can calculate the elapsed time.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class TestHarness {

    public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread(() -> {
                try {
                    // Wait until the main thread opens the starting gate
                    startGate.await();
                    try {
                        task.run();
                    } finally {
                        // Signal completion even if the task throws
                        endGate.countDown();
                    }
                } catch (InterruptedException ignored) {
                }
            });
            t.start();
        }

        long start = System.nanoTime();
        startGate.countDown();   // release all workers at once
        endGate.await();         // wait for the last worker to finish
        long end = System.nanoTime();
        return end - start;      // elapsed time in nanoseconds
    }

    public static void main(String[] args) throws InterruptedException {
        TestHarness testHarness = new TestHarness();
        AtomicInteger num = new AtomicInteger(0);
        long time = testHarness.timeTasks(10, () -> System.out.println(num.incrementAndGet()));
        // System.nanoTime() measures nanoseconds, so the unit is ns, not ms
        System.out.println("cost time: " + time + "ns");
    }
}

// Output the result (the order of the numbers varies between runs)
1
10
9
8
7
5
6
4
3
2
cost time: 2960900ns

Why use latches in TestHarness instead of starting each thread as soon as it is created? Presumably we want to measure how long it takes n threads to run a task concurrently. If the threads were simply created and started, the threads started first would have a head start on the later ones, and the degree of contention would vary over time as the number of active threads increased or decreased. The starting gate lets the main thread release all worker threads at once, and the ending gate lets the main thread wait for the last thread to finish rather than waiting for each thread in turn.

Usage summary

CountDownLatch is single-use. The counter can only be initialized once, in the constructor, and there is no mechanism to reset it. Once the count has reached zero, the CountDownLatch cannot be used again.
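The single-use behaviour can be checked directly. The sketch below (the class name OneShotDemo and its helper method are made up for the example) calls countDown more often than the initial count and then calls await on the opened latch twice.

```java
import java.util.concurrent.CountDownLatch;

public class OneShotDemo {
    // Calls countDown `calls` times on a latch created with `initial`
    // and returns the count that remains
    static long countAfter(int initial, int calls) {
        CountDownLatch latch = new CountDownLatch(initial);
        for (int i = 0; i < calls; i++) {
            latch.countDown(); // has no effect once the count is zero
        }
        return latch.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countAfter(2, 5)); // 0: the count never goes negative

        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        latch.await(); // the gate is open, so this returns immediately
        latch.await(); // and so does every later call
        System.out.println("still open");
    }
}
```

If a resettable count is needed, a fresh CountDownLatch has to be created for each round.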

Source code analysis

Code analysis

Under the hood, CountDownLatch is also built on AbstractQueuedSynchronizer (AQS).

CountDownLatch startGate = new CountDownLatch(1);

Let’s start by looking at its constructor, which creates a Sync object.

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

Sync is a static nested class that extends AbstractQueuedSynchronizer. It stores the count in the AQS state field and overrides the two shared-mode hooks, tryAcquireShared and tryReleaseShared:

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    // Constructor: store the initial count in the AQS state
    Sync(int count) {
        setState(count);
    }

    // Get the current count
    int getCount() {
        return getState();
    }

    // Shared acquire: succeeds only once the count has reached zero
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    // Shared release: decrement the count
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c - 1;
            // CAS the new count into the state; retry if another thread got there first
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}
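To see how little is needed on top of AQS, here is a minimal two-state latch modelled on the BooleanLatch example in the AbstractQueuedSynchronizer Javadoc. It is a sketch for illustration, not the CountDownLatch implementation: the class name BinaryLatch and the methods open and isOpen are assumptions of this example, and here a state of 0 means closed and 1 means open.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class BinaryLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isOpen() {
            return getState() != 0;
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            return isOpen() ? 1 : -1; // succeed only once the latch is open
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);  // open the latch
            return true;  // tell AQS to wake up the waiting threads
        }
    }

    private final Sync sync = new Sync();

    public boolean isOpen() { return sync.isOpen(); }

    public void open() { sync.releaseShared(1); }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public static void main(String[] args) throws InterruptedException {
        BinaryLatch latch = new BinaryLatch();
        Thread waiter = new Thread(() -> {
            try {
                latch.await();
                System.out.println("released");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        latch.open();  // the waiter passes whether or not it had already blocked
        waiter.join();
    }
}
```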

When await is called and the count is not yet zero, the current thread is wrapped in a Node, added to the AQS wait queue, and blocked.

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

The core of this call is an interruptible acquire in shared mode:

public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
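Because acquireSharedInterruptibly checks Thread.interrupted() before it looks at the count, a thread whose interrupt flag is already set gets an InterruptedException from await even if the latch is open. The sketch below demonstrates this; the class name InterruptedAwaitDemo and its helper method are hypothetical.

```java
import java.util.concurrent.CountDownLatch;

public class InterruptedAwaitDemo {
    // Calls await with the interrupt flag already set and reports
    // whether InterruptedException was thrown
    static boolean awaitThrowsWhenInterrupted(CountDownLatch latch) {
        Thread.currentThread().interrupt(); // set the flag before waiting
        try {
            latch.await();
            Thread.interrupted();           // not expected; clear the flag anyway
            return false;
        } catch (InterruptedException e) {
            return true;                    // Thread.interrupted() already cleared the flag
        }
    }

    public static void main(String[] args) {
        // Closed latch: the interrupt is reported instead of blocking
        System.out.println(awaitThrowsWhenInterrupted(new CountDownLatch(1))); // true
        // Open latch: the interrupt check still comes first
        System.out.println(awaitThrowsWhenInterrupted(new CountDownLatch(0))); // true
        System.out.println(Thread.currentThread().isInterrupted());            // false
    }
}
```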

CountDownLatch.Sync implements tryAcquireShared, which returns 1 if getState() == 0 and -1 otherwise. In other words, when await is called on a newly created CountDownLatch whose count is still non-zero, tryAcquireShared returns -1 and execution continues into doAcquireSharedInterruptibly(arg).


// Whether the shared acquire can succeed
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

// Try to acquire; otherwise join the wait queue and park
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

The countDown method decrements the count. If that brings the count to zero, the waiting threads are woken up; otherwise nothing else happens.

public void countDown() {
    sync.releaseShared(1);
}

releaseShared performs the release in shared mode:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

When tryReleaseShared returns true, doReleaseShared is called to wake up the successor of the head node in the AQS queue:

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases. This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
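The visible effect of this propagation is that a single countDown to zero lets every waiting thread through, not just one. A small sketch of that behaviour follows; the class name ReleaseAllDemo and the method releaseAll are invented for the example, and a second latch is used only to count the threads that got past the gate.

```java
import java.util.concurrent.CountDownLatch;

public class ReleaseAllDemo {
    // Starts `waiters` threads that all await one latch, opens the latch once,
    // and returns how many of them got through
    static int releaseAll(int waiters) throws InterruptedException {
        CountDownLatch gate = new CountDownLatch(1);
        CountDownLatch passed = new CountDownLatch(waiters);
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    gate.await();       // may block, or pass directly if the gate is already open
                    passed.countDown(); // record that this thread got through
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        gate.countDown();               // one release for all waiters
        for (Thread t : threads) {
            t.join();
        }
        return waiters - (int) passed.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(releaseAll(5)); // 5
    }
}
```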

The detailed process is shown in the flow chart below.

[Figure: source code flow chart]

References

  • Java Concurrency in Practice
  • www.cnblogs.com/Lee_xy_z/p/…