This is the 26th day of my participation in the August More Text Challenge

Recommended earlier articles

  • Java Basics
  • Java concurrent programming

🎈CountDownLatch

1.1 Overview

CountDownLatch is a synchronization helper class that allows one or more threads to wait until a set of operations being performed in other threads completes.

CountDownLatch is implemented with a counter whose initial value is the number of threads to wait for. Each call to countDown() decrements the counter by 1, and a thread that calls await() blocks until the counter reaches 0.

1.2 Inner class

CountDownLatch has an inner class, Sync, which extends AbstractQueuedSynchronizer (AQS). Its source code is as follows.

private static final class Sync extends AbstractQueuedSynchronizer {
    // Version number
    private static final long serialVersionUID = 4982264981922014374L;

    // Constructor
    Sync(int count) {
        setState(count);
    }
    
    // Returns the current count
    int getCount() {
        return getState();
    }

    // Attempts to get object state in shared mode
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    // Attempts to set the state to reflect a release in shared mode
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        // Infinite loop
        for (;;) {
            // Get the status
            int c = getState();
            if (c == 0) // The count is already zero; nothing to release
                return false;
            // Next state
            int nextc = c-1;
            if (compareAndSetState(c, nextc)) // CAS succeeded
                return nextc == 0;
        }
    }
}

Note: method calls on CountDownLatch are forwarded to Sync or AQS methods, so AQS is what underpins CountDownLatch.
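
To make the role of the two hooks concrete, here is a minimal sketch of a one-shot latch built on AQS in the same way as Sync. The class OneShotLatch and its methods open, await and isOpen are hypothetical names chosen for illustration; only AbstractQueuedSynchronizer and its methods come from the JDK.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot latch: AQS state 0 means closed, 1 means open.
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // A non-negative result lets the caller through; negative makes it queue and park
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);  // open the latch
            return true;  // true tells AQS to wake up the waiting threads
        }

        boolean isOpen() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    public void open() {
        sync.releaseShared(1);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean isOpen() {
        return sync.isOpen();
    }

    public static void main(String[] args) throws InterruptedException {
        OneShotLatch latch = new OneShotLatch();
        Thread waiter = new Thread(() -> {
            try {
                latch.await();
                System.out.println("waiter released");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        latch.open();
        waiter.join();
        System.out.println("open = " + latch.isOpen());
    }
}
```

CountDownLatch differs only in that its state starts at the count and tryReleaseShared decrements it with a CAS loop, reporting true when it reaches zero.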

1.3 Constructor

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // Initialize the number of states
    this.sync = new Sync(count);
}

Note: this constructor creates a CountDownLatch initialized with the given count. Inside the constructor, sync is initialized and the state (the count) is set.

1.4 CountDownLatch’s two core functions

1.4.1 countDown

  • Decrement the count of the latch, and if the count reaches zero, all waiting threads are released.
  • Decrement if the current count is greater than zero. If the new count is zero, all waiting threads are re-enabled for thread scheduling purposes.
  • If the current count is zero, nothing happens.
public void countDown() {
    sync.releaseShared(1);
}

Note: a call to countDown is forwarded to the Sync object's releaseShared method (inherited from AQS).
  • The source code of releaseShared is as follows:
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

Note: this function releases in shared mode. It calls CountDownLatch's tryReleaseShared and, if that returns true, AQS's doReleaseShared.
  • The source code of tryReleaseShared is as follows:
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    // Infinite loop
    for (;;) {
        // Get the status
        int c = getState();
        if (c == 0) // The count is already zero; nothing to release
            return false;
        // Next state
        int nextc = c-1;
        if (compareAndSetState(c, nextc)) // CAS succeeded
            return nextc == 0;
    }
}

Note: this function attempts to set the state to reflect a release in shared mode.
  • The source code of AQS's doReleaseShared is as follows:
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases. This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    // Infinite loop
    for (;;) {
        // Save the head node
        Node h = head;
        if (h != null && h != tail) { // The head is not null and is not the tail
            // Get the wait status of the head
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) { // The status is SIGNAL
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // CAS failed, retry
                    continue;            // loop to recheck cases
                // Wake up the successor node
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // The status is 0 and the CAS failed, retry
                continue;                // loop on failed CAS
        }
        if (h == head) // The head did not change, so exit; if it changed, loop again
            break;
    }
}

Note: this function performs the release in shared mode and wakes up the waiting successor.

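CountDownLatch's countDown call chain: countDown() -> releaseShared() -> tryReleaseShared() -> doReleaseShared() (the last step only when the count reaches zero).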
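
The counting rules listed for countDown can be checked with a small sketch. The class and helper names (CountDownSemanticsDemo, remainingAfter) are made up for this illustration; getCount() is the real CountDownLatch method that returns the current count.

```java
import java.util.concurrent.CountDownLatch;

class CountDownSemanticsDemo {
    // Calls countDown() `calls` times on a latch created with `initial`
    // and returns the count that remains.
    static long remainingAfter(int initial, int calls) {
        CountDownLatch latch = new CountDownLatch(initial);
        for (int i = 0; i < calls; i++) {
            latch.countDown();
        }
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println(remainingAfter(2, 1)); // 1
        System.out.println(remainingAfter(2, 2)); // 0
        System.out.println(remainingAfter(2, 5)); // 0, extra calls do nothing
    }
}
```

The third case corresponds to the `c == 0` branch of tryReleaseShared: once the state is zero, further calls return false without touching it.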

1.4.2 await

  • Causes the current thread to wait until the latch countdown reaches zero, unless the thread is interrupted.
  • If the current count is zero, this method returns immediately. That is, the thread blocked by the await method is woken up and continues execution.
  • If the current count is greater than zero, the current thread is disabled and put to sleep for thread scheduling purposes.
public void await() throws InterruptedException {
    // Forward to sync object
    sync.acquireSharedInterruptibly(1);
}

Note: a call to await on a CountDownLatch is forwarded to the Sync object's acquireSharedInterruptibly method (inherited from AQS).
  • The source code of acquireSharedInterruptibly is as follows:
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

Note: acquireSharedInterruptibly in turn calls tryAcquireShared of CountDownLatch's inner class Sync and AQS's doAcquireSharedInterruptibly.
  • The source code of tryAcquireShared is as follows:
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

Note: this function simply checks whether the AQS state is 0. It returns 1 if the state is 0 and -1 otherwise.
  • The source code of doAcquireSharedInterruptibly is as follows:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    // Add a node to the wait queue
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) { // Infinite loop
            // Get the precursor node of node
            final Node p = node.predecessor();
            if (p == head) { // The precursor node is the header
                // Attempts to get object state in shared mode
                int r = tryAcquireShared(arg);
                if (r >= 0) { // Succeeded
                    // Set the header and propagate
                    setHeadAndPropagate(node, r);
                    // Set the next domain for the node
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) // After a failed acquire, park the thread if appropriate and check for interruption
                // Throw an exception
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Note: AQS's doAcquireSharedInterruptibly may in turn call tryAcquireShared of CountDownLatch's inner class Sync and AQS's setHeadAndPropagate.
  • The source code of setHeadAndPropagate is as follows:
private void setHeadAndPropagate(Node node, int propagate) {
    // Get the header
    Node h = head; // Record old head for check below
    // Set the header
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    // Check whether propagation is needed
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        // Get the successor of the node
        Node s = node.next;
        if (s == null || s.isShared()) // The successor is null or in shared mode
            // Release in shared mode
            doReleaseShared();
    }
}

Note: this method sets the head node and, when the conditions are met, wakes up the nodes after it by calling doReleaseShared.
  • The source code of doReleaseShared is as follows:
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases. This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    // Infinite loop
    for (;;) {
        // Save the head node
        Node h = head;
        if (h != null && h != tail) { // The head is not null and is not the tail
            // Get the wait status of the head
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) { // The status is SIGNAL
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // CAS failed, retry
                    continue;            // loop to recheck cases
                // Wake up the successor node
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // The status is 0 and the CAS failed, retry
                continue;                // loop on failed CAS
        }
        if (h == head) // The head did not change, so exit; if it changed, loop again
            break;
    }
}

Note: this method performs the release in shared mode.

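CountDownLatch's await call chain: await() -> acquireSharedInterruptibly() -> tryAcquireShared() -> doAcquireSharedInterruptibly() -> setHeadAndPropagate() -> doReleaseShared().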
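
The two exits from await described above, immediate return at zero and wake-up by interruption, can be observed with a short sketch. AwaitDemo and its helper methods are hypothetical names. The timed overload await(long, TimeUnit) is a real CountDownLatch method that this article does not analyze; it is used here only so the example cannot hang.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class AwaitDemo {
    // Returns true if a thread calling await() on a latch that is never
    // counted down ends up with an InterruptedException after interrupt().
    static boolean awaitIsInterruptible() {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        waiter.start();
        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return interrupted.get();
    }

    // Timed await: true if the count reached zero, false if the timeout elapsed first.
    static boolean timedAwait(int count, long millis) {
        CountDownLatch latch = new CountDownLatch(count);
        try {
            return latch.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitIsInterruptible()); // true
        System.out.println(timedAwait(0, 50));      // true, count is already zero
        System.out.println(timedAwait(1, 50));      // false, nobody counts down
    }
}
```

The interrupt is honored whether it arrives before or during the wait, because acquireSharedInterruptibly checks Thread.interrupted() on entry and parkAndCheckInterrupt checks it again after parking.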

1.5 CountDownLatch sample

import java.util.concurrent.CountDownLatch;

class MyThread extends Thread {
    private CountDownLatch countDownLatch;
    
    public MyThread(String name, CountDownLatch countDownLatch) {
        super(name);
        this.countDownLatch = countDownLatch;
    }
    
    public void run() {
        System.out.println(Thread.currentThread().getName() + " doing something");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " finish");
        countDownLatch.countDown();
    }
}

public class CountDownLatchDemo {
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        MyThread t1 = new MyThread("t1", countDownLatch);
        MyThread t2 = new MyThread("t2", countDownLatch);
        t1.start();
        t2.start();
        System.out.println("Waiting for t1 thread and t2 thread to finish");
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }            
        System.out.println(Thread.currentThread().getName() + " continue");
    }
}

Output (one possible run; the order of the t1 and t2 lines may vary):

Waiting for t1 thread and t2 thread to finish
t1 doing something
t2 doing something
t1 finish
t2 finish
main continue

🎀CyclicBarrier (addition counter)

2.1 Overview

A CyclicBarrier, also known as a cyclic barrier, is a barrier that can be reused. It makes a group of threads wait until all of them have reached a certain point, and then lets them all proceed together. When a thread reaches the barrier it calls await(), which decrements the counter by 1 and blocks the thread; when the counter reaches 0, all threads blocked in await() are woken up. It is called cyclic because it can be reused after the waiting threads have been released: the barrier starts a new generation automatically, and reset() can also be called to reset it explicitly.
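
The reuse can be seen in a short sketch: the same barrier object is passed several times by the same threads, and the barrier action counts how often it trips. BarrierReuseDemo and countTrips are names made up for this example.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseDemo {
    // Runs `parties` threads through one barrier `rounds` times and
    // returns how many times the barrier tripped.
    static int countTrips(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, in the last thread to arrive
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // same barrier object in every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println(countTrips(3, 4)); // 4
    }
}
```

No call to reset() is needed between rounds in this sketch.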

2.2 Main methods of CyclicBarrier:

public class CyclicBarrier {

    // Called by the await methods; decides whether the barrier condition is met and the thread may proceed
    private int dowait(boolean timed, long nanos);

    // Creates a new CyclicBarrier that trips when the given number of parties (threads) are waiting on it; barrierAction is run when the barrier trips
    public CyclicBarrier(int parties, Runnable barrierAction);

    // Creates a new CyclicBarrier that trips when the given number of parties (threads) are waiting on it; once it trips, the statements after cyclicBarrier.await() are executed
    public CyclicBarrier(int parties);

    // Returns the number of parties required to trip this barrier.
    public int getParties();

    // Waits until all parties have called await on this barrier.
    // If the current thread is not the last to arrive, it is disabled for thread scheduling purposes and lies dormant until the last thread arrives, a thread is interrupted or times out, or the barrier is reset
    public int await();

    // Timed version of await: a TimeoutException is thrown if not all threads have arrived within the waiting time
    public int await(long timeout, TimeUnit unit);

    // Resets the barrier to its initial state.
    public void reset();

}

2.3 Constructors

  • The CyclicBarrier(int, Runnable) constructor
public CyclicBarrier(int parties, Runnable barrierAction) {
    // If the number of participating threads is less than or equal to zero, throw an exception
    if (parties <= 0) throw new IllegalArgumentException();
    // Set parties
    this.parties = parties;
    // Set count
    this.count = parties;
    // Set barrierCommand
    this.barrierCommand = barrierAction;
}

Note: this constructor specifies the number of threads associated with the CyclicBarrier, and also the action to run once all threads have reached the barrier; the action is executed by the last thread to arrive.
  • The CyclicBarrier(int) constructor
public CyclicBarrier(int parties) {
    // Call a constructor with two arguments
    this(parties, null);
}

Note: this constructor only specifies the number of threads associated with the CyclicBarrier and does not set an action to run.

2.4 CyclicBarrier's two core functions

2.4.1 dowait

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
            TimeoutException {
    // Save the current lock
    final ReentrantLock lock = this.lock;
    // Lock
    lock.lock();
    try {
        // Save the current generation
        final Generation g = generation;
        
        if (g.broken) // Barrier is broken, an exception is thrown
            throw new BrokenBarrierException();

        if (Thread.interrupted()) { // The thread is interrupted
            // Damage the current barrier and wake up all threads, only called if they have a lock
            breakBarrier();
            // Throw an exception
            throw new InterruptedException();
        }
        
        // Reduce the number of threads waiting to enter the barrier
        int index = --count;
        if (index == 0) {  // The number of threads waiting to enter the barrier is 0, and all threads have entered
            // Run the action identifier
            boolean ranAction = false;
            try {
                // Save the run action
                final Runnable command = barrierCommand;
                if (command != null) // A barrier action was supplied
                    // Run it
                    command.run();
                // Record that the action completed
                ranAction = true;
                // Enter the next generation
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction) // The action did not complete normally
                    // Break the current barrier
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        // Infinite loop
        for (;;) {
            try {
                if (!timed) // No wait time was set
                    // Wait
                    trip.await();
                else if (nanos > 0L) // A wait time was set and it is greater than 0
                    // Wait for the specified duration
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && !g.broken) { // Still the current generation and the barrier is not broken
                    // Break the current barrier
                    breakBarrier();
                    // Rethrow the exception
                    throw ie;
                } else { // The generation has changed or the barrier is already broken
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    // Restore the current thread's interrupt status
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken) // The barrier is broken, throw an exception
                throw new BrokenBarrierException();

            if (g != generation) // The generation has changed: the barrier was tripped
                // Return index
                return index;

            if (timed && nanos <= 0L) { // A wait time was set and it has run out
                // Break the barrier
                breakBarrier();
                // Throw an exception
                throw new TimeoutException();
            }
        }
    } finally {
        // Release the lock
        lock.unlock();
    }
}

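Logical flow of the dowait method: take the lock; throw if the barrier is broken or the thread has been interrupted; decrement count; if count reaches 0, run the barrier action and start the next generation; otherwise wait on the condition until the barrier is tripped, broken, interrupted, or timed out.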
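
The broken-barrier branches of dowait can be exercised from a single thread. The sketch below, with the hypothetical class BrokenBarrierDemo, waits with a timeout on a two-party barrier that no second thread ever joins, then shows that later callers fail fast until reset() is called. isBroken() is the real CyclicBarrier query method.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BrokenBarrierDemo {
    static String run() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        StringBuilder log = new StringBuilder();
        try {
            // Nobody else arrives, so the timed wait runs out and dowait breaks the barrier
            barrier.await(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            log.append("timeout broken=").append(barrier.isBroken());
        } catch (InterruptedException | BrokenBarrierException e) {
            log.append("unexpected ").append(e);
        }
        try {
            // The generation is marked broken, so this call throws immediately
            barrier.await();
        } catch (BrokenBarrierException e) {
            log.append("; second await: BrokenBarrierException");
        } catch (InterruptedException e) {
            log.append("; unexpected ").append(e);
        }
        // reset() starts a fresh, unbroken generation
        barrier.reset();
        log.append("; after reset broken=").append(barrier.isBroken());
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The first await goes through the `timed && nanos <= 0L` branch, the second through the `g.broken` check at the top of dowait.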

2.4.2 nextGeneration

This function is called after all threads have entered the barrier. It starts the next generation so that threads can enter the barrier again. The source code is as follows:

private void nextGeneration() {
    // signal completion of last generation
    // Wake up all threads
    trip.signalAll();
    // set up next generation
    // Restore the number of threads waiting to enter the barrier
    count = parties;
    // The new generation
    generation = new Generation();
}

This function calls the signalAll method of AQS's ConditionObject, which wakes up all threads waiting on this condition. The source code is as follows:

public final void signalAll() {
    if (!isHeldExclusively()) // The lock is not held exclusively by the current thread, throw an exception
        throw new IllegalMonitorStateException();
    // Save the head of the condition queue
    Node first = firstWaiter;
    if (first != null) // The head is not null
        // Wake up all waiting threads
        doSignalAll(first);
}

Note: this function checks whether the head is null, that is, whether the condition queue is empty, and if it is not, calls doSignalAll.
  • The source code of doSignalAll is as follows:
private void doSignalAll(Node first) {
    // Set the head and tail of the condition queue to null
    lastWaiter = firstWaiter = null;
    // Loop
    do {
        // Get the nextWaiter domain of first
        Node next = first.nextWaiter;
        // Set first's nextWaiter field to null
        first.nextWaiter = null;
        // Move the first node from the condition queue to sync queue
        transferForSignal(first);
        // Reset first
        first = next;
    } while (first != null);
}

Note: this function moves the nodes one by one from the condition queue to the sync queue by calling transferForSignal.
  • The source code of transferForSignal is as follows:
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

Note: this function transfers a node from the condition queue to the sync queue and sets the node's status; it calls enq.
  • The source code of enq is as follows:
private Node enq(final Node node) {
    for (;;) { // Loop indefinitely to make sure the node is successfully enqueued
        // Save the end node
        Node t = tail;
        if (t == null) { // The tail is empty, that is, it has not been initialized
            if (compareAndSetHead(new Node())) // If head is null, CAS it to a newly created node
                tail = head; // Both the head and the tail point to the same new node
        } else { // The tail is not null, that is, the queue has been initialized
            // Point the prev field of node at the tail
            node.prev = t;
            if (compareAndSetTail(t, node)) { // If t is still the tail, set the tail to node
                // Set the next field of the old tail to node
                t.next = node;
                return t; // Return the previous tail node
            }
        }
    }
}

Note: this function inserts a node into the sync queue.

nextGeneration call chain: nextGeneration() -> signalAll() -> doSignalAll() -> transferForSignal() -> enq().
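
The interplay of count, generation and the trip condition can be condensed into a small sketch. MiniBarrier below is a hypothetical, stripped-down barrier written for illustration: it has no timeout, no broken state and no barrier action, and it does not repair its count if a waiter is interrupted. It is not the JDK implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class MiniBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;
    private int count;
    private Object generation = new Object();

    MiniBarrier(int parties) {
        this.parties = parties;
        this.count = parties;
    }

    // Returns the arrival index: parties - 1 for the first arrival, 0 for the last.
    int await() throws InterruptedException {
        lock.lock();
        try {
            Object g = generation;
            int index = --count;
            if (index == 0) {
                // Last arrival: same three steps as nextGeneration()
                trip.signalAll();
                count = parties;
                generation = new Object();
                return 0;
            }
            while (g == generation) { // guards against spurious wake-ups
                trip.await();
            }
            return index;
        } finally {
            lock.unlock();
        }
    }

    // Sums the arrival indices over several rounds on one barrier.
    static int sumOfIndices(int parties, int rounds) {
        MiniBarrier barrier = new MiniBarrier(parties);
        AtomicInteger sum = new AtomicInteger();
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        sum.addAndGet(barrier.await());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(sumOfIndices(3, 2)); // (2 + 1 + 0) * 2 = 6
    }
}
```

Comparing the saved generation with the current one is what lets a fast thread start the next round while slower threads are still waking up from the previous one.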

2.5 CyclicBarrier sample

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args) {

        CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
            System.out.println("Collect seven cards and enter the lucky draw.");
        });

        for (int i = 1; i <= 7; i++) {
            final int temp = i;
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " collected card " + temp);
                    // Block this thread until all seven have arrived
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

Program output (one possible run; the order of the thread lines may vary):

Thread-1 collected card 2
Thread-4 collected card 5
Thread-3 collected card 4
Thread-2 collected card 3
Thread-0 collected card 1
Thread-6 collected card 7
Thread-5 collected card 6
Collect seven cards and enter the lucky draw.

3. 🩰Semaphore

3.1 Overview

Semaphore: A Semaphore is usually used to limit the number of threads that can access certain (physical or logical) resources.

Usage Scenarios:

Limiting access to resources, for example competing for a limited number of places, or rate limiting.

3.2 Common Methods of Semaphore

  • void acquire(): permits - 1. Acquires a permit from this semaphore, blocking until one is available or the thread is interrupted.

  • void acquire(int permits): permits - permits. Acquires the given number of permits, blocking until all are available or the thread is interrupted.

  • int availablePermits(): returns the number of permits currently available in this semaphore.

  • void release(): permits + 1. Releases a permit, returning it to the semaphore.

  • void release(int permits): permits + permits. Releases the given number of permits, returning them to the semaphore.

  • boolean hasQueuedThreads(): queries whether any threads are waiting to acquire permits.

  • int getQueueLength(): returns an estimate of the number of threads waiting to acquire permits.
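
The permit arithmetic of these methods can be traced on a single thread. SemaphorePermitsDemo and permitTrace are names invented for this sketch; every Semaphore call in it appears in the list above.

```java
import java.util.concurrent.Semaphore;

class SemaphorePermitsDemo {
    // Records availablePermits() after each step.
    static int[] permitTrace() {
        Semaphore semaphore = new Semaphore(3);
        int[] trace = new int[4];
        trace[0] = semaphore.availablePermits();     // 3
        try {
            semaphore.acquire();                     // 3 - 1
            trace[1] = semaphore.availablePermits(); // 2
            semaphore.acquire(2);                    // 2 - 2
            trace[2] = semaphore.availablePermits(); // 0
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        semaphore.release(3);                        // 0 + 3
        trace[3] = semaphore.availablePermits();     // 3
        return trace;
    }

    public static void main(String[] args) {
        for (int permits : permitTrace()) {
            System.out.println(permits);
        }
    }
}
```

Neither acquire call blocks here because enough permits are available each time.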

3.3 Semaphore constructor

  • The Semaphore(int) constructor
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

Note: this constructor creates a Semaphore with the given number of permits and the nonfair fairness setting.
  • The Semaphore(int, boolean) constructor
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Note: this constructor creates a Semaphore with the given number of permits and the given fairness setting.

3.4 Semaphore’s two core functions

3.4.1 acquire

This method acquires one (or more) permits from the semaphore, blocking until a permit is available or the thread is interrupted. The source code is as follows:

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

Note: this method calls the Sync object's acquireSharedInterruptibly method (inherited from AQS). That method was already analyzed in the CountDownLatch section, so it is not repeated here.

3.4.2 release

This method releases one (or more) permits and returns them to the semaphore. The source code is as follows:

public void release() {
    sync.releaseShared(1);
}

Note: this method calls the Sync object's releaseShared method (inherited from AQS). That method was already analyzed in the CountDownLatch section, so it is not repeated here.
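
As a rough check that acquire and release really bound concurrency, the following sketch measures the largest number of threads observed between the two calls at the same time. ConcurrencyLimitDemo and maxConcurrent are hypothetical names, and the 20 ms sleep is an arbitrary stand-in for real work.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrencyLimitDemo {
    // Returns the highest number of threads seen inside the guarded section at once.
    static int maxConcurrent(int permits, int threads) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // no permit was obtained, so nothing to release
                }
                try {
                    int now = inside.incrementAndGet();
                    max.accumulateAndGet(now, Math::max);
                    Thread.sleep(20); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    semaphore.release();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent = " + maxConcurrent(3, 10)); // never more than 3
    }
}
```

The exact value depends on scheduling, but it cannot exceed the number of permits because a thread increments the counter only while holding one.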

3.5 Semaphore sample

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * @Author: Akiang
 * @Date: 2021-08-26 09:03
 * version 1.0
 */
public class SemaphoreDemo1 {
    public static void main(String[] args) {
        // 10 computers
        Semaphore semaphore = new Semaphore(10);

        // 20 friends want to go online
        for (int i = 1; i <= 20; i++) {
            new Thread(() -> {
                try {
                    // Wait to obtain a permit
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    // Interrupted before a permit was obtained, so there is nothing to release
                    e.printStackTrace();
                    return;
                }
                try {
                    System.out.println(Thread.currentThread().getName() + " got a computer.");
                    // Time to start gaming; the sleep simulates the time spent playing
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // After a few games, the girlfriend shows up
                    System.out.println("My girlfriend came looking for me, " + Thread.currentThread().getName() + " left.");
                    semaphore.release(); // Free the computer for someone else
                }
            }, String.valueOf(i)).start();
        }
    }
}

4. 🏏A brief look at Phaser & Exchanger

4.1 Phaser

Phaser is a reusable synchronization barrier similar in function to CyclicBarrier and CountDownLatch, but more flexible in use. Great for synchronizing and coordinating staged computation tasks in multi-threaded environments (Phaser is preferred when sub-tasks in Fork/Join frameworks need to synchronize)

  • List of functions:
// Constructors
public Phaser() {
    this(null, 0);
}
public Phaser(int parties) {
    this(null, parties);
}
public Phaser(Phaser parent) {
    this(parent, 0);
}
public Phaser(Phaser parent, int parties)
// Registers a new party
public int register()
// Registers parties in bulk
public int bulkRegister(int parties)
// Arrives at this phaser without waiting for others to arrive. Returns the arrival phase number
public int arrive()
// Arrives at this phaser and deregisters from it, without waiting for others to arrive
public int arriveAndDeregister()
/*
 * Arrives at this phaser and waits for the others to arrive, equivalent to awaitAdvance(arrive()).
 * If you need to wait with interruption or a timeout, a similar construction can be built with the other forms of awaitAdvance.
 * If you need to deregister upon arrival, you can use awaitAdvance(arriveAndDeregister()).
 */
public int arriveAndAwaitAdvance()
// Blocks until the phase advances from the given phase value, and returns the next arrival phase number
public int awaitAdvance(int phase)
// Interruptible versions of awaitAdvance
public int awaitAdvanceInterruptibly(int phase) throws InterruptedException
public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
    throws InterruptedException, TimeoutException
// Forces this phaser into the terminated state. Counts of registered parties are unaffected. If the phaser is part of a tiered set of phasers, all phasers in the set are terminated
public void forceTermination()
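
Several of the methods listed above do not block, so their effect on the phase number can be traced on a single thread. PhaserBasicsDemo and trace are names made up for this sketch; getPhase() and getRegisteredParties() are real Phaser query methods that the list above does not include.

```java
import java.util.concurrent.Phaser;

class PhaserBasicsDemo {
    static int[] trace() {
        Phaser phaser = new Phaser(1);            // starts with one registered party
        int[] t = new int[5];
        t[0] = phaser.getPhase();                 // 0
        phaser.register();                        // add a second party
        t[1] = phaser.getRegisteredParties();     // 2
        t[2] = phaser.arrive();                   // first arrival in phase 0, returns 0
        t[3] = phaser.arriveAndDeregister();      // second arrival trips phase 0, returns 0
        t[4] = phaser.getPhase();                 // 1
        return t;
    }

    public static void main(String[] args) {
        for (int value : trace()) {
            System.out.println(value);
        }
    }
}
```

Both arrive() and arriveAndDeregister() return the phase in which the arrival was counted, which is why the fourth value is still 0 even though the phaser has advanced by the time the call returns.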
  • Example:
public class PhaserDemo {

    private static Phaser phaser = new MyPhaser();

    // Customize a phase shifter to customize the output
    static class MyPhaser extends Phaser {
        / * * *@deprecated Overwrite methods that perform operations and control termination when an upcoming phase is advanced. This method is called when the party advancing the phase shifter arrives (while all other waiting parties are dormant). * If this method returns true, the phase shifter will be set to the final terminating state in advance and will return true for isTerminated subsequent calls. *@param Phase enters the current phase number of the method, before the phase shifter advances *@param RegisteredParties number of currently registeredParties *@return* /
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            if (phase == 0) {
                System.out.println("Everyone has arrived at the Internet cafe and is ready to start hacking!!");
                return false;
            } else if (phase == 1) {
                System.out.println("We all agree to have a barbecue!!");
                return false;
            } else if (phase == 2) {
                System.out.println("Everyone back to dormitory!!");
                return true;
            }
            return true; }}// Build a thread task
    static class DoSomeThing implements Runnable {
        @Override
        public void run() {
            /** * Adds a new unreached side */ to this phase shifter
            phaser.register();
            System.out.println(Thread.currentThread().getName() + "From home, ready to go to school after the street open black!!");
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + "I'm hungry. Want to have a barbecue?");
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + "Barbecue is over.");
            phaser.arriveAndAwaitAdvance();
        }
    }

    public static void main(String[] args) throws Exception {
        DoSomeThing thing = new DoSomeThing();
        new Thread(thing, "Xiao Ming").start();
        new Thread(thing, "Wang").start();
        new Thread(thing, "Xiao li").start(); }}/** * Xiao Li set out from home, ready to go to school after the street open black!! * Xiao Wang set out from home, ready to go to school after the street open black!! * Xiao Ming set out from home, ready to go to school after the street dark!! * Everyone has arrived at the Internet cafe and is ready to start hacking!! * Xiao Li is hungry. Do you want to have a barbecue? * Xiao Ming is hungry. Do you want to have a barbecue? * Xiao Wang is hungry. Do you want to have a barbecue? * Everyone agrees, let's go to a barbecue!! * Xiaoming barbecue time over * Xiaoli barbecue time over * Xiaowang barbecue time over * everyone back to dormitory together!! * /

4.2 Exchanger

An Exchanger lets two threads swap objects at a rendezvous point.

Exchanger provides a synchronization point at which a pair of threads can exchange data. Each thread presents its data on entry to the exchange() method, is matched with a partner thread, and receives the partner's data on return.

The exchange is thread-safe for both sides. An Exchanger can be viewed as a bidirectional SynchronousQueue, and it is useful in applications such as genetic algorithms and pipeline designs.

  • Exchanger implementation mechanism
for (;;) {
    if (slot is empty) { // offer
        // The slot is empty: wrap the item in a Node
        place item in a Node;
        if (can CAS slot from empty to node) {
            // The node is now in the slot; park until a partner arrives
            wait for release;
            // After being woken up, return the item the partner left in the node
            return matching item in node;
        }
    } else if (can CAS slot from node to empty) { // release
        // The slot has been cleared; take the waiting thread's item
        // and leave our own item in the node as its match
        get the item in node;
        set matching item in node;
        // Wake up the waiting thread
        release waiting thread;
    }
    // else retry on CAS failure
}
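The pseudocode above can be turned into a small runnable sketch. This is not the JDK's Exchanger implementation: the class name SlotExchangerSketch, the Node fields, and the matched flag are all assumptions made for illustration, and the sketch leaves out interruption, timeouts, and the contention handling the real class provides. It only shows the single-slot offer/release handshake.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Hypothetical single-slot exchanger; illustrates the offer/release loop only.
final class SlotExchangerSketch<V> {

    // One node per waiting thread (names are illustrative, not from the JDK).
    private static final class Node<V> {
        final V item;                 // item offered by the waiting thread
        final Thread waiter;          // thread parked on this node
        V match;                      // item supplied by the partner
        volatile boolean matched;     // set after match is written

        Node(V item) {
            this.item = item;
            this.waiter = Thread.currentThread();
        }
    }

    private final AtomicReference<Node<V>> slot = new AtomicReference<>();

    public V exchange(V item) {
        for (;;) {
            Node<V> current = slot.get();
            if (current == null) {                            // offer
                Node<V> me = new Node<>(item);
                if (slot.compareAndSet(null, me)) {
                    // Park until a partner fills in the match.
                    // The loop guards against spurious wakeups.
                    while (!me.matched) {
                        LockSupport.park(this);
                    }
                    return me.match;
                }
            } else if (slot.compareAndSet(current, null)) {   // release
                V theirs = current.item;
                current.match = item;       // written before the volatile flag
                current.matched = true;
                LockSupport.unpark(current.waiter);
                return theirs;
            }
            // else: lost the CAS race, retry
        }
    }
}
```

Calling exchange("A") on one thread and exchange("B") on another returns "B" to the first caller and "A" to the second, whichever of them reaches the slot first.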
  • Exchanger sample

Here's a classic concurrency problem: a shared buffer, one or more data producers, and one or more data consumers. Because an Exchanger synchronizes exactly two threads, it only fits the producer-consumer problem when there is a single producer and a single consumer.

import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;

// The enclosing class name is illustrative; the original listing omits it
public class ExchangerDemo {

    static class Producer extends Thread {
        private Exchanger<Integer> exchanger;
        private static int data = 0;

        Producer(String name, Exchanger<Integer> exchanger) {
            super("Producer-" + name);
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            for (int i = 1; i < 5; i++) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    data = i;
                    System.out.println(getName() + " before :" + data);
                    // Hand over the produced value and receive the consumer's value
                    data = exchanger.exchange(data);
                    System.out.println(getName() + " after :" + data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class Consumer extends Thread {
        private Exchanger<Integer> exchanger;
        private static int data = 0;

        Consumer(String name, Exchanger<Integer> exchanger) {
            super("Consumer-" + name);
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            while (true) {
                data = 0;
                System.out.println(getName() + " before :" + data);
                try {
                    TimeUnit.SECONDS.sleep(1);
                    // Hand over 0 and receive the producer's value
                    data = exchanger.exchange(data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(getName() + " after :" + data);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Exchanger<Integer> exchanger = new Exchanger<Integer>();
        new Producer("", exchanger).start();
        new Consumer("", exchanger).start();
        TimeUnit.SECONDS.sleep(7);
        // The consumer loops forever, so terminate the JVM explicitly
        System.exit(0);
    }
}

The result could be as follows (the interleaving may vary between runs):

Consumer- before :0
Producer- before :1
Consumer- after :1
Consumer- before :0
Producer- after :0
Producer- before :2
Producer- after :0
Consumer- after :2
Consumer- before :0
Producer- before :3
Producer- after :0
Consumer- after :3
Consumer- before :0
Producer- before :4
Producer- after :0
Consumer- after :4
Consumer- before :0

The difference between CyclicBarrier and CountDownLatch

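  • CountDownLatch.await() blocks only the thread that calls it (typically the main thread) until the count reaches zero; the worker threads call countDown(), which never blocks. CyclicBarrier.await() is called by the task threads themselves, which wait for one another until all parties have arrived; the main thread is unaffected unless it is also one of the parties.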

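  • The count of a CountDownLatch cannot be reset, so a latch is single-use. A CyclicBarrier is reusable: it resets automatically once all parties have arrived, and reset() can force it back to its initial state.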

  • CountDownLatch and CyclicBarrier are both used for multithreaded synchronization, with CountDownLatch based on AQS and CyclicBarrier based on ReentrantLock and Condition.
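The first two differences can be seen side by side in a minimal sketch. The class name LatchVsBarrierSketch, the runDemo helper, and the worker and round counts are assumptions chosen for illustration. Three workers meet at the same CyclicBarrier twice, showing automatic reuse, while the calling thread blocks once on a CountDownLatch until every worker has finished.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names and counts are illustrative.
final class LatchVsBarrierSketch {
    static final int WORKERS = 3;
    static final int ROUNDS = 2;

    // Returns how many times the barrier tripped.
    static int runDemo() throws InterruptedException {
        CountDownLatch done = new CountDownLatch(WORKERS);
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time all parties have arrived.
        CyclicBarrier barrier = new CyclicBarrier(WORKERS, trips::incrementAndGet);

        for (int i = 0; i < WORKERS; i++) {
            new Thread(() -> {
                try {
                    for (int round = 0; round < ROUNDS; round++) {
                        // Workers block each other here; the barrier
                        // resets itself after every trip.
                        barrier.await();
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();   // never blocks the worker
                }
            }, "worker-" + i).start();
        }

        done.await();                   // only the calling thread blocks here
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("barrier tripped " + runDemo() + " times");
    }
}
```

The latch could not be used for a second round: once its count reaches zero, later await() calls return immediately, whereas the same barrier instance served both rounds.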

Reference: The Art of Concurrent Programming in Java

JUC series (7) | JUC three commonly used tools CountDownLatch, CyclicBarrier, Semaphore