Welcome to github.com/hsfxuebao/j… I hope it helps you; if you find it useful, please give the repository a Star.

1. The CyclicBarrier definition

Internally, CyclicBarrier coordinates threads with a ReentrantLock, a Condition, and a counter (count). Each thread blocks at the barrier, and the code after the barrier runs only once every participating thread has reached the same point.

Unlike CountDownLatch, a CyclicBarrier can be reused. If any thread waiting on the barrier is interrupted or times out, the barrier is broken: the current generation is marked as broken, count is reset to its initial value (parties), and all waiting threads are woken up.
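To make the reuse point concrete, here is a minimal sketch that sends the same barrier object through several rounds. The class name BarrierReuseSketch and the method countTrips are made up for this illustration; only the java.util.concurrent calls are real API. The barrier action is used as a trip counter, on the assumption that it runs exactly once each time the barrier trips.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original article.
class BarrierReuseSketch {

    /** Runs `rounds` rounds on one barrier and returns how many times it tripped. */
    static int countTrips(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, so it doubles as a trip counter.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier object is reused every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("trips = " + countTrips(3, 4)); // prints "trips = 4"
    }
}
```

A CountDownLatch could not be used this way: once its count reaches zero it stays open, so each round would need a new latch.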

Let’s start with a simple demo

    import org.apache.log4j.Logger;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;

    public class TestCyclicBarrier {

        private static final Logger logger = Logger.getLogger(TestCyclicBarrier.class);

        private static final int THREAD_NUM = 5;

        public static void main(String[] args) {
            // The Runnable is the barrier command; it runs once all threads have arrived
            CyclicBarrier cb = new CyclicBarrier(THREAD_NUM, new Runnable() {
                public void run() {
                    logger.info("Inside Barrier");
                }
            });

            List<Thread> threads = new ArrayList<>();
            for (int i = 0; i < THREAD_NUM; i++) {
                Thread thread = new Thread(new WorkerThread(cb));
                threads.add(thread);
                thread.start();
            }

            // wait until done
            for (Thread thread : threads) {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            logger.info("All Thread done()");
        }

        public static class WorkerThread implements Runnable {

            CyclicBarrier barrier;

            public WorkerThread(CyclicBarrier barrier) {
                this.barrier = barrier;
            }

            public void run() {
                try {
                    logger.info("Working's waiting");
                    // The thread waits here until all threads have reached the barrier
                    barrier.await();
                    logger.info("Thread ID:" + Thread.currentThread().getId() + " Working");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    }

Execution Result:

    [2017-02-15 14:12:39,506] INFO Thread-0 (TestCyclicBarrier.java:57) - Working's waiting
    [2017-02-15 14:12:39,506] INFO Thread-3 (TestCyclicBarrier.java:57) - Working's waiting
    [2017-02-15 14:12:39,506] INFO Thread-1 (TestCyclicBarrier.java:57) - Working's waiting
    [2017-02-15 14:12:39,506] INFO Thread-2 (TestCyclicBarrier.java:57) - Working's waiting
    [2017-02-15 14:12:39,506] INFO Thread-4 (TestCyclicBarrier.java:57) - Working's waiting
    [2017-02-15 14:12:39,509] INFO Thread-4 (TestCyclicBarrier.java:23) - Inside Barrier
    [2017-02-15 14:12:39,510] INFO Thread-4 (TestCyclicBarrier.java:60) - Thread ID:15 Working
    [2017-02-15 14:12:39,510] INFO Thread-0 (TestCyclicBarrier.java:60) - Thread ID:11 Working
    [2017-02-15 14:12:39,510] INFO Thread-3 (TestCyclicBarrier.java:60) - Thread ID:14 Working
    [2017-02-15 14:12:39,511] INFO Thread-2 (TestCyclicBarrier.java:60) - Thread ID:13 Working
    [2017-02-15 14:12:39,510] INFO Thread-1 (TestCyclicBarrier.java:60) - Thread ID:12 Working
    [2017-02-15 14:12:39,512] INFO main (TestCyclicBarrier.java:42) - All Thread done()

Execution steps:
(1) Five threads take part, and all of them must reach barrier.await() before any of them can proceed.
(2) The first four threads call barrier.await(), acquire the ReentrantLock, find that count has not yet reached 0, and wait on the Condition, which releases the lock.
(3) The fifth thread reaches barrier.await(), acquires the lock with ReentrantLock.lock(), and finds that it is the last thread to arrive. It runs the barrier command, calls Condition.signalAll() to wake up the other threads, and then releases the lock.

2. Constructor

The two constructors below differ only in the barrierCommand argument. If it is supplied, the command is run once all threads have reached the barrier, by the last thread to arrive.

    /** Creates a CyclicBarrier that runs barrierCommand when the barrier trips */
    public CyclicBarrier(int parties, Runnable barrierCommand) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierCommand;
    }

    /** Creates a CyclicBarrier without a barrier command */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
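The following sketch checks two observable consequences of the constructors: the barrier command runs on the thread that arrives last (the one whose await() returns 0), and a non-positive parties value is rejected with IllegalArgumentException. The class and method names are invented for this example.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original article.
class BarrierActionSketch {

    /** Returns true if the barrier command ran on the thread whose await() returned 0. */
    static boolean actionRunsOnLastArriver(int parties) {
        AtomicReference<Thread> actionThread = new AtomicReference<>();
        AtomicReference<Thread> lastArriver = new AtomicReference<>();
        CyclicBarrier barrier =
                new CyclicBarrier(parties, () -> actionThread.set(Thread.currentThread()));

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    // await() returns the arrival index; 0 means "last to arrive"
                    if (barrier.await() == 0) {
                        lastArriver.set(Thread.currentThread());
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return actionThread.get() != null && actionThread.get() == lastArriver.get();
    }

    /** Returns true if the constructor rejects a party count of zero. */
    static boolean rejectsZeroParties() {
        try {
            new CyclicBarrier(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("command ran on last arriver: " + actionRunsOnLastArriver(4));
        System.out.println("zero parties rejected: " + rejectsZeroParties());
    }
}
```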

3. Main attributes

    /** Each use of the barrier is represented by a Generation instance */
    private static class Generation {
        boolean broken = false;
    }

    /** The lock guarding all access to the barrier state */
    private final ReentrantLock lock = new ReentrantLock();

    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();

    /** The number of parties */
    private final int parties;

    /** The command to run when tripped */
    private final Runnable barrierCommand;

    /** The current generation */
    private Generation generation = new Generation();

    /** The number of parties that have not yet arrived in the current generation */
    private int count;

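4. The nextGeneration method

nextGeneration is called once the barrier has tripped. It wakes up all waiting threads, resets count to parties, and creates a new Generation so that the barrier can be used again.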

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation:
        // wake up all waiting threads so they can re-acquire the lock and return
        trip.signalAll();
        // set up next generation
        count = parties;
        // start a fresh generation
        generation = new Generation();
    }

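5. The breakBarrier method

breakBarrier is called when a waiting thread is interrupted or its wait times out (and also when the barrier command throws, or when reset() is called). It marks the current generation as broken, resets count, and wakes up all waiting threads.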

    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
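As a small illustration of the timeout case, the sketch below waits on a two-party barrier whose second party never arrives. The class name BrokenBarrierSketch and the method observe are hypothetical, and the 50 ms timeout is an arbitrary choice.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the original article.
class BrokenBarrierSketch {

    /** Lets one await time out, then reports what the barrier looks like afterwards. */
    static String observe() {
        CyclicBarrier barrier = new CyclicBarrier(2); // the second party never shows up
        StringBuilder log = new StringBuilder();

        try {
            barrier.await(50, TimeUnit.MILLISECONDS);
            log.append("tripped");
        } catch (TimeoutException e) {
            log.append("timeout"); // the timed-out thread itself breaks the barrier
        } catch (InterruptedException | BrokenBarrierException e) {
            log.append("unexpected:").append(e);
        }

        log.append(" broken=").append(barrier.isBroken());

        try {
            barrier.await(); // fails immediately because the generation is broken
            log.append(" next=tripped");
        } catch (BrokenBarrierException e) {
            log.append(" next=BrokenBarrierException");
        } catch (InterruptedException e) {
            log.append(" unexpected:").append(e);
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(observe()); // prints "timeout broken=true next=BrokenBarrierException"
    }
}
```

A broken barrier stays broken until reset() is called, so callers that want to continue need to handle BrokenBarrierException explicitly.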

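6. The main methods: awaitXX

The await methods block the calling thread until all parties have arrived at the barrier. Both variants delegate to dowait; see the numbered comments in the code below.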

    /**
     * Waits until all threads have reached the barrier, unless the
     * waiting thread is interrupted or the barrier is broken.
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    /**
     * Waits until all threads have reached the barrier, unless the waiting
     * thread is interrupted, the barrier is broken, or the timeout elapses.
     */
    public int await(long timeout, TimeUnit unit)
            throws InterruptedException, BrokenBarrierException, TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException, TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();                                   // 1. Acquire the ReentrantLock
        try {
            final Generation g = generation;

            if (g.broken) {                            // 2. Check whether the generation is already broken
                throw new BrokenBarrierException();
            }

            if (Thread.interrupted()) {                // 3. If the thread was interrupted, break the barrier
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;                       // 4. Decrement count; index is this thread's arrival index
            if (index == 0) { // tripped               // 5. index == 0 means all threads have reached the barrier
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null) {             // 6. The last thread to arrive runs the command
                        command.run();
                    }
                    ranAction = true;
                    nextGeneration();                  // 7. Wake everyone up and start the next generation
                    return 0;
                } finally {
                    if (!ranAction) {                  // the command threw: break the barrier
                        breakBarrier();
                    }
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed) {
                        trip.await();                  // 8. Wait without a timeout
                    } else if (nanos > 0L) {
                        nanos = trip.awaitNanos(nanos); // 9. Wait with a timeout
                    }
                } catch (InterruptedException e) {
                    if (g == generation && !g.broken) { // 10. Interrupted while waiting: break the barrier,
                        breakBarrier();                 //     which wakes up all waiting threads
                        throw e;
                    } else {
                        /*
                         * We're about to finish waiting even if we had not been
                         * interrupted, so this interrupt is deemed to "belong"
                         * to subsequent execution. Two cases lead here:
                         * 1. g != generation: all threads reached the barrier and
                         *    the next generation has already started.
                         * 2. g == generation && g.broken: another thread broke the
                         *    barrier and called signalAll().
                         * In both cases the interrupt status is restored so that
                         * the caller can still see that the thread was interrupted.
                         */
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken) {                        // 11. The barrier was broken while we were waiting
                    throw new BrokenBarrierException();
                }

                if (g != generation) {                 // 12. The barrier tripped normally: return the arrival index
                    return index;
                }

                if (timed && nanos <= 0L) {            // 13. Timed out: break the barrier
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();                             // 14. Release the lock
        }
    }
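Step 10 of dowait can be observed from the outside: when one waiting thread is interrupted, it receives InterruptedException while the other waiters receive BrokenBarrierException. The sketch below is one way to demonstrate this; the class InterruptedWaiterSketch is hypothetical, and polling getNumberWaiting() is just a simple way to know that both waiters are parked before the interrupt is sent.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class, not part of the original article.
class InterruptedWaiterSketch {

    /** Returns {outcome of the interrupted waiter, outcome of the other waiter}. */
    static String[] observe() {
        CyclicBarrier barrier = new CyclicBarrier(3); // the third party never arrives
        String[] outcome = new String[2];

        Thread[] waiters = new Thread[2];
        for (int i = 0; i < 2; i++) {
            final int slot = i;
            waiters[i] = new Thread(() -> {
                try {
                    barrier.await();
                    outcome[slot] = "tripped";
                } catch (InterruptedException e) {
                    outcome[slot] = "InterruptedException";
                } catch (BrokenBarrierException e) {
                    outcome[slot] = "BrokenBarrierException";
                }
            });
            waiters[i].start();
        }

        // getNumberWaiting() takes the barrier lock, so once it reports 2,
        // both waiters have decremented count and are waiting on the Condition.
        while (barrier.getNumberWaiting() < 2) {
            Thread.yield();
        }

        waiters[0].interrupt(); // the interrupted waiter breaks the barrier for everyone

        for (Thread t : waiters) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return outcome;
    }

    public static void main(String[] args) {
        String[] result = observe();
        System.out.println("interrupted waiter: " + result[0]);
        System.out.println("other waiter:       " + result[1]);
    }
}
```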

7. General method

    /** Checks whether the barrier is broken (broken == true) */
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    /** Resets the barrier */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

    /** Returns the number of threads currently waiting at the barrier */
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
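The effect of reset() on a thread that is already waiting can be sketched as follows: the waiter is released with BrokenBarrierException, while the barrier itself ends up in a fresh, unbroken generation. BarrierResetSketch and observe are names invented for this example.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class, not part of the original article.
class BarrierResetSketch {

    /** Resets a barrier while one thread is waiting and reports the resulting state. */
    static String observe() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        String[] waiterOutcome = new String[1];

        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
                waiterOutcome[0] = "tripped";
            } catch (BrokenBarrierException e) {
                waiterOutcome[0] = "BrokenBarrierException";
            } catch (InterruptedException e) {
                waiterOutcome[0] = "InterruptedException";
            }
        });
        waiter.start();

        // Wait until the waiter is actually parked at the barrier.
        while (barrier.getNumberWaiting() < 1) {
            Thread.yield();
        }

        barrier.reset(); // breaks the old generation, then starts a new one

        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "waiter=" + waiterOutcome[0]
                + " broken=" + barrier.isBroken()
                + " waiting=" + barrier.getNumberWaiting();
    }

    public static void main(String[] args) {
        System.out.println(observe()); // prints "waiter=BrokenBarrierException broken=false waiting=0"
    }
}
```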

8. Summary

CyclicBarrier relies mainly on ReentrantLock and Condition to coordinate the waiting threads. To understand CyclicBarrier, you first need to understand ReentrantLock and Condition.
