The difference between CountDownLatch and CyclicBarrier

CountDownLatch and CyclicBarrier are two very easy-to-use thread synchronization utility classes provided by Java's java.util.concurrent package. They are usually presented together because they look similar at first glance yet behave quite differently.

CountDownLatch is primarily used when one thread has to wait for multiple other threads. It is analogous to the leader of a tour group waiting for all the tourists to arrive before moving on to the next attraction.

CyclicBarrier lets a group of threads wait for each other: as long as one thread has not arrived at the barrier, all the others wait, much like you and your wife waiting for each other before heading out.

For CountDownLatch, the focus is on the one thread that is waiting. The other N threads do not wait for anyone: after each has done its part and counted down, it can terminate or carry on with other work.

For CyclicBarrier, the focus is on the whole set of N threads: if any one of them has not arrived, all the others must wait.

In addition, CountDownLatch's counter cannot be reused: once the counter has dropped to zero, any thread that calls await() afterwards passes straight through. CyclicBarrier's counter, by contrast, is reusable and is automatically reset to its original value once it drops to zero. CyclicBarrier also lets you register a callback that runs each time the barrier trips, so it is the more feature-rich of the two.
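To make the reuse difference concrete, here is a small sketch. The class and method names (ReuseDemo, latchStaysOpen, barrierRounds) are made up for illustration; only the java.util.concurrent calls are real API. It checks that a latch stays open after reaching zero, and that one barrier instance can trip several times in a row.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class ReuseDemo {

    // Once the latch has reached zero it stays open: later await() calls return at once.
    static boolean latchStaysOpen() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();   // counter: 1 -> 0
        latch.countDown();   // no effect, the counter never drops below 0
        try {
            // The count is already 0, so this returns true without blocking.
            return latch.await(1, TimeUnit.SECONDS) && latch.getCount() == 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Two threads meet at the same barrier 'rounds' times; returns how often it tripped.
    static int barrierRounds(int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The callback runs once per trip and simply counts the trips.
        CyclicBarrier barrier = new CyclicBarrier(2, trips::incrementAndGet);
        Runnable worker = () -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    barrier.await();   // the counter is reset to 2 after every trip
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new IllegalStateException(e);
            }
        };
        Thread a = new Thread(worker);
        Thread b = new Thread(worker);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("latch stays open: " + latchStaysOpen());
        System.out.println("barrier trips: " + barrierRounds(3));
    }
}
```

A CountDownLatch would need a fresh instance for every round, whereas the single CyclicBarrier above serves all of them.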

Using CountDownLatch

Let's say there are three tourists in the group, and the leader waits until all of them have arrived before leaving for the next attraction:

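import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

CountDownLatch latch = new CountDownLatch(3);
ExecutorService executor = Executors.newFixedThreadPool(3);

IntStream.rangeClosed(1, 3).forEach(i -> {
    executor.execute(() -> {
        System.out.println("Tourist " + i + " has arrived at the assembly point.");
        // One more tourist has arrived: decrement the counter
        latch.countDown();
    });
});

// The tour leader (the main thread) blocks here until the counter reaches 0
latch.await();

System.out.println("Everyone is here. Let's go to the next attraction.");

// Let the pool threads finish so the JVM can exit
executor.shutdown();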


A CountDownLatch is created with its counter initialized to 3, and the counter is decremented by one each time a tourist arrives (by calling latch.countDown()). In the main thread, we wait for the counter to reach zero by calling latch.await().

CountDownLatch source code analysis

CountDownLatch is implemented on top of AQS (AbstractQueuedSynchronizer). The counter is simply the state field of AQS: state is initialized to the value we pass to the constructor.

When we call countDown(), we are actually decrementing state (the counter) by 1:

// CountDownLatch code
public void countDown() {
  sync.releaseShared(1);
}

private static final class Sync extends AbstractQueuedSynchronizer {

  // countDown calls this method
  protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        // The value of the counter decreases by 1
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
  }
}

// code in AQS
public final boolean releaseShared(int arg) {
  // Wake up the blocked threads when state reaches 0
  if (tryReleaseShared(arg)) {
      doReleaseShared();
      return true;
  }
  return false;
}


When we call await(), it checks whether state is currently 0. If it is, the other threads have all counted down and the caller can continue. Otherwise, the calling thread is blocked.

// CountDownLatch code
public void await() throws InterruptedException {
  sync.acquireSharedInterruptibly(1);
}

// await calls this method
protected int tryAcquireShared(int acquires) {
  return (getState() == 0) ? 1 : -1;
}

// code in AQS
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {

  if (Thread.interrupted())
      throw new InterruptedException();
  
  if (tryAcquireShared(arg) < 0)
      // Block the thread
      doAcquireSharedInterruptibly(arg);
}

Now the logic is clear. Calling countDown() subtracts one from the AQS state, and when state reaches zero the threads blocked in the CLH queue are woken up. Calling await() checks whether state is 0: if it is, the caller continues; if not, the caller blocks and waits to be woken up (by the countDown() call that brings state to zero).
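The logic above fits in a few lines. The following is a simplified re-creation, not the JDK source: the class name SimpleLatch and the demo method are made up for illustration, while AbstractQueuedSynchronizer and the methods called on it are the real API.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical class: a stripped-down latch built directly on AQS.
class SimpleLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);   // the counter lives in the AQS state field
        }

        int count() {
            return getState();
        }

        // Called by acquireSharedInterruptibly: succeed only when the counter is 0.
        @Override
        protected int tryAcquireShared(int acquires) {
            return getState() == 0 ? 1 : -1;
        }

        // Called by releaseShared: decrement with CAS, report true when 0 is reached.
        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0) {
                    return false;          // already open, nothing to release
                }
                int next = c - 1;
                if (compareAndSetState(c, next)) {
                    return next == 0;      // true makes AQS wake the waiting threads
                }
            }
        }
    }

    private final Sync sync;

    SimpleLatch(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count < 0");
        }
        this.sync = new Sync(count);
    }

    void countDown() {
        sync.releaseShared(1);
    }

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    int getCount() {
        return sync.count();
    }

    // Three threads count down while the caller waits; returns the final counter value.
    static int demo() {
        SimpleLatch latch = new SimpleLatch(3);
        for (int i = 0; i < 3; i++) {
            new Thread(latch::countDown).start();
        }
        try {
            latch.await();   // blocks until the third countDown() brings state to 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return latch.getCount();
    }
}
```

All the queueing, parking, and waking is inherited from AQS; the latch itself only decides when an acquire succeeds and when a release should wake the waiters.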

Using CyclicBarrier

Suppose my wife and I are going out for a barbecue this weekend, and whoever is ready first waits for the other. With CyclicBarrier it looks like this:


// Thread pool that runs the callback asynchronously (this is an important point)
ExecutorService executor = Executors.newFixedThreadPool(1);

// Specify the counter value and the callback function
CyclicBarrier barrier = new CyclicBarrier(2, () -> {
    executor.execute(() -> System.out.println("Here we are. Let's go barbecue."));
});

new Thread(() -> {
  try {
    TimeUnit.SECONDS.sleep(2);
    System.out.println("Half an hour later, Lady Think123 is ready.");
    barrier.await();
  } catch (InterruptedException e) {
    e.printStackTrace();
  } catch (BrokenBarrierException e) {
    e.printStackTrace();
  }
}).start();

new Thread(() -> {
  System.out.println("Think123 is ready");
  try {
    barrier.await();
  } catch (InterruptedException e) {
    e.printStackTrace();
  } catch (BrokenBarrierException e) {
    e.printStackTrace();
  }
}).start();


CyclicBarrier source code analysis

public class CyclicBarrier {

  private static class Generation {
      boolean broken = false;
  }

  private final ReentrantLock lock = new ReentrantLock();
  
  private final Condition trip = lock.newCondition();
 
  private final int parties;
 
  private final Runnable barrierCommand;

  private Generation generation = new Generation();

  private int count;
}

The important fields of CyclicBarrier are shown above, and their names are telling. To make a group of threads wait for each other, CyclicBarrier uses a lock and a condition (trip). parties is the number of threads in the group (the initial counter value), and count is how many threads have not yet arrived in the current round. barrierCommand is the callback to run once all threads have arrived. generation is what makes the counter reusable: you can think of it as the version number of the current round.

Now let's see how the await method is implemented. I have kept only the core logic:


// Simplified: exception declarations and broken-barrier handling are omitted
public int await() {
  return dowait(false, 0L);
}

private int dowait(boolean timed, long nanos) {

  final ReentrantLock lock = this.lock;

  // Acquire the lock
  lock.lock();
  try {
    final Generation g = generation;

    // Omit some code

    // How many threads have not arrived yet
    int index = --count;

    // All threads have arrived
    if (index == 0) {

      // Execute the barrierCommand callback
      final Runnable command = barrierCommand;

      // Note that Runnable.run() is called directly, not Thread.start(),
      // so the callback runs on the current (last arriving) thread.
      if (command != null)
          command.run();

      // Reset the counter and wake up all waiting threads
      nextGeneration();
      return 0;
    }

    // Keep only the core code

    for (;;) {
        // Block the thread
        if (!timed)
            trip.await();
        else if (nanos > 0L)
            nanos = trip.awaitNanos(nanos);

        // A new generation means the barrier has tripped
        if (g != generation)
            return index;
    }
  } finally {
    // Always release the lock, on every return path
    lock.unlock();
  }
}

private void nextGeneration() {
  // Wake up all threads that are waiting at the barrier
  trip.signalAll();
  // Set up the next generation: reset the counter and start a new round
  count = parties;
  generation = new Generation();
}


The logic of await is simple. It checks whether the current thread is the last one to arrive. If it is, it runs the callback, wakes up all the other blocked threads, and resets the counter. If it is not the last one, the current thread blocks.
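The same logic can be rebuilt in miniature. This is a rough sketch under simplifying assumptions, not the JDK implementation: the class name SimpleBarrier and the demo method are made up, a plain Object stands in for Generation, and there is no timeout or broken-barrier handling, so an interrupted waiter would leave this toy barrier in an inconsistent state.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class: a minimal reusable barrier using a lock, a condition and a generation marker.
class SimpleBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;
    private final Runnable barrierCommand;
    private Object generation = new Object();   // identity changes once per round
    private int count;                          // threads still missing in this round

    SimpleBarrier(int parties, Runnable barrierCommand) {
        if (parties <= 0) {
            throw new IllegalArgumentException("parties must be positive");
        }
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierCommand;
    }

    int await() throws InterruptedException {
        lock.lock();
        try {
            final Object g = generation;
            int index = --count;
            if (index == 0) {
                // Last arrival: run the callback on this thread, then open the barrier.
                if (barrierCommand != null) {
                    barrierCommand.run();
                }
                trip.signalAll();
                count = parties;             // reset the counter for the next round
                generation = new Object();   // start a new generation
                return 0;
            }
            // Not the last arrival: wait until the generation changes.
            while (g == generation) {
                trip.await();
            }
            return index;
        } finally {
            lock.unlock();
        }
    }

    // 'parties' threads cross the barrier 'rounds' times; returns how often the callback ran.
    static int demo(int parties, int rounds) {
        int[] trips = {0};   // only written by the callback, which runs while the lock is held
        SimpleBarrier barrier = new SimpleBarrier(parties, () -> trips[0]++);
        Thread[] threads = new Thread[parties];
        for (int t = 0; t < parties; t++) {
            threads[t] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips[0];
    }
}
```

Waiting on the generation rather than on the counter is what makes reuse safe: a fast thread may already be decrementing the counter for the next round while slower threads are still waking up from the previous one.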

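Note in particular that the CyclicBarrier callback runs on the thread that is last to call await() in a given round, and it is called synchronously, before the waiting threads are released and the next round begins. So if the callback does any significant work, hand it off to another thread, as the barbecue example does with its thread pool. Otherwise every thread in the group is held up until the callback returns, and nothing is gained in performance.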
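A quick way to see which thread runs the callback is to record the thread name inside it. The sketch below uses made-up names (CallbackThreadDemo, the thread names "early-arrival" and "late-arrival"); the CyclicBarrier calls, including getNumberWaiting(), are real API. It starts the second thread only after the first is already waiting, so the second one is guaranteed to arrive last.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the JDK.
class CallbackThreadDemo {

    private static void awaitQuietly(CyclicBarrier barrier) {
        try {
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns the name of the thread on which the barrier callback ran.
    static String callbackThreadName() {
        AtomicReference<String> ranOn = new AtomicReference<>();
        CyclicBarrier barrier = new CyclicBarrier(2,
                () -> ranOn.set(Thread.currentThread().getName()));

        Thread early = new Thread(() -> awaitQuietly(barrier), "early-arrival");
        early.start();

        // Wait until the first thread is registered as waiting at the barrier.
        while (barrier.getNumberWaiting() < 1) {
            Thread.yield();
        }

        Thread late = new Thread(() -> awaitQuietly(barrier), "late-arrival");
        late.start();

        try {
            early.join();
            late.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("callback ran on: " + callbackThreadName());
    }
}
```

Since the callback runs on the last arriving thread, any slow work placed directly inside it delays the release of every waiting thread.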

Final words

You tell me, what would you do?

If you think it looks good, remember the triple.