AbstractQueuedSynchronizer, referred to as AQS, is the synchronization core behind much of the Java concurrency package. It identifies the synchronization status with a single state attribute and keeps waiting threads in a CLH-style queue. Changes to state and to the queue are made through CAS, and the underlying layer suspends and resumes threads by calling park() and unpark() on sun.misc.Unsafe (through LockSupport).
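As a small illustration of the park()/unpark() primitive mentioned above, here is a minimal sketch using LockSupport directly. The class name ParkUnparkDemo and the flag names are hypothetical and exist only for this example; AQS itself wraps these calls in its own queue logic.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkUnparkDemo {
    /** Parks a worker thread until the caller publishes a flag and unparks it. */
    static boolean runOnce() {
        AtomicBoolean permitGranted = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park() may return spuriously, so the condition is re-checked in a loop
            while (!permitGranted.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });
        waiter.start();

        permitGranted.set(true);     // publish the condition first
        LockSupport.unpark(waiter);  // then wake the thread; safe even if it has not parked yet

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter resumed: " + runOnce());
    }
}
```

The loop around park() matters: a parked thread can wake without a matching unpark(), which is also why the AQS acquire loops always retry the acquire after waking.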

In AQS, state can be thought of as the resource. A thread that wants the resource first checks state. If state allows the acquire (what that means depends on the concrete synchronizer), the thread takes the resource by updating state through CAS; in exclusive mode it also records itself as the owner thread. If the resource is insufficient or already held, the thread wraps itself in a Node and joins the queue through addWaiter. It then enters a loop: if its predecessor is head, it tries to acquire again. If that fails, it checks whether it may safely be suspended, mainly by inspecting the predecessor's waitStatus. If the condition is met, the thread is parked until a release by a node ahead of it in the queue wakes it up. After waking up it continues the loop, and it also records whether it was interrupted while parked. In shared mode, a successful acquireShared goes through setHeadAndPropagate, which checks whether the next node is also waiting in shared mode and, if so, propagates the wake-up. releaseShared likewise wakes up subsequent nodes, if any.
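The flow above can be sketched with a minimal non-reentrant mutex built on AQS. This is an illustrative sketch, not code from the JDK: the class name SimpleMutex and the convention that state 0 means free and 1 means held are assumptions made for the example. Only tryAcquire and tryRelease are written by hand; queuing, parking and waking are inherited from AQS.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {
    // Assumed convention for this sketch: state == 0 is free, state == 1 is held
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // Take the resource with a CAS on state and record the owner thread
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // AQS will enqueue and park the caller
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // only the owner gets here, so a plain write is enough
            return true; // tells AQS to wake the successor of head
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }
    boolean tryLock()  { return sync.tryAcquire(1); }
    void unlock()      { sync.release(1); }
    boolean isLocked() { return sync.isLocked(); }
}
```

Because tryAcquire here never checks the owner thread before the CAS, a second tryLock() from the thread that already holds the mutex fails; ReentrantLock adds that owner check to allow re-entry.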

  1. Two modes

    1. Shared mode
      /**
         * Acquires in shared mode, ignoring interrupts.  Implemented by
         * first invoking at least once {@link #tryAcquireShared},
         * returning on success.  Otherwise the thread is queued, possibly
         * repeatedly blocking and unblocking, invoking {@link
         * #tryAcquireShared} until success.
         *
         * @param arg the acquire argument.  This value is conveyed to
         *        {@link #tryAcquireShared} but is otherwise uninterpreted
         *        and can represent anything you like.
         */
        public final void acquireShared(int arg) {
            // tryAcquireShared returns >= 0 on success, in which case the caller proceeds directly; < 0 means failure, so the thread is queued and keeps trying
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
        /**
         * Acquires in shared uninterruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireShared(int arg) {
            // Enqueue a new node in shared mode
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                // Loop until the acquire succeeds
                for (;;) {
                    final Node p = node.predecessor();
                    // The predecessor is head, so this node is first in line and may try to acquire
                    if (p == head) {
                        // The acquire policy differs per implementation class and needs to be analyzed case by case
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            // Make this node the new head and propagate the wake-up to shared successors
                            setHeadAndPropagate(node, r);
                            // Unlink the old head
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    // Decide from the predecessor's waitStatus whether to park; after waking, record whether the thread was interrupted
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    // The acquire did not complete normally: cancel this node, which may also wake its successor
                    cancelAcquire(node);
            }
        }

        // ---------- release shared ----------
    
        /**
         * Releases in shared mode.  Implemented by unblocking one or more
         * threads if {@link #tryReleaseShared} returns true.
         *
         * @param arg the release argument.  This value is conveyed to
         *        {@link #tryReleaseShared} but is otherwise uninterpreted
         *        and can represent anything you like.
         * @return the value returned from {@link #tryReleaseShared}
         */
        public final boolean releaseShared(int arg) {
            // tryReleaseShared attempts to free the resource by updating state; the actual logic lives in the implementation class
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
        /**
         * Release action for shared mode -- signals successor and ensures
         * propagation. (Note: For exclusive mode, release just amounts
         * to calling unparkSuccessor of head if it needs signal.)
         */
        private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    // The successor needs to be notified
                    if (ws == Node.SIGNAL) {
                        // CAS the status to 0; if it fails, another thread changed it first, so loop and recheck
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        // Wake up the successor
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                // If head is unchanged, either there is no successor or the successor did not need waking, so we are done.
                // A woken successor's first action is to make itself the new head; when that happens, this thread loops again and helps wake further nodes
                if (h == head)                   // loop if head changed
                    break;
            }
        }
  2. Classes that use shared mode

    
    //Semaphore
    
     // state holds the number of permits; the initial count is specified at construction.
     // Each acquire attempt subtracts the requested permits from state and fails (returns a negative value) when too few permits remain
     final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 || compareAndSetState(available, remaining))
                        return remaining;
                }
     }

      // Release previously acquired permits
      protected final boolean tryReleaseShared(int releases) {
          for (;;) {
              int current = getState();
              int next = current + releases;
              if (next < current) // overflow
                  throw new Error("Maximum permit count exceeded");
              if (compareAndSetState(current, next))
                  return true;
          }
      }

    // ---------- CountDownLatch ----------
    
    
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
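To see how the state field behaves from the outside for these two classes, here is a small single-threaded sketch. The class name SharedModeDemo and its two helper methods are hypothetical; the calls on Semaphore and CountDownLatch are the standard public API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

class SharedModeDemo {
    /** Acquires permits until none are left, then releases one. */
    static String semaphoreTrace() {
        Semaphore semaphore = new Semaphore(2);              // state starts at 2
        StringBuilder trace = new StringBuilder();
        trace.append(semaphore.tryAcquire()).append(' ');    // 2 -> 1
        trace.append(semaphore.tryAcquire()).append(' ');    // 1 -> 0
        trace.append(semaphore.tryAcquire()).append(' ');    // remaining would be negative: fails
        semaphore.release();                                 // 0 -> 1
        trace.append(semaphore.availablePermits());
        return trace.toString();
    }

    /** Counts a latch down to zero; extra countDown() calls are ignored. */
    static String latchTrace() {
        CountDownLatch latch = new CountDownLatch(2);        // state starts at 2
        StringBuilder trace = new StringBuilder();
        trace.append(latch.getCount()).append(' ');
        latch.countDown();                                   // 2 -> 1
        latch.countDown();                                   // 1 -> 0, waiters would be released here
        latch.countDown();                                   // already 0: tryReleaseShared returns false
        trace.append(latch.getCount());
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(semaphoreTrace()); // prints "true true false 1"
        System.out.println(latchTrace());     // prints "2 0"
    }
}
```

Semaphore treats state as a pool that goes down on acquire and up on release, while CountDownLatch only ever counts down and lets every waiter through once state reaches zero.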
    1. Exclusive mode
    	/**
         * Acquires in exclusive mode, ignoring interrupts.  Implemented
         * by invoking at least once {@link #tryAcquire},
         * returning on success.  Otherwise the thread is queued, possibly
         * repeatedly blocking and unblocking, invoking {@link
         * #tryAcquire} until success.  This method can be used
         * to implement method {@link Lock#lock}.
         *
         * @param arg the acquire argument.  This value is conveyed to
         *        {@link #tryAcquire} but is otherwise uninterpreted and
         *        can represent anything you like.
         */
        public final void acquire(int arg) {
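            // Try to acquire directly; on failure, enqueue in exclusive mode and loop in acquireQueued.
            // acquireQueued returns true if the thread was interrupted while waiting, in which case the interrupt is re-asserted
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))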
                selfInterrupt();
        }
    
          /**
         * Acquires in exclusive uninterruptible mode for thread already in
         * queue. Used by condition wait methods as well as acquire.
         *
         * @param node the node
         * @param arg the acquire argument
         * @return {@code true} if interrupted while waiting
         */
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                // Spin operation
                for (;;) {
                    final Node p = node.predecessor();
                    // If the predecessor is head, this node is first in line and may try to acquire
                    if (p == head && tryAcquire(arg)) {
                        // Acquired: make this node the new head
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    // After a failed acquire, decide whether to park, mainly by checking the predecessor's waitStatus.
                    // If the park condition is met, park the thread and, once it is woken, check whether it was interrupted
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

        /**
         * Checks the predecessor's waitStatus while waiting. If it is SIGNAL, the
         * predecessor will wake this node when it releases, so it is safe to park.
         * If it is > 0 (CANCELLED = 1), the cancelled predecessors are skipped.
         * In any other state, the predecessor is set to SIGNAL so that it
         * notifies this node when it finishes.
         *
         * Checks and updates status for a node that failed to acquire.
         * Returns true if thread should block. This is the main signal
         * control in all acquire loops.  Requires that pred == node.prev.
         *
         * @param pred node's predecessor holding status
         * @param node the node
         * @return {@code true} if thread should block
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
        /**
         * Releases in exclusive mode.  Implemented by unblocking one or
         * more threads if {@link #tryRelease} returns true.
         * This method can be used to implement method {@link Lock#unlock}.
         *
         * @param arg the release argument.  This value is conveyed to
         *        {@link #tryRelease} but is otherwise uninterpreted and
         *        can represent anything you like.
         * @return the value returned from {@link #tryRelease}
         */
        public final boolean release(int arg) {
            // Different implementation classes have different strategies for attempting to release resources
            if (tryRelease(arg)) {
                Node h = head;
                // A non-zero waitStatus on head means a successor may be waiting; unparkSuccessor resets it to 0
                if (h != null && h.waitStatus != 0)
                    // Wake up head's successor
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
        /**
         * Wakes up node's successor, if one exists.
         *
         * @param node the node
         */
        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    1. Classes that use exclusive mode
      1. ReentrantLock: reentrant, fair or non-fair, with timed acquisition and Condition-based notification
    
    
     /**
      * Non-fair attempt to acquire. It does not check whether threads are already
      * waiting in the queue: if state shows the lock is free, it tries to take it
      * immediately. Non-fair locks generally give higher throughput. Both the fair
      * and the non-fair variant need this method for tryLock().
      */
      final boolean nonfairTryAcquire(int acquires) {
          final Thread current = Thread.currentThread();
          int c = getState();
          if (c == 0) {
              if (compareAndSetState(0, acquires)) {
                  setExclusiveOwnerThread(current);
                  return true;
              }
          }
          else if (current == getExclusiveOwnerThread()) {
              int nextc = c + acquires;
              if (nextc < 0) // overflow
                  throw new Error("Maximum lock count exceeded");
              setState(nextc);
              return true;
          }
          return false;
      }
     // Fair lock attempts to occupy resources
     protected final boolean tryAcquire(int acquires) {
          final Thread current = Thread.currentThread();
          int c = getState();
    
          if (c == 0) {
              // Only if no thread is queued ahead of this one: CAS state and record the owner thread
              if (!hasQueuedPredecessors() &&
                  compareAndSetState(0, acquires)) {
                  setExclusiveOwnerThread(current);
                  return true;
              }
          }
          else if (current == getExclusiveOwnerThread()) {
              // The owning thread may re-enter
              int nextc = c + acquires;
              if (nextc < 0)
                  throw new Error("Maximum lock count exceeded");
              setState(nextc);
              return true;
          }
          return false;
      }
    
    ​
     // ReentrantLock's attempt to release the lock
     protected final boolean tryRelease(int releases) {
          int c = getState() - releases;
          // Only the owning thread may release
          if (Thread.currentThread() != getExclusiveOwnerThread())
              throw new IllegalMonitorStateException();
          boolean free = false;
          if (c == 0) {
              free = true;
              setExclusiveOwnerThread(null);
          }
          // A plain volatile write is enough here: only the owner thread reaches this point, so no CAS is needed
          setState(c);
          return free;
       }
     // java.util.concurrent.ThreadPoolExecutor.Worker also extends AQS in exclusive mode, as a non-reentrant lock
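The re-entry branch in tryAcquire and the counting in tryRelease can be observed through the public ReentrantLock API. The following is a minimal sketch; the class name ReentrancyDemo and its helper methods are hypothetical, while getHoldCount(), lock() and unlock() are standard ReentrantLock methods.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    /** Records the hold count after each lock() and unlock() by the same thread. */
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock(true); // true selects the fair variant
        int[] counts = new int[4];
        lock.lock();
        counts[0] = lock.getHoldCount(); // first acquire: state 0 -> 1
        lock.lock();
        counts[1] = lock.getHoldCount(); // re-entry by the owner: state 1 -> 2
        lock.unlock();
        counts[2] = lock.getHoldCount(); // state 2 -> 1, lock still held
        lock.unlock();
        counts[3] = lock.getHoldCount(); // state 1 -> 0, lock is free again
        return counts;
    }

    /** A thread that does not own the lock is rejected by tryRelease. */
    static boolean unlockWithoutHoldingThrows() {
        try {
            new ReentrantLock().unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(holdCounts())); // prints "[1, 2, 1, 0]"
        System.out.println(unlockWithoutHoldingThrows());            // prints "true"
    }
}
```

The lock only becomes available to other threads when the count returns to zero, which is why tryRelease returns free rather than always returning true.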
    1. CyclicBarrier: implemented with ReentrantLock and Condition, mainly through Condition.await() and signalAll(). The parties wait for one another until all of them have reached the barrier point, and are then released together.

    2. Condition implements condition waiting in cooperation with ReentrantLock. A thread that needs to wait is wrapped in a Node (waitStatus Node.CONDITION) and appended to the condition queue; it then fully releases the lock and is suspended. Once it is woken, it re-acquires the lock through acquireQueued and afterwards cleans up any cancelled waiters. Waking happens when signal or signalAll is called at the appropriate moment: signal transfers the first non-cancelled waiter, starting at firstWaiter, to the lock queue, while signalAll walks the condition queue from firstWaiter and transfers every node to the lock queue.

    
    /** 
     * Implements interruptible condition wait.
     * <ol>
     * <li> If current thread is interrupted, throw InterruptedException.
     * <li> Save lock state returned by {@link #getState}.
     * <li> Invoke {@link #release} with saved state as argument,
     *      throwing IllegalMonitorStateException if it fails.
     * <li> Block until signalled or interrupted.
     * <li> Reacquire by invoking specialized version of
     *      {@link #acquire} with saved state as argument.
     * <li> If interrupted while blocked in step 4, throw InterruptedException.
     * </ol>
     */
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // Append this thread to the condition queue
        Node node = addConditionWaiter();
        // Fully release the lock, which wakes the next node in the lock queue
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // Park until the node has been transferred to the lock queue
        while (!isOnSyncQueue(node)) {
            // Suspend the thread
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // Re-acquire the lock from the lock queue
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }

      /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If waitStatus is not CONDITION, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Join the lock queue first after being signalled.
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
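To show await() and signalAll() working together in the way described for CyclicBarrier, here is a minimal one-shot barrier sketch. It is not the JDK's CyclicBarrier: the class name OneShotBarrier, its fields and the runParties helper are hypothetical, and features such as reset, broken-barrier handling and barrier actions are left out.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class OneShotBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition tripped = lock.newCondition();
    private int remaining; // guarded by lock

    OneShotBarrier(int parties) {
        this.remaining = parties;
    }

    /** Blocks until every party has arrived; the last arrival wakes the others. */
    void arriveAndAwait() throws InterruptedException {
        lock.lock();
        try {
            if (--remaining == 0) {
                tripped.signalAll(); // moves every waiter from the condition queue to the lock queue
                return;
            }
            while (remaining > 0) {
                tripped.await();     // releases the lock, parks, re-acquires the lock on wake-up
            }
        } finally {
            lock.unlock();
        }
    }

    /** Starts the given number of threads and returns how many passed the barrier. */
    static int runParties(int parties) {
        OneShotBarrier barrier = new OneShotBarrier(parties);
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    barrier.arriveAndAwait();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }
}
```

The while loop around await() guards against spurious wake-ups, and the signalled threads still have to win the lock one at a time before they return from await().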
    1. Read-write lock

    ReentrantReadWriteLock splits the AQS state in two: the low 16 bits hold the write (exclusive) hold count, and the high 16 bits hold the read (shared) hold count.
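The 16/16 split can be worked through with plain integer arithmetic. The sketch below mirrors the shift-and-mask scheme; the class name ReadWriteStateDemo is hypothetical, and the constants are written out here for illustration rather than read from the JDK class.

```java
class ReadWriteStateDemo {
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = 1 << SHARED_SHIFT;       // adding this bumps the read count by one
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // selects the low 16 bits

    /** Number of read holds, stored in the high 16 bits. */
    static int sharedCount(int state) {
        return state >>> SHARED_SHIFT;
    }

    /** Number of write holds, stored in the low 16 bits. */
    static int exclusiveCount(int state) {
        return state & EXCLUSIVE_MASK;
    }

    public static void main(String[] args) {
        int state = 0;
        state += 1;            // writer acquires
        state += 1;            // writer re-enters
        state += SHARED_UNIT;  // the same thread also takes a read lock (lock downgrading)
        System.out.println(sharedCount(state));    // prints "1"
        System.out.println(exclusiveCount(state)); // prints "2"
    }
}
```

Packing both counts into one int lets a single CAS on state update either count atomically, at the cost of capping each count at 65535.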

    1. Attributes

    Node: a node in the synchronization queue. Its waitStatus field records the status of the node.

    /**
     * Wait queue node class.
     *
     * The wait queue is a variant of a "CLH" (Craig, Landin, and
     * Hagersten) lock queue. CLH locks are normally used for
     * spinlocks.  We instead use them for blocking synchronizers, but
     * use the same basic tactic of holding some of the control
     * information about a thread in the predecessor of its node.  A
     * "status" field in each node keeps track of whether a thread
     * should block.  A node is signalled when its predecessor
     * releases.  Each node of the queue otherwise serves as a
     * specific-notification-style monitor holding a single waiting
     * thread. The status field does NOT control whether threads are
     * granted locks etc though.  A thread may try to acquire if it is
     * first in the queue. But being first does not guarantee success;
     * it only gives the right to contend.  So the currently released
     * contender thread may need to rewait.
     *
     * To enqueue into a CLH lock, you atomically splice it in as new
     * tail. To dequeue, you just set the head field.
     *
     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
     *
     * Insertion into a CLH queue requires only a single atomic
     * operation on "tail", so there is a simple atomic point of
     * demarcation from unqueued to queued. Similarly, dequeuing
     * involves only updating the "head". However, it takes a bit
     * more work for nodes to determine who their successors are,
     * in part to deal with possible cancellation due to timeouts
     * and interrupts.
     *
     * The "prev" links (not used in original CLH locks), are mainly
     * needed to handle cancellation. If a node is cancelled, its
     * successor is (normally) relinked to a non-cancelled
     * predecessor. For explanation of similar mechanics in the case
     * of spin locks, see the papers by Scott and Scherer at
     * http://www.cs.rochester.edu/u/scott/synchronization/
     *
     * We also use "next" links to implement blocking mechanics.
     * The thread id for each node is kept in its own node, so a
     * predecessor signals the next node to wake up by traversing
     * next link to determine which thread it is.  Determination of
     * successor must avoid races with newly queued nodes to set
     * the "next" fields of their predecessors.  This is solved
     * when necessary by checking backwards from the atomically
     * updated "tail" when a node's successor appears to be null.
     * (Or, said differently, the next-links are an optimization
     * so that we don't usually need a backward scan.)
     *
     * Cancellation introduces some conservatism to the basic
     * algorithms.  Since we must poll for cancellation of other
     * nodes, we can miss noticing whether a cancelled node is
     * ahead or behind us. This is dealt with by always unparking
     * successors upon cancellation, allowing them to stabilize on
     * a new predecessor, unless we can identify an uncancelled
     * predecessor who will carry this responsibility.
     *
     * CLH queues need a dummy header node to get started. But
     * we don't create them on construction, because it would be wasted
     * effort if there is never contention. Instead, the node
     * is constructed and head and tail pointers are set upon first
     * contention.
     *
     * Threads waiting on Conditions use the same nodes, but
     * use an additional link. Conditions only need to link nodes
     * in simple (non-concurrent) linked queues because they are
     * only accessed when exclusively held.  Upon await, a node is
     * inserted into a condition queue.  Upon signal, the node is
     * transferred to the main queue.  A special value of status
     * field is used to mark which queue a node is on.
     *
     * Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
     * Scherer and Michael Scott, along with members of JSR-166
     * expert group, for helpful ideas, discussions, and critiques
     * on the design of this class.
     */


    static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED = 1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    /**
    * Status field, taking on only the values:
    * SIGNAL: The successor of this node is (or will soon be)
    * blocked (via park), so the current node must
    * unpark its successor when it releases or
    * cancels. To avoid races, acquire methods must
    * first indicate they need a signal,
    * then retry the atomic acquire, and then,
    * on failure, block.
    * CANCELLED: This node is cancelled due to timeout or interrupt.
    * Nodes never leave this state. In particular,
    * a thread with cancelled node never again blocks.
    * CONDITION: This node is currently on a condition queue.
    * It will not be used as a sync queue node
    * until transferred, at which time the status
    * will be set to 0. (Use of this value here has
    * nothing to do with the other uses of the
    * field, but simplifies mechanics.)
    * PROPAGATE: A releaseShared should be propagated to other
    * nodes. This is set (for head node only) in
    * doReleaseShared to ensure propagation
    * continues, even if other operations have
    * since intervened.
    * 0: None of the above
    *
    * The values are arranged numerically to simplify use.
    * Non-negative values mean that a node doesn't need to
    * signal. So, most code doesn't need to check for particular
    * values, just for sign.
    *
    * The field is initialized to 0 for normal sync nodes, and
    * CONDITION for condition nodes. It is modified using CAS
    * (or when possible, unconditional volatile writes).
    */

    volatile int waitStatus;

    /**
     * Link to predecessor node that current node/thread relies on
     * for checking waitStatus. Assigned during enqueuing, and nulled
     * out (for sake of GC) only upon dequeuing.  Also, upon
     * cancellation of a predecessor, we short-circuit while
     * finding a non-cancelled one, which will always exist
     * because the head node is never cancelled: A node becomes
     * head only as a result of successful acquire. A
     * cancelled thread never succeeds in acquiring, and a thread only
     * cancels itself, not any other node.
     */
    volatile Node prev;

    /**
     * Link to the successor node that the current node/thread
     * unparks upon release. Assigned during enqueuing, adjusted
     * when bypassing cancelled predecessors, and nulled out (for
     * sake of GC) when dequeued.  The enq operation does not
     * assign next field of a predecessor until after attachment,
     * so seeing a null next field does not necessarily mean that
     * node is at end of queue. However, if a next field appears
     * to be null, we can scan prev's from the tail to
     * double-check.  The next field of cancelled nodes is set to
     * point to the node itself instead of null, to make life
     * easier for isOnSyncQueue.
     */
    volatile Node next;

    /**
     * The thread that enqueued this node.  Initialized on
     * construction and nulled out after use.
     */
    volatile Thread thread;

    /**
     * Link to next node waiting on condition, or the special
     * value SHARED.  Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
    * Returns previous node, or throws NullPointerException if null.
    * Use when predecessor cannot be null. The null check could
    * be elided, but is present to help the VM.
    *
    * @return the predecessor of this node
    */

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    }
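The enqueue step described in the Node comment (splice the node in as the new tail with one atomic operation, and create the dummy head lazily) can be sketched as follows. This is a simplified illustration, not the AQS source: the class ClhEnqueueSketch, its Node with a name field, and the use of AtomicReference in place of the internal CAS helpers are all assumptions made for the example, and waitStatus, threads and cancellation are omitted.

```java
import java.util.concurrent.atomic.AtomicReference;

class ClhEnqueueSketch {
    static final class Node {
        final String name;   // hypothetical label, only used to print the queue
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    /** Appends the node and returns its predecessor; creates the dummy head on first use. */
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // First contention: install a dummy head, then point tail at it
                Node dummy = new Node("dummy");
                if (head.compareAndSet(null, dummy))
                    tail.set(dummy);
            } else {
                node.prev = t;                       // prev is set before the node becomes visible
                if (tail.compareAndSet(t, node)) {   // the single atomic step that enqueues the node
                    t.next = node;                   // next is only an optimization, set afterwards
                    return t;
                }
            }
        }
    }

    /** Walks the prev links from tail, the direction that is always complete. */
    String namesFromTail() {
        StringBuilder names = new StringBuilder();
        for (Node n = tail.get(); n != null; n = n.prev) {
            if (names.length() > 0) names.append(' ');
            names.append(n.name);
        }
        return names.toString();
    }

    static String demo() {
        ClhEnqueueSketch queue = new ClhEnqueueSketch();
        queue.enq(new Node("A"));
        queue.enq(new Node("B"));
        queue.enq(new Node("C"));
        return queue.namesFromTail(); // "C B A dummy"
    }
}
```

Because next is written only after the CAS on tail succeeds, a reader can briefly see a null next on a node that already has a successor, which is why unparkSuccessor falls back to scanning prev links from tail.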