introduce

A synchronization aid that allows a group of threads to wait for each other to reach a common obstacle point. This barrier is called a loop because it can be reused after the waiting thread is released. We analyzed CountDownLatch earlier. Here are the differences between the two:

CountDownLatch: a thread (or multiple threads) that waits for N other threads to complete an item (the N threads do not wait on each other, but can complete it first).

CyclicBarrier: N threads wait for each other, and all threads must wait for each other if any thread does not arrive or complete

Take a simple example to understand the CyclicBarrier. For example, in a sports meeting, if one of the athletes is ready before stepping on the platform, the presenter cannot award the prize. The presenter can only award the prize after the champion, third and second runner-up are all ready. Then the three athletes are ready and this is a barrier point. Only when the three athletes are ready and the barrier point is breached, the presenters can conduct the awarding operation.

The source code parsing

Main field information

// Record whether the current barrier has been broken, if broken=true.
private static class Generation {
    boolean broken = false;
}

![banjiang.png](https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/eed94a7f9bf64187ac37e2af3e665e97~tplv-k3u1fbpfcp-watermark.image)
/ * * already * / lock
private final ReentrantLock lock = new ReentrantLock();
/** Lock the corresponding condition */
private final Condition trip = lock.newCondition();
/** The number of threads to wait */
private final int parties;
/* What is executed when the barrier is breached */
private final Runnable barrierCommand;
/** is used to record whether the current barrier has been broken. Broken if broken=true. Volatile is not declared because only one thread acquires */
private Generation generation = new Generation();

/** * How many more threads need to reach the barrier point before the barrier can be breached. * /
private int count;
Copy the code

The constructor

CyclicBarrier is based on the exclusive lock implementation of ReentrantLock and is essentially based on the AQS implementation. Every time a thread calls await method, it suspends the current thread and puts it in the AQS conditional queue. Count decreases by one. Indicates that all threads have reached the barrier point to perform tasks passed through the constructor. Let’s start with the constructors. CyclicBarrier provides two constructors, as follows:

/** * Specifies the number of threads to wait for each other and the barrierAction is executed by the last thread to enter the barrier. * /
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

/** * specifies the thread to wait for each other. * /
public CyclicBarrier(int parties) {
    this(parties, null);
}
Copy the code

The constructor makes it clear that there are three variables: count, parties, and barrierCommod. BarrierCommod is the task that needs to be performed after passing the barrier. It is well known that CyclicBarrier is reusable. If only one count is designed and count is subtracted by 1 when the thread calls await, it cannot be reused. Parties always record the total number. Count is used to control how many more threads are required to reach the barrier point. When count decreases to 0, it indicates that all threads have reached the barrier point, and Parties reassign the value to count for reuse.

Await method

How does the inside of await operate? Start with the source code, as shown below:

public int await(a) throws InterruptedException, BrokenBarrierException {
    try {
      	// The dowait method is called internally
        return dowait(false.0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen}}Copy the code

According to the source code, “await” actually calls the internal dowait method to perform thread waiting operations. When an exception is thrown, an Error message will be directly thrown.

/** * Main obstacle code. * /
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
	// Check whether broken is true. If true, the barrier is broken, throw an exception.
        if (g.broken)
            throw new BrokenBarrierException();
	When the thread is interrupted, the barrier has been broken, and other threads are notified. When the thread waiting on the other line acquires the lock, an exception will be thrown directly.
        if (Thread.interrupted()) {
            // Set broken to true and notify all waiting threads.
            breakBarrier();
            throw new InterruptedException();
        }
	// count is reduced.
        int index = --count;
      	// When count decreases to 0, run the barrierCommand barrier task passed by the constructor.
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if(command ! =null)
                    // Run the method.
                    command.run();
                ranAction = true;
              	// Tell all threads to break the barrier and loop the next barrier.
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if(! timed)// If a conditional queue is waiting,
                    trip.await();
                else if (nanos > 0L)
                    // Wait a while for no signal from another thread to return directly.
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.Thread.currentThread().interrupt(); }}if (g.broken)
                throw new BrokenBarrierException();
	    // Determine if you have broken through the barrier round and entered the next barrier round.
            if(g ! = generation)return index;
	    // If the wait time is less than 0, an exception is thrown.
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw newTimeoutException(); }}}finally{ lock.unlock(); }}Copy the code
  1. In other words, there is only one thread to operate at the same time, and other threads need to wait in the waiting queue of AQS.

  2. Check whether broken is true. If broken is true, the barrier is broken, so throw an exception. When will this be broken

    • The current thread interrupts the request accordingly
    • If the wait time is specified and the pass wait time is negative, the barrier is also broken.
    • BarrierCommand also breaks barriers when an error is reported in the task passed by the constructor.
  3. Determine if count has been reduced to 0, indicating that all threads have reached the barrier point, the last thread executes the barrierCommand task passed through the constructor, calls the nextGeneration method, and performs the next resetting action for reuse.

    private void nextGeneration(a) {
        // Notify all threads in the waiting queue.
        trip.signalAll();
        // Reset count.
        count = parties;
        // Create a generation object. Let's look down here.
        generation = new Generation();
    }
    Copy the code
    • Notify all threads waiting in the conditional queue, and the waked thread in the conditional queue must first acquire the lock. The waked thread will be added to the AQS wait queue and acquire the lock. When a thread obtains the lock, it will continue to perform the operation in the await place.
    • When executed to if (g! = generation) return index; At this point, another thread has set generation to the new object, so it returns directly.
  4. How do breakbarriers control breaking barriers?

    private void breakBarrier(a) {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
    Copy the code
    • Set Generation’s broken to true and count to parties.
    • If (g.broken) throw new BrokenBarrierException(); if (g.broken) throw new BrokenBarrierException(); If the if statement is true, an exception is thrown to break the barrier.

conclusion

  • A synchronization aid that allows a group of threads to wait for each other to reach a common obstacle point.
  • This barrier is called a loop because it can be reused after the waiting thread is released.
  • Internally, AQS are still used, but CyclicBarrier is internally implemented as opposed to CountDownLatch, which is controlled by the AQS synchronized lock state value.
  • CountDownLatch: a thread (or multiple threads) that waits for N other threads to complete an item (the N threads do not wait on each other, but can complete it first).
  • CyclicBarrier: N threads wait for each other, and all threads must wait for each other if any thread does not arrive or complete

If you like, you can follow my wechat public account and push articles from time to time