AbstractQueuedSynchronizer (AQS), the queued synchronizer

The queued synchronizer (AQS) is the basic framework used to build locks and other synchronization components. It uses an int variable to represent the synchronization state, and a built-in FIFO queue to manage the threads queuing for the resource. (From The Art of Concurrent Programming in Java)

There are two modes of acquiring the synchronization state: exclusive mode and shared mode. This article analyzes the exclusive mode first.
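To make the exclusive mode concrete before reading the source, here is a minimal sketch of a non-reentrant lock built on AQS. The class name SimpleMutex and the convention "state 0 = unlocked, 1 = locked" are assumptions made for this illustration; only tryAcquire, tryRelease, acquire, release and the state accessors come from AbstractQueuedSynchronizer itself.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant exclusive lock built on AQS.
final class SimpleMutex {
    // Convention chosen for this sketch: state 0 = unlocked, state 1 = locked.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // CAS the state from 0 to 1; only one thread can win.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // fully released, so AQS may wake a queued thread
        }

        boolean locked() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }  // queues the caller on failure
    void unlock()      { sync.release(1); }
    boolean isLocked() { return sync.locked(); }

    public static void main(String[] args) {
        SimpleMutex m = new SimpleMutex();
        m.lock();
        System.out.println("locked: " + m.isLocked());
        m.unlock();
        System.out.println("locked: " + m.isLocked());
    }
}
```

The subclass only decides whether the state can be taken or given back; queuing, parking and waking are left to the AQS methods analyzed in the rest of this article.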

Variable definitions

private transient volatile Node head;

head: the head node of the synchronization queue

private transient volatile Node tail;

tail: the tail node of the synchronization queue

private volatile int state;

state: the synchronization state

Node: definition of a synchronization queue node

volatile int waitStatus;

waitStatus: the waiting status of a node. The value can be:

  • 0: the initial state
  • -1: SIGNAL. The successor of a node in this state is (or will soon be) waiting, so when the current node releases the lock it must wake up its successor
  • -2: CONDITION. This state is related to Condition operations, which will be explained later
  • -3: PROPAGATE. This state is related to acquiring the synchronization state in shared mode, which will be explained later
  • 1: CANCELLED. The node has been cancelled and will be removed from the queue

volatile Node prev;

prev: points to the predecessor of the current node

volatile Node next;

next: points to the successor of the current node

volatile Thread thread;

thread: the thread bound to the node, that is, the thread that failed to acquire the lock

Node nextWaiter;
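
The fields above can be gathered into one small class to see them side by side. This is only a sketch: WaitNode, describe and isCancelled are hypothetical names introduced here, while the constant values match the waitStatus values listed above.

```java
// Hypothetical stand-in for the internal Node class; field names mirror the
// ones listed above, but this is not the JDK's own definition.
final class WaitNode {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    volatile int waitStatus;   // 0 is the initial state
    volatile WaitNode prev;    // predecessor in the synchronization queue
    volatile WaitNode next;    // successor in the synchronization queue
    volatile Thread thread;    // thread that failed to acquire the lock
    WaitNode nextWaiter;

    WaitNode(Thread thread) {
        this.thread = thread;
    }

    // Only positive values mean "cancelled", which is why the source code
    // in this article tests waitStatus > 0.
    boolean isCancelled() {
        return waitStatus > 0;
    }

    static String describe(int ws) {
        switch (ws) {
            case 0:         return "INITIAL";
            case SIGNAL:    return "SIGNAL";
            case CONDITION: return "CONDITION";
            case PROPAGATE: return "PROPAGATE";
            case CANCELLED: return "CANCELLED";
            default:        return "UNKNOWN";
        }
    }

    public static void main(String[] args) {
        WaitNode n = new WaitNode(Thread.currentThread());
        System.out.println(describe(n.waitStatus));
        n.waitStatus = SIGNAL;
        System.out.println(describe(n.waitStatus));
    }
}
```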

acquire()

In exclusive mode, only one thread at a time is allowed to hold the synchronization state.

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire obtains the lock in the following steps:

  • tryAcquire attempts to obtain the synchronization state; how to decide whether the state has been obtained is left to the subclass
  • If the synchronization state cannot be obtained, addWaiter creates a Node in exclusive mode and appends it to the tail of the synchronization queue
  • After joining the synchronization queue, acquireQueued tries to obtain the synchronization state again; if it still fails and certain conditions are met, the current thread is suspended until it is woken up
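
The steps above can be observed from the outside through ReentrantLock, which is built on AQS. The sketch below assumes a hypothetical demo class QueueingDemo; hasQueuedThread is part of the public ReentrantLock API and reports whether a given thread is waiting in the synchronization queue.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: a thread whose tryAcquire fails ends up in the queue.
final class QueueingDemo {
    static boolean waiterIsQueued() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock(); // the calling thread holds the lock

        Thread waiter = new Thread(() -> {
            lock.lock();   // fails to acquire, is enqueued, then suspended
            lock.unlock();
        });
        waiter.start();

        boolean queued = false;
        try {
            // Poll until the waiter's node has been added to the queue.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.sleep(1);
            }
            queued = true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock(); // lets the waiter acquire and finish
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return queued;
    }

    public static void main(String[] args) {
        System.out.println("waiter was queued: " + waiterIsQueued());
    }
}
```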

Let’s look at how each stage is implemented:

private Node addWaiter(Node mode) {
    // Create a Node bound to the current thread
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // Check whether the tail of the synchronization queue is non-null
    if (pred != null) {
        // Point the new node's prev at the current tail
        node.prev = pred;
        // CAS the tail of the synchronization queue to node
        if (compareAndSetTail(pred, node)) {
            // Point the old tail's next at node
            pred.next = node;
            return node;
        }
    }
    // If tail is null, the synchronization queue has not been initialized.
    // Call enq to initialize the queue and enqueue node
    // (enq also retries when the CAS above lost a race).
    enq(node);
    return node;
}

private Node enq(final Node node) {
    // Retry in a loop; return only after the node has been enqueued
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // Create an empty Node and point head at it,
            // then point tail at the same node
            // to complete the queue initialization
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // Point the new node's prev at the current tail
            node.prev = t;
            // CAS the tail of the synchronization queue to node
            if (compareAndSetTail(t, node)) {
                // Point the old tail's next at node
                t.next = node;
                return t;
            }
        }
    }
}

As the code shows, CAS operations are used so that nodes join the queue in order and in a thread-safe way.
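
The same CAS enqueue pattern can be tried in isolation. The following is a simplified sketch, not the AQS implementation: CasQueueSketch, N and concurrentEnqueue are hypothetical names, and AtomicReference stands in for the compareAndSetHead / compareAndSetTail helpers.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified queue that mirrors the structure of enq.
final class CasQueueSketch {
    static final class N {
        volatile N prev;
        volatile N next;
    }

    private final AtomicReference<N> head = new AtomicReference<>();
    private final AtomicReference<N> tail = new AtomicReference<>();

    // Appends node to the tail and returns its predecessor, like enq.
    N enq(N node) {
        for (;;) {
            N t = tail.get();
            if (t == null) {
                // Lazily install an empty head node, then publish it as tail.
                N dummy = new N();
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t;                     // link backwards first
                if (tail.compareAndSet(t, node)) { // only one thread wins
                    t.next = node;                 // forward link set afterwards
                    return t;
                }
            }
        }
    }

    // Counts enqueued nodes by walking prev links from the tail,
    // excluding the empty head node.
    int size() {
        int n = 0;
        N h = head.get();
        for (N t = tail.get(); t != null && t != h; t = t.prev) {
            n++;
        }
        return n;
    }

    static int concurrentEnqueue(int threads, int perThread) {
        CasQueueSketch q = new CasQueueSketch();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    q.enq(new N());
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return q.size();
    }

    public static void main(String[] args) {
        System.out.println(concurrentEnqueue(4, 1000)); // prints 4000
    }
}
```

Because prev is written before the CAS on tail, the backward chain is always complete once a node is visible as tail, while next may briefly lag behind. That is consistent with unparkSuccessor, shown later, traversing backwards from the tail.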

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // Spin until the synchronization state is obtained
        for (;;) {
            // Get the predecessor of the current node
            final Node p = node.predecessor();
            // If the predecessor is the head node,
            // try to obtain the synchronization state again
            if (p == head && tryAcquire(arg)) {
                // The synchronization state was obtained:
                // move the head of the queue to the current node
                setHead(node);
                // Clear the old head's next pointer so it can be collected
                p.next = null; // help GC
                failed = false;
                // Exit the loop
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        // The predecessor's status is -1, so this node can be safely suspended
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            // ws > 0 means the predecessor is CANCELLED, that is, invalid.
            // Walk from the predecessor towards the head to find a valid one;
            // this unlinks the CANCELLED nodes from the queue
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE. Indicate that we
         * need a signal, but don't park yet. Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // The predecessor is in the initial (or PROPAGATE) state: set it to -1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    // Suspend the current thread
    LockSupport.park(this);
    // After waking up, report whether the thread was interrupted
    // (Thread.interrupted() also clears the interrupt flag)
    return Thread.interrupted();
}

As the implementation of acquireQueued shows, after being enqueued the node repeats the following process in a loop (spin):

  • Check whether the predecessor is the head node. If it is, try to obtain the synchronization state. If the state is obtained successfully, move head to the current node and exit the loop
  • If the predecessor is not the head node, or the synchronization state cannot be obtained, change the status of the predecessor to -1 (SIGNAL), suspend the current thread, and repeat the process after being woken up
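
One detail of this loop is easy to miss: parkAndCheckInterrupt clears the interrupt flag, and acquire only restores it through selfInterrupt after the state has been obtained. The sketch below shows the visible effect through ReentrantLock.lock(), which relies on the uninterruptible acquire path; InterruptDuringAcquireDemo is a hypothetical class name.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: an interrupt delivered while a thread waits in the
// queue does not abort lock(), but the flag is set again once it is acquired.
final class InterruptDuringAcquireDemo {
    static boolean flagSurvivesAcquire() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        lock.lock(); // the calling thread holds the lock

        Thread waiter = new Thread(() -> {
            lock.lock(); // uninterruptible acquisition
            try {
                // Read the flag without clearing it.
                sawInterrupt.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        try {
            while (!lock.hasQueuedThread(waiter)) {
                Thread.sleep(1);
            }
            waiter.interrupt(); // delivered while the waiter is in the queue
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock(); // the waiter can now acquire
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println("interrupt flag restored: " + flagSurvivesAcquire());
    }
}
```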


Let's look at how the synchronization state is released.

release()

Releases the synchronization state in exclusive mode.

public final boolean release(int arg) {
    // Try to release the synchronization state
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // Wake up the successor
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling. It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        // Change the status of the head node to 0
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node. But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // Get the successor
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // Wake up the thread blocked on the successor
        LockSupport.unpark(s.thread);
}

From the code above, the process of releasing the synchronization state is as follows:

  • tryRelease is called to try to release the synchronization state; again, its implementation is left to the subclass
  • After the synchronization state has been released successfully, the status of the head node is changed to 0
  • The thread blocked on the successor node is woken up
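
Because tryRelease belongs to the subclass, a call to release does not always wake the successor. ReentrantLock is a convenient way to see this: its tryRelease reports success only when the hold count drops to zero. The sketch below uses a hypothetical class ReleaseDemo and only public ReentrantLock methods.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: the queued thread is woken only after the lock
// has been released as many times as it was acquired.
final class ReleaseDemo {
    static boolean waiterRunsOnlyAfterFullRelease() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean acquired = new AtomicBoolean(false);
        lock.lock();
        lock.lock(); // hold count is now 2

        Thread waiter = new Thread(() -> {
            lock.lock();
            acquired.set(true);
            lock.unlock();
        });
        waiter.start();

        boolean stillBlocked = false;
        try {
            while (!lock.hasQueuedThread(waiter)) {
                Thread.sleep(1);
            }
            lock.unlock(); // hold count 2 -> 1: the lock is still held
            stillBlocked = lock.hasQueuedThread(waiter) && !acquired.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Release whatever is still held; the last unlock wakes the waiter.
            while (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return stillBlocked && acquired.get();
    }

    public static void main(String[] args) {
        System.out.println(waiterRunsOnlyAfterFullRelease());
    }
}
```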

Figure: releasing the synchronization state (the red curve represents the node spin process).

acquireInterruptibly()

acquireInterruptibly acquires the synchronization state in exclusive mode. Unlike acquire, it is sensitive to interrupts: if the current thread is interrupted while acquiring the synchronization state, it throws an InterruptedException.

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    // If the thread has already been interrupted, throw
    // InterruptedException and let the caller handle it
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // Unlike acquire, an interrupt detected after wakeup
                // is thrown immediately as an InterruptedException
                throw new InterruptedException();
        }
    } finally {
        // After the exception is thrown, failed is still true,
        // so cancelAcquire is executed
        if (failed)
            cancelAcquire(node);
    }
}

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    // If the current node is a tail node, move the tail to the front node of the node
    if (node == tail && compareAndSetTail(node, pred)) {
    	// Also point next on the front node to null
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
        	// The current node is in the middle of the queue
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
            	// Point next of the front node to the back node of the node
                compareAndSetNext(pred, predNext, next);
        } else {
        	// If the front node of a node is head, wake up the back node of a node
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

The implementation of acquireInterruptibly shows that if the thread is interrupted while acquiring the synchronization state, the waiting node that corresponds to the current thread is cancelled and unlinked from the synchronization queue, and, where necessary, the successor that can now obtain the synchronization state is woken up.
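
This behaviour can be checked through ReentrantLock.lockInterruptibly(), which is documented to throw InterruptedException when the waiting thread is interrupted. The sketch below uses a hypothetical class InterruptibleAcquireDemo; after the interrupt, the thread is no longer reported as queued.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: an interrupted waiter gives up and leaves the queue.
final class InterruptibleAcquireDemo {
    static boolean interruptedWaiterLeavesQueue() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean threw = new AtomicBoolean(false);
        lock.lock(); // held by the calling thread for the whole demo

        Thread waiter = new Thread(() -> {
            try {
                lock.lockInterruptibly(); // interrupt-sensitive acquisition
                lock.unlock();            // not reached in this demo
            } catch (InterruptedException e) {
                threw.set(true);          // the acquisition was abandoned
            }
        });
        waiter.start();

        try {
            while (!lock.hasQueuedThread(waiter)) {
                Thread.sleep(1);
            }
            waiter.interrupt();
            waiter.join(); // the waiter ends without ever getting the lock
            return threw.get() && !lock.hasQueuedThread(waiter);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(interruptedWaiterLeavesQueue());
    }
}
```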

tryAcquireNanos()

tryAcquireNanos builds on acquireInterruptibly by adding a timeout. It responds to interrupts in the same way, and it additionally returns false if the synchronization state has not been obtained within the given time.

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    // Calculate the deadline for waiting
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                // The timeout has elapsed: give up and return false
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                // Suspend the thread for at most the remaining time
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
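
The timed path can be exercised through ReentrantLock.tryLock(long, TimeUnit). The sketch below uses a hypothetical class TimedAcquireDemo and an arbitrary timeout chosen for illustration; the lock stays held by another thread for the whole attempt, so the timed attempt is expected to return false.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: a timed acquisition gives up when time runs out.
final class TimedAcquireDemo {
    static boolean timedAttemptFails(long timeoutMillis) {
        ReentrantLock lock = new ReentrantLock();
        // Starts as true so that only an actual timeout flips it to false.
        AtomicBoolean got = new AtomicBoolean(true);
        lock.lock(); // held by the calling thread for the whole attempt

        Thread t = new Thread(() -> {
            try {
                boolean ok = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
                got.set(ok);
                if (ok) {
                    lock.unlock();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();

        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return !got.get();
    }

    public static void main(String[] args) {
        System.out.println("timed out: " + timedAttemptFails(50));
    }
}
```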

Node status

While spinning to obtain the synchronization state, a node in the synchronization queue changes the status of its predecessor from 0 to -1 (SIGNAL). For interrupt-sensitive operations, a node that is interrupted changes its own status to 1 (CANCELLED).

When the synchronization state is released, the status of the head node in the synchronization queue changes from -1 (SIGNAL) to 0.

Summary

This article mainly analyzes how the synchronization state is obtained in exclusive mode. The general process is as follows:

  • AQS maintains a synchronization queue internally. A thread that fails to obtain the synchronization state is wrapped in a node, added to the queue, and then spins (and is suspended when appropriate) until it obtains the state
  • When the synchronization state is released, the successor of the head node is woken up so that it can obtain the synchronization state