preface

AQS (AbstractQueuedSynchronizer) provides a Java synchronizer JDK1.5. Util. Concurrent package (JUC) greatly improved the concurrent performance, while AQS is the core of JUC. It is the basic framework for building locks (such as ReentrantLock) and other synchronization tools (such as CountDownLatch). AQS itself is an abstract class that defines the code structure for acquiring and releasing locks, so if you want to create a lock, just inherit AQS and implement the corresponding methods.

The composition of AQS

AbstractOwnableSynchronizer AbstractQueuedSynchronizer inheritance, is to know what is the current thread lock.

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    static final class Node{... }// Synchronize queue head
    private transient volatile Node head;
    // Synchronize queue tail
    private transient volatile Node tail;
    // The status value determines whether the lock can be obtained according to the status
    private volatile int state;
    // Conditional queue
    public class ConditionObject implements Condition.java.io.Serializable {... }Copy the code

State property

AbstractQueuedSynchronizer defines a volatile member variables of type int state to represent the synchronization state, changes to the state value by CAS. For example, in CountDownLatch, state is the reciprocal number passed in the constructor, and in Semaphore, state is the number of licenses remaining. In ReentrantLock, it indicates the lock reentrant count.

Synchronous queue

FIFO Node
    static final class Node {
        Threads wait for locks in shared mode
        static final Node SHARED = new Node();
         // Threads wait for locks in exclusive mode
        static final Node EXCLUSIVE = null;
        // Indicates that the thread's request to acquire the lock has been cancelled
        static final int CANCELLED =  1;
        // indicates that the thread is ready, waiting for the resource to be released
        static final int SIGNAL    = -1;
       // The thread of the node is waiting to wake up when it is moving from the synchronous queue to the conditional queue
        static final int CONDITION = -2;
        // The current thread in the SHARED state is in the runnable state
        static final int PROPAGATE = -3;

     // Indicates the status of the current node. The behavior of the node can be controlled by the status of the node
    // The normal synchronization node is 0, and the CONDITION node is condition-2
        volatile int waitStatus;
        // The former node of the current node
        volatile Node prev;
        // The next node of the current node
        volatile Node next;
        // Thread of the current node
        volatile Thread thread;

        
        Node nextWaiter;
Copy the code

AbstractQueuedSynchronizer defines two types of resource sharing, SHARED (share) and EXCLUSIVE (EXCLUSIVE). The synchronous queue is a two-way queue, when multiple threads are requesting a lock, in exclusive lock mode. If only one thread can obtain the lock at a certain time, the remaining threads that cannot obtain the lock will block and queue up in the synchronization queue. AQS maintains a CLH queue internally to manage the lock. If the thread tries to obtain the lock, it will package the current thread and waiting state into a Node Node. Joins the end of the Sync Queue, blocks the current thread, and wakes up the head of the Queue when synchronization is released.

How to obtain a lock

acquire/acquireShared

AbstractQueuedSynchronizer acquire is realized by using template method pattern () to get locked resources.

  • Acquire () the implementation of the exclusive lock
    public final void acquire(int arg) {
        if(! tryAcquire(arg) &&// Try a tryAcquire attempt, return if successful, and acquireQueued if unsuccessful
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt(); 
    }
Copy the code
  • AcquireShared Shared lock ()
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
Copy the code
  • TryAcquire (), tryAcquireShared(),AQS does not implement this method, let subclasses implement their own

The following has been exclusive mode analysis source code example

  • If tryAcquire() fails, addWaiter is called to add the thread to the sync queue, and the current thread is placed at the end of the sync queue;
    private Node addWaiter(Node mode) {
        // Wrap the current thread as Node, mode denotes Node mode (exclusive/shared)
        Node node = new Node(Thread.currentThread(), mode);
        // CAS puts the node at the end of the queue
        Node pred = tail;
        if(pred ! =null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
Copy the code
  • After enqueueing, invoke the acquireQueued method: Block the current node so that it can acquire the lock when it is awakened;
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            // Select the last node of the current node
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                // Get lock, set to head
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                
                // Set the state of the previous node to SIGNAL
                if (shouldParkAfterFailedAcquire(p, node) &&
                  // parkAndCheckInterrupt blocks the current thread
                    parkAndCheckInterrupt())
                    interrupted = true; }}finally {
            if (failed)
            // If the node lock fails to be obtained, the node is removed from the queuecancelAcquire(node); }}// pred is the previous node, node is the current node. Set the previous node state to SIGNAL.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // If the waitStatus status of the previous node is already SIGNAL, return it directly
    if (ws == Node.SIGNAL)
        
        return true;
    // If the current node state has been cancelled.
    if (ws > 0) {
        // Find the node whose previous state was not cancelled, because the current node is attached to the active node
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
           // Otherwise, set the node status to SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
Copy the code

How to release locks

Exclusive lock mode, starting from the head of the queue, looking for its next node, if the next node is empty, starting from the tail, to find the node is not canceled, and then release the node.

// The base method of unlock
public final boolean release(int arg) {
    // tryRelease is handed over to the implementation class, which usually subtracts the arG from the current synchronizer state. If true is returned, the lock is released successfully.
    if (tryRelease(arg)) {
        Node h = head;
        if(h ! =null&& h.waitStatus ! =0)
            // Wake up the node waiting for the lock from the beginning
            unparkSuccessor(h);
        return true;
    }
    return false;
}


private void unparkSuccessor(Node node) {
    // waitStatus of the header
         int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        // If the next node is null or cancelled, the non-cancelled node at the beginning of the queue is found
        if (s == null || s.waitStatus > 0) {
            s = null;
            // find the first node in the queue whose waitStatus<0.
            for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
                    s = t;
        }
        // If the next node of the current node is not empty and the state <=0, wake up the current node
        if(s ! =null)
            LockSupport.unpark(s.thread);
}
Copy the code

Implementation of AQS in CountDownLatch

CountDownLatch is a flexible locking implementation that allows one or more threads to wait for a set of events to occur. A lock consists of a counter that is initialized with a positive number indicating the number of events to wait, the countDown method decrement the counter to indicate that an event has occurred, and the await method waits for the counter to reach zero to indicate that all events that need to wait have occurred. If it is non-zero, then the await method blocks until the counter is zero or until the thread interrupts or times out.

Source code analysis

As you can see,CountDownLatch implements AQS ‘tryAcquireShared and tryReleaseShared methods to obtain and release resources in a shared manner.

  • The constructor

The count passed in when creating the CountDownLatch is the state value of the AQS

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
Copy the code
  • Await () CountDownLatch await method, it is call of AQS doAcquireSharedInterruptibly acquiring a lock
 public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
Copy the code
  • CountDown () call releaseShared (1);

CountDownLatch divides the task into count threads and execution, initializes state equal to count, and the threads execute in parallel. TryReleaseShared is called once after countDown() and the state is subtracted by 1 through the CAS algorithm. After all child threads have finished executing (i.e., state=0), unpark() the calling thread, and then the calling thread returns from the await() function to continue the residual action.