Preface

The java.util.concurrent package provides multi-threading helper classes such as Semaphore and CountDownLatch. Their inner synchronizer classes all extend one abstract class, AbstractQueuedSynchronizer, referred to as "AQS". AQS is a framework for building locks and synchronizers, and since this one abstract class underlies so many different synchronizers, it is worth exploring how it works.

Main text

AQS

AQS is built around a doubly linked queue of nodes and a single int field named state. Each implementation class interprets state in its own way, so it is up to the implementation class to override the protected methods that give state its meaning.

state is modified atomically through the compareAndSetState method.

Each Node holds a thread reference and a wait status. If the AQS implementation class is a lock, each Node represents a thread that wants to acquire the lock.

static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    volatile int waitStatus;
    Node nextWaiter;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    // constructors omitted from this excerpt
}

prev and next point to the predecessor and successor nodes, and thread is the thread that enqueued the node. The two fields worth a closer look are waitStatus and nextWaiter.

waitStatus

Represents the state of the current node. It takes one of the following values:

  • Initial state (0): has no named constant; the node is in the queue waiting to acquire the lock
  • Cancelled state (1), defined as CANCELLED: the node's thread has given up waiting
  • Signal state (-1), defined as SIGNAL: the successor of the current node needs to be woken up when this node releases or is cancelled
  • Condition state (-2), defined as CONDITION: the current node is in a condition queue
  • Propagate state (-3), defined as PROPAGATE: a shared release should be propagated to subsequent nodes

By this definition, a value greater than 0 means the node is cancelled, and a value less than 0 marks one of the special states above.
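The sign convention is why the AQS code can test `ws > 0` instead of comparing against each constant. A minimal sketch, assuming a hypothetical helper class `WaitStatusDemo` that is not part of the JDK and only copies the constant values from the Node excerpt:

```java
// Hypothetical helper, not part of AQS: classifies a waitStatus value
// using the same constant values as the Node excerpt.
final class WaitStatusDemo {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    static String describe(int ws) {
        if (ws > 0)
            return "cancelled";   // CANCELLED is the only positive value
        if (ws == 0)
            return "initial";     // no named constant
        switch (ws) {             // negative values: special states
            case SIGNAL:    return "signal";
            case CONDITION: return "condition";
            case PROPAGATE: return "propagate";
            default:        return "unknown";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(CANCELLED)); // prints "cancelled"
        System.out.println(describe(0));         // prints "initial"
        System.out.println(describe(SIGNAL));    // prints "signal"
    }
}
```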

nextWaiter

nextWaiter links nodes into a singly linked list that forms the condition queue. It can also hold the special value SHARED, indicating that the node is waiting in shared mode; if it is null (EXCLUSIVE), the node is waiting in exclusive mode.

nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

 private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

Changes to the list links must also be atomic. AQS uses Unsafe to compute the field offset of next, and compareAndSwapObject to update a node's next reference.
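Application code normally cannot use Unsafe, but the same compare-and-set on a field can be tried out with AtomicReferenceFieldUpdater from the standard library. This is a swapped-in technique for illustration, not what AQS itself does; the class `LinkNode` and its method `casNext` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical node type; only the CAS on "next" mirrors the AQS excerpt.
final class LinkNode {
    volatile LinkNode next;   // the field must be volatile for the updater
    final String name;

    private static final AtomicReferenceFieldUpdater<LinkNode, LinkNode> NEXT =
        AtomicReferenceFieldUpdater.newUpdater(LinkNode.class, LinkNode.class, "next");

    LinkNode(String name) {
        this.name = name;
    }

    // Sets next to update only if it currently equals expect.
    boolean casNext(LinkNode expect, LinkNode update) {
        return NEXT.compareAndSet(this, expect, update);
    }

    public static void main(String[] args) {
        LinkNode a = new LinkNode("a");
        LinkNode b = new LinkNode("b");
        LinkNode c = new LinkNode("c");
        System.out.println(a.casNext(null, b)); // prints "true"
        System.out.println(a.casNext(null, c)); // prints "false": next is already b
        System.out.println(a.next.name);        // prints "b"
    }
}
```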

API

AQS provides APIs for acquiring and releasing state, consisting of the following methods:

Method                             Description
acquire(int)                       Acquires state in exclusive mode
acquireInterruptibly(int)          Acquires state in exclusive mode; throws InterruptedException if the current thread is interrupted
acquireShared(int)                 Acquires state in shared mode
acquireSharedInterruptibly(int)    Acquires state in shared mode; throws InterruptedException if the current thread is interrupted
release(int)                       Releases state in exclusive mode
releaseShared(int)                 Releases state in shared mode

acquire and release correspond to acquiring and releasing a lock. AQS does not decide how the state is acquired; it delegates that to subclasses:

  • tryAcquire / tryAcquireShared: whether the state can be acquired right now, in exclusive or shared mode respectively; implemented by the subclass.
  • tryRelease / tryReleaseShared: whether the state can be released, again in exclusive or shared mode
  • isHeldExclusively: whether the state is held exclusively by the current thread

These methods are enough to build a synchronizer; what the synchronizer actually does is decided by the subclass. To implement a lock, lock and unlock map to acquire and release, and we only need to implement tryAcquire and tryRelease to define when the lock can be acquired and how it is released.
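The division of labour described above can be sketched as a small non-reentrant mutex. The class name `SimpleMutex` and the choice of 0 = unlocked, 1 = locked are assumptions made for this example; only the AbstractQueuedSynchronizer methods are real JDK API.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Illustrative non-reentrant lock: state 0 means unlocked, 1 means locked.
final class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // Succeeds only for the thread that flips state from 0 to 1.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0 || getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);   // plain write is enough: only the owner gets here
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }          // queues and parks if needed
    boolean tryLock()  { return sync.tryAcquire(1); } // never blocks
    void unlock()      { sync.release(1); }           // wakes the next waiter
    boolean isLocked() { return sync.isLocked(); }
}
```

All queueing, parking and waking is inherited from AQS; the subclass only states when the state may change hands.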


Let’s look at the implementation:

  • acquire and release

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

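acquire first calls tryAcquire. If that fails, the thread is enqueued with addWaiter, and acquireQueued loops, parking the thread between attempts, until the acquisition succeeds. acquireQueued returns true if the thread was interrupted while waiting; since acquire itself ignores interrupts, selfInterrupt then restores the thread's interrupt status.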

// Add a waiter to the queue
private Node addWaiter(Node mode) {
    // Save thread information
    Node node = new Node(Thread.currentThread(), mode);
    // Fast path: try a single CAS on tail
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

// Enqueue operation; returns the node's predecessor
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            // Queue not initialized yet: install an empty head node
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

Before acquireQueued runs, addWaiter wraps the current thread in a node and appends it to the wait queue as the new tail. It first tries a single CAS on tail; if the queue is not initialized yet or the CAS fails, enq retries in a loop and, if necessary, creates an empty node to serve as head. Because tail is only changed by CAS, when several threads enqueue at the same time only one of them succeeds per round, and the others retry until they are enqueued as well.
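The enqueue loop can be tried in isolation. The following is a minimal sketch that replaces the Unsafe-based CAS with AtomicReference; `CasQueueSketch` and `QNode` are hypothetical names, and the sketch models only the linking, not parking or cancellation.

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative model of the AQS enqueue loop using AtomicReference.
final class CasQueueSketch {
    static final class QNode {
        final String name;
        volatile QNode prev;
        volatile QNode next;

        QNode(String name) {
            this.name = name;
        }
    }

    private final AtomicReference<QNode> head = new AtomicReference<>();
    private final AtomicReference<QNode> tail = new AtomicReference<>();

    // Appends node and returns its predecessor, like enq in the excerpt.
    QNode enq(QNode node) {
        for (;;) {
            QNode t = tail.get();
            if (t == null) {
                // Lazily install an empty head; losers of this CAS just retry.
                QNode dummy = new QNode("dummy");
                if (head.compareAndSet(null, dummy))
                    tail.set(dummy);
            } else {
                node.prev = t;                    // prev is set before publication
                if (tail.compareAndSet(t, node)) {
                    t.next = node;                // next is set after publication
                    return t;
                }
            }
        }
    }

    QNode head() { return head.get(); }
    QNode tail() { return tail.get(); }
}
```

Note the ordering inside the loop: a node's prev link is valid as soon as the node becomes tail, while the predecessor's next link is filled in slightly later.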

After enqueueing, the node enters its acquire loop:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

failed records whether the acquisition ended abnormally, and interrupted records whether the thread was interrupted while waiting. Each iteration checks whether the predecessor is head and, if so, tries to acquire. When the attempt fails, control passes to shouldParkAfterFailedAcquire and parkAndCheckInterrupt.

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

The return value indicates whether the thread should be parked:

  • If the predecessor is in SIGNAL state, it is safe to park: the predecessor has promised to wake this node when it releases.
  • If the predecessor is cancelled, the cancelled nodes are skipped and the node is relinked to the nearest non-cancelled predecessor.
  • Otherwise (state 0 or PROPAGATE), the predecessor's state is set to SIGNAL and the caller retries once more before parking.

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

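LockSupport.park, which is built on Unsafe's park method, blocks the current thread until another thread calls unpark on it, the thread is interrupted, or the call returns spuriously. It plays a role similar to wait and notify, but it targets a specific thread and does not require holding a monitor.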
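The permit behaviour of park and unpark can be observed from a single thread. A minimal sketch, assuming a hypothetical class `ParkDemo`; only the LockSupport and Thread calls are real JDK API.

```java
import java.util.concurrent.locks.LockSupport;

// Illustrative single-threaded demo of park/unpark permits and interrupts.
final class ParkDemo {
    // Same shape as the AQS helper: park, then report and clear the interrupt flag.
    static boolean parkAndCheckInterrupt() {
        LockSupport.park(ParkDemo.class); // the blocker argument is only for diagnostics
        return Thread.interrupted();
    }

    static boolean[] run() {
        // An unpark issued before park leaves a permit, so park returns at once.
        LockSupport.unpark(Thread.currentThread());
        boolean first = parkAndCheckInterrupt();

        // A pending interrupt also makes park return immediately.
        Thread.currentThread().interrupt();
        boolean second = parkAndCheckInterrupt();

        return new boolean[] { first, second };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0]); // prints "false": woken by the permit, not interrupted
        System.out.println(r[1]); // prints "true": woken by the interrupt
    }
}
```

Because Thread.interrupted() clears the flag, the AQS acquire loop has to remember the result and re-assert the interrupt later, which is what selfInterrupt is for.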

private void cancelAcquire(Node node) {
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    Node predNext = pred.next;

    node.waitStatus = Node.CANCELLED;

    // Remove itself if it is the tail node
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

When an acquisition ends abnormally, the node must be removed from the queue, and there are some subtleties to doing so. Any successor relies on its predecessor being in SIGNAL state in order to be woken, so while unlinking itself the node tries to set its nearest non-cancelled predecessor to SIGNAL. If that predecessor is not head, its state is or becomes SIGNAL, and it still has a thread, the predecessor is linked directly to the cancelled node's successor, provided that successor exists and is not cancelled. Otherwise the successor is woken up so that it can relink itself and retry.

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    // If the next node is null or cancelled, search backwards from tail for a valid one
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

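unparkSuccessor wakes the thread of the next node. If next is null or cancelled, it searches backwards from tail for the non-cancelled node closest to the given node. The search starts at the tail because a node's prev link is set before the node is published by the CAS on tail, while the predecessor's next link is only set afterwards, so prev links are always reliable and next links may lag behind.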

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

release is simple: if the subclass's tryRelease succeeds and head has a non-zero waitStatus, the next node is woken through unparkSuccessor.


  • acquireShared and releaseShared

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

In shared mode the thread only enters the queue when tryAcquireShared returns a negative value. The state may be held by several threads at once, and a thread blocks only while the state it needs is not available.
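A one-shot latch is about the smallest shared-mode synchronizer: every waiter blocks until the latch is opened, and after that all of them pass. The sketch below assumes a hypothetical class `OneShotLatch` with state 0 = closed and 1 = open; only the AbstractQueuedSynchronizer methods are real JDK API.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Illustrative shared-mode synchronizer: state 0 means closed, 1 means open.
final class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // Negative result: the caller is queued; positive: it passes and
            // the wake-up is propagated to later shared waiters.
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true; // true lets releaseShared wake the queued waiters
        }

        boolean isOpen() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
    void awaitUninterruptibly()              { sync.acquireShared(1); }
    void open()                              { sync.releaseShared(1); }
    boolean isOpen()                         { return sync.isOpen(); }
}
```

Unlike an exclusive lock, acquiring here never changes the state, so any number of threads can succeed once the latch is open.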

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireShared closely resembles acquireQueued. The difference lies in the success path:

if (p == head) {
    int r = tryAcquireShared(arg);
    if (r >= 0) {
        setHeadAndPropagate(node, r);
        p.next = null; // help GC
        if (interrupted)
            selfInterrupt();
        failed = false;
        return;
    }
}

The key call is setHeadAndPropagate:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

If the next node is also waiting in shared mode, or is not yet known, doReleaseShared is called:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

This method wakes the successor when the head's state is SIGNAL, resetting it to 0 first, and otherwise changes a state of 0 to PROPAGATE so that the release keeps propagating. If another thread changes head in the meantime, the loop runs again.

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

releaseShared is very simple: doReleaseShared is called when tryReleaseShared succeeds.

Conclusion

From the analysis of acquire and release we can see that our custom logic lives in the try* methods, while AQS handles the exclusive and shared modes differently; in essence, though, both are operations on a FIFO queue. Managing threads in a queue keeps them in order and allows waking a single thread instead of using notifyAll.

ConditionObject

AQS contains one concrete inner class, ConditionObject, which extends its functionality. A lock built on AQS can create multiple condition variables, and a condition variable can block and wake up threads.

Like AQS, ConditionObject maintains a queue of its own, called the condition queue. Its nodes carry the CONDITION wait status and are linked through nextWaiter:

 private transient Node firstWaiter;
 private transient Node lastWaiter;

ConditionObject has three main methods, await, signal and signalAll, which correspond to Object's wait, notify and notifyAll.
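From the caller's side, these methods are usually reached through a Lock. The sketch below uses ReentrantLock, whose newCondition() returns a Condition backed by an AQS ConditionObject, to build a small bounded buffer with two condition variables; the class `BoundedBuffer` and its method names are assumptions made for this example.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative bounded buffer: one lock, two condition queues.
final class BoundedBuffer<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity)
                notFull.await();      // releases the lock while waiting
            items.addLast(item);
            notEmpty.signal();        // moves one waiting taker to the lock queue
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty())
                notEmpty.await();
            T item = items.removeFirst();
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try {
            return items.size();
        } finally {
            lock.unlock();
        }
    }
}
```

The while loops around await matter: a thread can return from await without its condition being true, so the condition is always rechecked after waking.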

  • await

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

await first calls addConditionWaiter to create a new node and append it to the condition queue:

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out the queue first
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

If the last waiter is no longer in CONDITION state, unlinkCancelledWaiters removes all such nodes from the condition queue; then a new CONDITION node is appended to the end of the queue.

int savedState = fullyRelease(node);

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

fullyRelease saves the current state and releases all of it. If the release fails, typically because the current thread does not hold the lock, IllegalMonitorStateException is thrown and the node is marked CANCELLED.

The lock is released because the condition the thread is waiting for is not yet met, which allows another thread to acquire the lock and make progress.

while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

After the lock is released the thread waits in the condition queue. isOnSyncQueue checks whether the node is on the synchronization queue, that is, the AQS queue. When another thread calls signal, the condition node is moved to the synchronization queue; as long as the node is not there, signal has not been called for it, so the thread parks.

Before reading the rest of await, take a look at signal:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

Condition variables only work in exclusive mode, so the subclass must implement isHeldExclusively; if the current thread does not hold the lock, an exception is thrown. The real logic is in doSignal:

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

The loop moves the first node of the condition queue to the synchronization queue; if the transfer fails because the node was cancelled, it moves on to the next node.

final boolean transferForSignal(Node node) {
    // A failed CAS means the node has been cancelled; return false so the caller tries the next node
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // Add the node to the synchronization queue; p is its predecessor
    Node p = enq(node);
    int ws = p.waitStatus;

    // If the predecessor is cancelled or cannot be set to SIGNAL,
    // wake the thread so that it can resynchronize in its acquire loop
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

At this point the node is on the synchronization queue, and its thread will be woken either right here or later, when its predecessor releases the lock. Take another look at this code:

while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

Once the thread is woken, it first calls checkInterruptWhileWaiting to check whether it was interrupted while waiting. If it was not, the while condition is evaluated again and the loop ends as soon as the node is on the synchronization queue.

private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

If the thread was interrupted, this decides between THROW_IE (throw InterruptedException on exit from await) and REINTERRUPT (re-assert the interrupt status on exit from await); see transferAfterCancelledWait for the details. Without an interrupt the result is 0.

final boolean transferAfterCancelledWait(Node node) {
    // If the CAS succeeds, the node was still in CONDITION state,
    // so the interrupt occurred before signal
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }

    // waitStatus is no longer CONDITION, so signal has already been called.
    // signal moves the node to the synchronization queue, but the move may
    // not be finished yet, so spin until it completes
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

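These checks are needed because the thread is not necessarily woken by its predecessor on the synchronization queue releasing the lock. It may also wake up spuriously, because of an interrupt, or because transferForSignal found the predecessor cancelled or failed to set it to SIGNAL.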

Now assume that signal succeeded and the node was added to the synchronization queue:

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
    unlinkCancelledWaiters();
if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);

Passing the previously saved state to acquireQueued reacquires the lock in the same way acquire does, except that the node is already on the synchronization queue, so addWaiter is skipped:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire calls selfInterrupt when the wait was interrupted, and await does the equivalent through reportInterruptAfterWait:

 private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

Conclusion

Both AQS and ConditionObject use queues to track waiting threads, which can be a bit convoluted. tryAcquire defines whether the state can be acquired. If it cannot, the thread is enqueued and blocks, retrying each time it is woken. When a thread releases the state, the next node is woken up, and in this way multiple threads are synchronized.