Preface

AQS, short for AbstractQueuedSynchronizer (abstract queued synchronizer), plays a role similar to synchronized: it is a synchronizer implemented in Java code, and developers can extend it to build their own synchronizers.

AQS underpins half of the J.U.C (java.util.concurrent) package: many concurrency utilities are built on it, such as ReentrantLock, Semaphore, and CountDownLatch.

Every Java object used as a lock has a corresponding monitor (the object monitor), implemented in C++. The monitor keeps an Owner field that refers to the thread currently holding the lock, and an EntryList that holds the threads that failed to acquire it. Threads in the EntryList are suspended until the owner releases the lock; the JVM then wakes threads in the EntryList to compete for the lock again, and so on.

AQS does in Java what synchronized does in C++ inside the JVM. Not every Java developer is comfortable with C++, and the source behind synchronized is not easy to read. Fortunately, the JDK ships AQS, and reading its source gives you a deeper understanding of concurrency.

The core idea of AQS is a volatile int field, state, that represents the synchronization state, for example 0 for unlocked and 1 for locked. When several threads compete for the resource, they try to change state with CAS, for example from 0 to 1. The thread whose CAS succeeds has won the resource; the others are queued. When the thread recorded in exclusiveOwnerThread releases the resource, it wakes a thread from the queue to continue, and the cycle repeats. Sounds a lot like the low-level logic of synchronized, doesn't it?
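To make the "CAS from 0 to 1" idea concrete, here is a small sketch that uses AtomicInteger as a stand-in for the volatile state field (an assumption for illustration; AQS itself uses its own compareAndSetState()). StateCasDemo is a hypothetical name. Several threads race to flip state from 0 to 1, and only one CAS can see 0.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: AtomicInteger stands in for AQS's volatile int state.
class StateCasDemo {
    // Starts `threads` threads that all try to change state from 0 (unlocked)
    // to 1 (locked) at about the same moment; returns how many succeeded.
    static int countWinners(int threads) {
        AtomicInteger state = new AtomicInteger(0);
        AtomicInteger winners = new AtomicInteger(0);
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // line all threads up before the race
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (state.compareAndSet(0, 1)) { // only one CAS can observe 0
                    winners.incrementAndGet();
                }
            });
            workers[i].start();
        }
        start.countDown(); // release all threads at once
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return winners.get();
    }
}
```

Since nothing resets state to 0 in this sketch, `StateCasDemo.countWinners(8)` should return 1 however the threads are scheduled; the losers are the threads AQS would put in its queue.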

That is the theory. This article follows ReentrantLock through AbstractQueuedSynchronizer and reads the source to see how AQS actually works, paying tribute to Doug Lea along the way.


AQS infrastructure

Before reading the source, let's briefly look at the architecture of AQS. It is fairly simple: apart from implementing the Serializable interface, AQS only extends the parent class AbstractOwnableSynchronizer. That parent class maintains a single exclusiveOwnerThread field, which records the thread that currently holds the synchronizer exclusively, and nothing else.
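This hierarchy can be confirmed with reflection. AqsHierarchy is a hypothetical helper that only uses standard Class methods.

```java
import java.io.Serializable;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical helper that inspects the AQS class hierarchy via reflection.
class AqsHierarchy {
    // The direct superclass of AQS (expected: AbstractOwnableSynchronizer).
    static Class<?> superclassOfAqs() {
        return AbstractQueuedSynchronizer.class.getSuperclass();
    }

    // Whether AQS can be assigned to Serializable.
    static boolean aqsIsSerializable() {
        return Serializable.class.isAssignableFrom(AbstractQueuedSynchronizer.class);
    }
}
```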

AQS has an inner class, Node. A thread that fails to acquire the lock is wrapped in a Node. Each Node has prev and next fields pointing to its predecessor and successor, so the nodes form a doubly linked list. Each Node also has a volatile int waitStatus field describing its wait state, which takes the following values:

  1. 0: the default value of a newly created node.
  2. SIGNAL(-1): the successor node is waiting for this node to wake it up.
  3. CONDITION(-2): the node is waiting on a Condition. When another thread calls the Condition's signal() method, the node is moved from the condition queue to the synchronization queue, where it waits to acquire the lock.
  4. PROPAGATE(-3): used in shared mode; a release is propagated so that the head wakes up subsequent nodes in turn.
  5. CANCELLED(1): the node has given up competing for the resource. This only happens after a timeout or an interrupt.

So 0 is the dividing line for waitStatus: a value greater than 0 means the node is invalid (cancelled). For example, when AQS wakes up a node in the queue, it skips nodes whose waitStatus is greater than 0.
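The constants below mirror the Node.waitStatus values listed above as they appear in JDK 8; later JDK versions restructured Node, so treat them as JDK 8 specific. WaitStatus is a hypothetical helper, not part of the JDK; it only encodes the "greater than 0 means cancelled" rule.

```java
// Hypothetical helper mirroring JDK 8's Node.waitStatus constants.
final class WaitStatus {
    static final int CANCELLED = 1;
    static final int INITIAL = 0;   // default value of a new node
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    private WaitStatus() {}

    // 0 is the dividing line: positive means the node gave up and must be skipped.
    static boolean isCancelled(int ws) {
        return ws > 0;
    }

    static String describe(int ws) {
        switch (ws) {
            case CANCELLED: return "CANCELLED";
            case INITIAL: return "INITIAL";
            case SIGNAL: return "SIGNAL";
            case CONDITION: return "CONDITION";
            case PROPAGATE: return "PROPAGATE";
            default: throw new IllegalArgumentException("unknown waitStatus " + ws);
        }
    }
}
```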

AQS also maintains an int state variable that represents the state of the synchronizer. In ReentrantLock, for example, state is the number of times the lock has been entered: each lock() adds 1, each unlock() subtracts 1, and state == 0 means the lock is free.
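ReentrantLock exposes this count through getHoldCount(), so the +1/-1 behaviour of state can be observed from outside. HoldCountDemo is a hypothetical name for this sketch.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: observe the reentrance count kept in AQS state.
class HoldCountDemo {
    // Hold count after each step: lock, lock, unlock, unlock.
    static int[] trace() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        lock.lock();
        counts[0] = lock.getHoldCount(); // first entry
        lock.lock();
        counts[1] = lock.getHoldCount(); // re-entry increments the count
        lock.unlock();
        counts[2] = lock.getHoldCount(); // one unlock decrements it
        lock.unlock();
        counts[3] = lock.getHoldCount(); // back to 0: the lock is free
        return counts;
    }
}
```

`trace()` should return `{1, 2, 1, 0}`.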

AQS also maintains head and tail fields that point to the first and last nodes of the FIFO queue. The node referenced by head always represents the thread currently holding the lock (the worker). A thread does not leave the queue as soon as it acquires the lock: the old head is only dequeued when the successor it woke acquires the lock and makes itself the new head.


What does ReentrantLock.lock() do?

Example: start three threads, A, B, and C, which call lock() in that order. What happens?

1. At first, nobody holds the lock. Thread A calls lock(), which ReentrantLock hands over to its Sync object, an inner class that extends AQS.

ReentrantLock uses a non-fair lock by default. Whether or not threads are waiting in the queue, it first tries to grab the lock directly with CAS. If that succeeds, it sets the current thread as exclusiveOwnerThread and returns. If it fails, it calls acquire(1).

// Non-fair lock: grab the lock immediately, even if other threads are waiting in the queue
final void lock() {
	// If the CAS on state succeeds, no other thread holds the lock: make the current thread the exclusive owner
	if (compareAndSetState(0, 1))
		setExclusiveOwnerThread(Thread.currentThread());
	else
		// The CAS failed: another thread already holds the lock
		acquire(1);
}

The fair lock is more polite: it first checks whether any threads are already waiting in the queue, and if so, lets them go first and waits its turn.

// Fair lock: go straight to acquire(), which calls tryAcquire() below
final void lock() {
	acquire(1);
}

// Fair lock: try to acquire the lock
protected final boolean tryAcquire(int acquires) {
	final Thread current = Thread.currentThread();
	int c = getState();
	if (c == 0) {
		/* Even though the lock is currently free, check whether other threads are already waiting in the queue.
		   If so, let them acquire the lock first, and join the queue and park. Only if no thread is queued
		   ahead of us do we try a CAS. */
		if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
			setExclusiveOwnerThread(current);
			return true;
		}
	}
	// The current thread already holds the lock: reentrant acquisition
	else if (current == getExclusiveOwnerThread()) {
		int nextc = c + acquires;
		if (nextc < 0) // overflow: too many reentrant acquisitions
			throw new Error("Maximum lock count exceeded");
		setState(nextc);
		return true;
	}
	return false;
}

Either way, fair or not, thread A acquires the lock and returns.
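Fairness is fixed when the lock is constructed: the no-argument constructor gives the non-fair variant described above, and ReentrantLock(true) gives the fair one. A minimal check through isFair(), with FairnessDemo as a hypothetical name:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: which Sync (fair or non-fair) a ReentrantLock uses.
class FairnessDemo {
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();      // NonfairSync by default
    }

    static boolean explicitFair() {
        return new ReentrantLock(true).isFair();  // FairSync when requested
    }
}
```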

Now suppose thread A has not released the lock when thread B calls lock(). B's CAS fails, so it calls acquire(1), the AQS template method:

/* 1. tryAcquire(): try to acquire the lock once more.
   2. addWaiter(): if that fails, append a Node for the current thread to the tail of the queue.
   3. acquireQueued(): wait in the queue. */
public final void acquire(int arg) {
	if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		// acquireQueued() returned true: the thread was interrupted while waiting,
		// so re-interrupt itself to make up for it
		selfInterrupt();
}

In acquire(), tryAcquire() is called first. If it fails, addWaiter() wraps the current thread in a Node and acquireQueued() makes it wait in the queue. Note that AQS does not respond to interrupts while the thread is queued. If an interrupt arrives during that time, AQS can only record it and re-interrupt the thread itself, via selfInterrupt(), once the lock has been acquired.
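The deferred interrupt can be observed with a plain ReentrantLock. In this sketch (UninterruptibleLockDemo is a hypothetical name), the main thread plays thread A and holds the lock, thread B queues in lock(), and B is interrupted while waiting. B does not throw; it keeps waiting, and only after it finally gets the lock does it find its interrupt flag set, which is the effect of selfInterrupt(). hasQueuedThread() is used only to wait until B is really queued, and the 50 ms pause is an arbitrary choice.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: lock() defers interrupts until the lock is acquired.
class UninterruptibleLockDemo {
    static boolean interruptIsDeferred() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean acquired = new AtomicBoolean(false);
        AtomicBoolean flagSetAfterAcquire = new AtomicBoolean(false);
        lock.lock(); // the main thread plays thread A
        Thread b = new Thread(() -> {
            lock.lock(); // queues and parks; an interrupt does not make it throw
            try {
                acquired.set(true);
                flagSetAfterAcquire.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        b.start();
        try {
            while (!lock.hasQueuedThread(b)) { // wait until B is in the queue
                Thread.sleep(1);
            }
            b.interrupt();                     // B wakes, records it, parks again
            TimeUnit.MILLISECONDS.sleep(50);
            boolean stillWaiting = b.isAlive() && !acquired.get();
            lock.unlock();                     // now let B through
            b.join();
            return stillWaiting && acquired.get() && flagSetAfterAcquire.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            if (lock.isHeldByCurrentThread()) lock.unlock();
        }
    }
}
```

`interruptIsDeferred()` should return true. When a waiting thread must react to interrupts immediately, lockInterruptibly() is the variant to use.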

For the non-fair lock, tryAcquire() delegates to nonfairTryAcquire():

// Non-fair lock: try to acquire the lock
final boolean nonfairTryAcquire(int acquires) {
	final Thread current = Thread.currentThread();
	int c = getState();
	if (c == 0) {
		// state == 0: the previous owner has released the lock. CAS the state again; success means we grabbed the lock.
		if (compareAndSetState(0, acquires)) {
			setExclusiveOwnerThread(current);
			return true;
		}
	}
	// The lock is still held: check whether the current thread is the owner. If so, this is a reentrant acquisition, state++.
	// state is the number of times the lock has been entered; 0 means the lock is free.
	else if (current == getExclusiveOwnerThread()) {
		int nextc = c + acquires;
		if (nextc < 0) // the reentry count overflowed the maximum int value
			throw new Error("Maximum lock count exceeded");
		setState(nextc);
		return true;
	}
	// Another thread holds the lock: the attempt fails
	return false;
}

Since thread A did not release the lock and thread B is not the lock holder, tryAcquire() returns false.
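To see how little a subclass has to write, here is a sketch of a non-fair reentrant lock built directly on AQS. SimpleReentrantLock is hypothetical: its Sync mirrors the nonfairTryAcquire() and tryRelease() logic quoted in this article, and everything else (queueing, parking, waking) is inherited from AQS through acquire() and release(). It is not the JDK's ReentrantLock.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-fair reentrant lock; Sync mirrors the code quoted above.
class SimpleReentrantLock {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) { // free: race for it with CAS
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int next = c + acquires;               // re-entry: only the owner gets here
                if (next < 0) throw new Error("Maximum lock count exceeded");
                setState(next);
                return true;
            }
            return false;                              // held by another thread
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            int c = getState() - releases;
            boolean free = c == 0;
            if (free) setExclusiveOwnerThread(null);
            setState(c);
            return free;                               // true lets AQS wake a successor
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        int holdCount() {
            return isHeldExclusively() ? getState() : 0;
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }     // AQS template: tryAcquire, else enqueue and park
    void unlock() { sync.release(1); }   // AQS template: tryRelease, then wake a successor
    int getHoldCount() { return sync.holdCount(); }

    // Usage: `threads` threads each increment a shared counter `perThread` times under the lock.
    static int countWithThreads(int threads, int perThread) {
        SimpleReentrantLock lock = new SimpleReentrantLock();
        int[] counter = {0};
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }
}
```

Locking twice and unlocking once leaves a hold count of 1, and `countWithThreads(4, 10_000)` should return 40000 because every increment happens under the lock.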

Because the attempt failed, a Node is created and appended to the queue. The addWaiter() code is as follows:

// The attempt to acquire the lock failed: join the queue
private Node addWaiter(Node mode) {
	// Create a Node bound to the current thread
	Node node = new Node(Thread.currentThread(), mode);
	Node pred = tail;
	if (pred != null) {
		/* Fast path when tail is not null:
		   1. point the new node's prev at the current tail;
		   2. CAS tail to the new node;
		   3. point the old tail's next at the new node.
		   The three steps are not atomic, but prev is always set before the CAS,
		   so a search from tail to head never misses the node. */
		node.prev = pred;
		if (compareAndSetTail(pred, node)) {
			pred.next = node;
			return node;
		}
	}
	// The queue is empty or the CAS lost a race: fall back to the spinning enq()
	enq(node);
	return node;
}

If tail is not null, addWaiter() performs three steps, and there is one subtlety to note:

  1. The prev of the current node points to the previous tail
  2. The CAS points tail to the current node
  3. The next of the previous tail points to the current node

These three steps are not atomic. Suppose the thread's time slice runs out right after step 2, and meanwhile the lock holder releases the lock and looks for a node to wake. If it searched from head to tail, it would find the old tail's next still null and miss the new node: a missed wake-up. Because prev is assigned before the CAS, searching from tail to head always finds it.
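The missed wake-up can be reproduced deterministically in a single thread by stopping an enqueue between steps 2 and 3. The sketch below uses a hypothetical, simplified QNode and an AtomicReference for tail; it models only the ordering of the three assignments, not the AQS code itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the sync queue, frozen halfway through an enqueue.
class PartialEnqueueDemo {
    static final class QNode {
        final String name;
        volatile QNode prev;
        volatile QNode next;
        QNode(String name) { this.name = name; }
    }

    // Builds head <-> A, then enqueues B but stops after step 2 (the tail CAS),
    // before step 3 (oldTail.next = B) runs. Returns [forward scan, backward scan].
    static List<List<String>> scanBothWays() {
        QNode head = new QNode("head");
        QNode a = new QNode("A");
        a.prev = head;
        head.next = a;
        AtomicReference<QNode> tail = new AtomicReference<>(a);

        QNode b = new QNode("B");
        QNode pred = tail.get();
        b.prev = pred;                 // step 1: prev is set before the CAS
        tail.compareAndSet(pred, b);   // step 2: B is now the tail
        // step 3 (pred.next = b) has not happened yet

        List<String> forward = new ArrayList<>();
        for (QNode n = head.next; n != null; n = n.next) forward.add(n.name);
        List<String> backward = new ArrayList<>();
        for (QNode n = tail.get(); n != null && n != head; n = n.prev) backward.add(n.name);

        List<List<String>> result = new ArrayList<>();
        result.add(forward);   // B is invisible when walking forward
        result.add(backward);  // B is found when walking back from tail
        return result;
    }
}
```

`scanBothWays()` returns `[[A], [B, A]]`: the forward walk from head misses B, while the backward walk from tail finds it.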

If the CAS fails (or the queue is still empty), that is fine: enq() spins and retries until it succeeds:

// The fast CAS failed (or the queue is empty): spin and retry
private Node enq(final Node node) {
	for (;;) { // spin; semantically the same as while (true)
		Node t = tail;
		if (t == null) {
			// tail == null: the queue is empty, so initialize it.
			// Create a dummy node and point head and tail at it; on the next pass tail is no longer null.
			if (compareAndSetHead(new Node()))
				tail = head;
		} else {
			/* The queue is not empty: 1. point the node's prev at the current tail;
			   2. CAS tail to the node; 3. point the old tail's next at the node.
			   Retry until the CAS succeeds. */
			node.prev = t;
			if (compareAndSetTail(t, node)) {
				t.next = node;
				return t;
			}
		}
	}
}

If tail is null, the queue has not been initialized yet: an empty (dummy) Node is created, and both head and tail point to it. On the next iteration the queue is initialized, so the else branch runs the same three steps as before.
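Here is a simplified re-creation of enq() with the same lazy initialization: the first caller installs a dummy head with CAS, and the node is then linked with the three steps. LazyInitQueue and its QNode are hypothetical stand-ins for AQS's head, tail, and Node; like enq(), enq() here returns the predecessor.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified re-creation of enq(): lazy dummy head + CAS spin on tail.
class LazyInitQueue {
    static final class QNode {
        final Thread thread;  // null for the dummy head
        volatile QNode prev;
        volatile QNode next;
        QNode(Thread thread) { this.thread = thread; }
    }

    final AtomicReference<QNode> head = new AtomicReference<>();
    final AtomicReference<QNode> tail = new AtomicReference<>();

    QNode enq(QNode node) {
        for (;;) {
            QNode t = tail.get();
            if (t == null) {
                // Empty queue: install a dummy head, point tail at it, then loop again
                if (head.compareAndSet(null, new QNode(null))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;                     // step 1
                if (tail.compareAndSet(t, node)) { // step 2
                    t.next = node;                 // step 3
                    return t;                      // the predecessor, as in AQS
                }
            }
        }
    }
}
```

After enqueueing a first node into an empty queue, head is a dummy node with no thread, the returned predecessor is that dummy, and tail is the new node.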

After thread B joins the queue, the AQS queue looks like this: head (dummy node, waitStatus = 0) <-> node B (thread B, waitStatus = 0, tail).

Once node B is enqueued, the waiting phase begins. Should thread B keep competing, or park?

// Wait in the queue until the lock is acquired
final boolean acquireQueued(final Node node, int arg) {
	boolean failed = true; // stays true until the lock is acquired
	try {
		/* The thread does not respond to interrupts while waiting. If an interrupt occurs,
		   it is recorded here and re-asserted with selfInterrupt() after the lock is acquired. */
		boolean interrupted = false; // whether an interrupt occurred while waiting
		for (;;) {
			final Node p = node.predecessor(); // predecessor of the current node
			/* Only the node right behind head is eligible to compete for the lock:
			   either it just joined as the first waiter, or head released the lock and woke it. */
			if (p == head && tryAcquire(arg)) {
				/* Got the lock: make the current node the new head, unlink the old head,
				   and set failed = false to mark success. */
				setHead(node);
				p.next = null; // help GC
				failed = false;
				return interrupted;
			}
			// Failed to get the lock: decide whether the current thread should park
			if (shouldParkAfterFailedAcquire(p, node) &&
					parkAndCheckInterrupt())
				interrupted = true;
		}
	} finally {
		// Normally the loop only exits once the lock is acquired, so failed is false here.
		// If the loop exits without the lock (for example, tryAcquire() throws; in the timed
		// and interruptible variants, a timeout or interrupt), the node gives up and is cancelled.
		if (failed)
			cancelAcquire(node);
	}
}

If the current node's predecessor is head, the current node is eligible to compete for the lock. This happens in two cases:

  1. It has just joined the queue as the first waiter.
  2. Head released the lock and woke the current node.

Thread B has not been woken by thread A, so it is in the first case. It tries to acquire the lock again but fails, because A still holds it. After failing, B calls shouldParkAfterFailedAcquire() to decide whether it should park.

// After failing to get the lock, decide whether the thread should park
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	int ws = pred.waitStatus;
	if (ws == Node.SIGNAL)
		// The predecessor's waitStatus is SIGNAL (-1): it will wake us, so it is safe to park.
		return true;
	if (ws > 0) {
		// 0 is the dividing line for waitStatus: <= 0 means valid, > 0 means cancelled.
		// Skip back over cancelled predecessors until a valid one is found, and point its next at us.
		// The skipped nodes become unreachable and are garbage-collected.
		do {
			node.prev = pred = pred.prev;
		} while (pred.waitStatus > 0);
		pred.next = node;
	} else {
		// CAS the predecessor's waitStatus to SIGNAL (-1): we are waiting for it to wake us.
		compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
	}
	// Do not park yet; the caller loops and tries again
	return false;
}

A thread may only park after its predecessor's waitStatus has been set to SIGNAL(-1), so that the predecessor knows to wake its successor when it releases the lock.

At this point the head node's waitStatus is 0, which does not meet the condition, so thread B changes it to SIGNAL and does not park on this pass. The queue now looks like this: head (waitStatus = SIGNAL) <-> node B (waitStatus = 0, tail).

On the next iteration, thread A still has not released the lock, so thread B fails again and calls shouldParkAfterFailedAcquire() again. This time the predecessor's waitStatus is SIGNAL(-1), so the method returns true and B can safely park.
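These two rounds can be modelled in a few lines. The sketch below (ParkDecision and SNode are hypothetical) reproduces the decision logic of shouldParkAfterFailedAcquire() on simplified nodes, using the JDK 8 status values. Because it runs in a single thread, a plain write stands in for the CAS on waitStatus.

```java
// Hypothetical model of shouldParkAfterFailedAcquire() on simplified nodes.
class ParkDecision {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;

    static final class SNode {
        volatile int waitStatus;
        volatile SNode prev;
        volatile SNode next;
        SNode(int ws) { waitStatus = ws; }
    }

    static boolean shouldPark(SNode pred, SNode node) {
        int ws = pred.waitStatus;
        if (ws == SIGNAL) return true;           // predecessor promised to wake us
        if (ws > 0) {                            // skip cancelled predecessors
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            pred.waitStatus = SIGNAL;            // ask the predecessor to signal us
        }
        return false;                            // caller retries once more before parking
    }
}
```

With head at waitStatus 0, the first call returns false and sets head to SIGNAL; the second call returns true. A node behind a CANCELLED predecessor is relinked to the nearest valid node.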

Since shouldParkAfterFailedAcquire() returned true, thread B needs to park, so parkAndCheckInterrupt() runs:

// Park the current thread and wait for the predecessor to wake it
private final boolean parkAndCheckInterrupt() {
	LockSupport.park(this);
	// Waiting does not respond to interrupts. Report (and clear) the interrupt status so that
	// acquire() can re-assert it later with selfInterrupt().
	return Thread.interrupted();
}

Parking is simple: it calls LockSupport.park(). As mentioned earlier, AQS does not respond to interrupts while queuing. If an interrupt occurs, it can only wait until the thread wakes up and re-interrupt it later, so parkAndCheckInterrupt() returns the thread's interrupt flag.
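Two LockSupport properties this code relies on can be checked directly: interrupt() wakes a parked thread without throwing, and Thread.interrupted() reports the interrupt while clearing the flag, which is why AQS has to re-assert it with selfInterrupt(). A permit granted by unpark() before park() is also remembered, so an early wake-up is not lost. ParkInterruptDemo is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo of the LockSupport behaviour parkAndCheckInterrupt() depends on.
class ParkInterruptDemo {
    // Returns {interrupt was reported, flag was cleared afterwards}.
    static boolean[] parkThenInterrupt() {
        AtomicBoolean reported = new AtomicBoolean();
        AtomicBoolean clearedAfter = new AtomicBoolean();
        Thread t = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                LockSupport.park(); // may also return spuriously, hence the loop
            }
            reported.set(Thread.interrupted());                        // true, and clears the flag
            clearedAfter.set(!Thread.currentThread().isInterrupted()); // flag is now false
        });
        t.start();
        t.interrupt(); // wakes the parked thread; no exception is thrown
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] { reported.get(), clearedAfter.get() };
    }

    // unpark() before park() stores a permit, so the later park() returns immediately.
    static boolean permitIsRemembered() {
        LockSupport.unpark(Thread.currentThread());
        long start = System.nanoTime();
        LockSupport.park();
        return System.nanoTime() - start < 1_000_000_000L;
    }
}
```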

Thread B is now parked and cannot continue until thread A wakes it.

If acquireQueued() exits without acquiring the lock, its finally block calls cancelAcquire() to withdraw the node from the competition:

// Cancel the node's attempt to acquire the lock
private void cancelAcquire(Node node) {
	// Ignore a node that doesn't exist
	if (node == null)
		return;

	node.thread = null; // unbind the thread

	// Skip back over CANCELLED predecessors
	Node pred = node.prev;
	while (pred.waitStatus > 0)
		node.prev = pred = pred.prev;

	Node predNext = pred.next;

	// Mark the node CANCELLED. Even if unlinking below fails, wake-ups will skip it.
	node.waitStatus = Node.CANCELLED;

	if (node == tail && compareAndSetTail(node, pred)) {
		/* The node is the tail: CAS tail back to the predecessor, which drops the node
		   from the queue, then clear the predecessor's next. */
		compareAndSetNext(pred, predNext, null);
	} else {
		/* Otherwise, either a new node was appended while we were cancelling (so we are no
		   longer the tail), or the node was in the middle of the queue. Then:
		   1. make sure the predecessor's waitStatus is SIGNAL so it wakes its successor;
		   2. point the predecessor's next at our successor, unlinking the current node.
		   Both steps may fail: the node is already CANCELLED and will be skipped, and if the
		   predecessor cannot be set to SIGNAL, the successor is woken to repair it itself. */
		int ws;
		if (pred != head &&
				((ws = pred.waitStatus) == Node.SIGNAL ||
				 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
				pred.thread != null) {
			// The predecessor is a valid waiter and is now SIGNAL: link it to our successor
			Node next = node.next;
			if (next != null && next.waitStatus <= 0)
				compareAndSetNext(pred, predNext, next);
		} else {
			/* Either the predecessor is head, so our successor is now next in line and must be
			   woken to compete, or the predecessor could not be set to SIGNAL, so wake the
			   successor and let it repair its predecessor's state. */
			unparkSuccessor(node);
		}

		node.next = node; // help GC
	}
}

When a node cancels its acquisition, there are two cases:

  1. The current node is tail.
  2. The current node is an intermediate node.

In the first case, tail is simply moved back to the node's predecessor, the predecessor's next is set to null, and the node is out of the queue. The second case is more involved. If the predecessor is not head, its waitStatus must be SIGNAL(-1), and its next is pointed at the current node's successor. (The successor's prev is not updated here; it is repaired later, when the successor skips cancelled nodes in shouldParkAfterFailedAcquire().) If the predecessor is head, the current node's successor is woken, because with the current node gone it is now second in line and eligible to compete for the lock.

If setting the predecessor to SIGNAL fails, that is fine too: the current node's successor is woken and repairs its predecessor link and waitStatus itself, so the cancelled node still drops out of the queue.
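Cancellation can be triggered from the public API with a timed tryLock(): a waiter that times out gives up its place in the queue through the cancellation path. In this sketch (CancelAcquireDemo is a hypothetical name), the main thread holds the lock, thread B gives up after 50 ms, and afterwards the lock reports no queued threads. The check relies only on ReentrantLock's public monitoring methods.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: a timed-out waiter is removed from the queue.
class CancelAcquireDemo {
    static boolean timedOutWaiterLeavesQueue() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean got = new AtomicBoolean(true);
        lock.lock(); // the main thread holds the lock for the whole test
        try {
            Thread b = new Thread(() -> {
                try {
                    got.set(lock.tryLock(50, TimeUnit.MILLISECONDS)); // times out -> false
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            b.start();
            b.join();
            // B's node was cancelled, so nobody is left waiting
            return !got.get() && !lock.hasQueuedThreads() && lock.getQueueLength() == 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```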

Obviously, thread B does not trigger cancelAcquire().

Now suppose thread A still has not called unlock() when thread C also tries to acquire the lock. C fails too: AQS wraps it in a Node, appends it to the queue, changes node B's waitStatus to SIGNAL(-1), and parks C. The queue now looks like this: head (waitStatus = SIGNAL) <-> node B (waitStatus = SIGNAL) <-> node C (waitStatus = 0, tail).
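The shape of this queue is visible through ReentrantLock's monitoring methods. In this sketch (QueueShapeDemo is a hypothetical name), the main thread plays A and holds the lock while B and C queue behind it; getQueueLength() counts only waiting threads, not the dummy head.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: two threads queued behind the lock holder.
class QueueShapeDemo {
    static int queueLengthWithTwoWaiters() {
        ReentrantLock lock = new ReentrantLock();
        Runnable waiter = () -> {
            lock.lock();
            lock.unlock();
        };
        Thread b = new Thread(waiter, "B");
        Thread c = new Thread(waiter, "C");
        lock.lock(); // the main thread plays thread A
        try {
            b.start();
            c.start();
            while (!(lock.hasQueuedThread(b) && lock.hasQueuedThread(c))) {
                Thread.sleep(1); // wait until both are in the queue
            }
            return lock.getQueueLength(); // B and C
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            lock.unlock(); // let B and C finish
            try {
                b.join();
                c.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

`queueLengthWithTwoWaiters()` should return 2.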


What does ReentrantLock.unlock() do?

With threads B and C queued and parked, suppose thread A now calls unlock() to release the lock. Under the hood this calls sync.release(1), another AQS template method:

// Release the lock
public final boolean release(int arg) {
	/* Call the subclass's tryRelease(); true means the resource is fully released.
	   For ReentrantLock, that is when state drops to 0. */
	if (tryRelease(arg)) {
		/* head is the node holding the lock. If head's waitStatus != 0, a successor is waiting
		   to be woken. Remember that a node must set its predecessor's waitStatus to -1 before
		   it parks; a node that never did so will not be woken. */
		Node h = head;
		if (h != null && h.waitStatus != 0)
			// After releasing the lock, wake up the successor node
			unparkSuccessor(h);
		return true;
	}
	return false;
}

AQS will call the subclass’s tryRelease() implementation, and when it returns true to indicate that the resource has been successfully freed, AQS will wake up the node in the queue.

/* Try to release the lock; true means it is now fully released.
   Only the thread holding the lock can release it, so there is no race here. */
protected final boolean tryRelease(int releases) {
	int c = getState() - releases;
	// A thread that does not hold the lock tries to release it: throw
	if (Thread.currentThread() != getExclusiveOwnerThread())
		throw new IllegalMonitorStateException();
	boolean free = false;
	if (c == 0) {
		// state == 0: the lock is fully released
		free = true;
		setExclusiveOwnerThread(null);
	}
	setState(c);
	return free;
}

ReentrantLock is reentrant: state counts how many times the lock has been entered, and tryRelease() decrements it. When state reaches 0, the lock is released and exclusiveOwnerThread is set to null.

tryRelease() must be called by the thread holding the lock; otherwise it throws IllegalMonitorStateException.
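A quick check of that rule: in this sketch (WrongThreadUnlockDemo is a hypothetical name), the main thread holds a ReentrantLock and a second thread tries to unlock it.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: only the owner may release a ReentrantLock.
class WrongThreadUnlockDemo {
    static boolean otherThreadUnlockThrows() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean threw = new AtomicBoolean(false);
        lock.lock(); // owned by the main thread
        try {
            Thread t = new Thread(() -> {
                try {
                    lock.unlock(); // not the owner
                } catch (IllegalMonitorStateException e) {
                    threw.set(true); // thrown by tryRelease()
                }
            });
            t.start();
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock(); // the owner releases normally
        }
        return threw.get() && !lock.isLocked();
    }
}
```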

After thread A finishes tryRelease(), state is 0 and exclusiveOwnerThread is null, while the queue itself is unchanged: head (SIGNAL) <-> node B (SIGNAL) <-> node C (0, tail).

Since tryRelease() returned true, AQS wakes up a node in the queue by calling unparkSuccessor():

// Wake up the successor node
private void unparkSuccessor(Node node) {
	// Reset the node's (negative) waitStatus to 0
	int ws = node.waitStatus;
	if (ws < 0)
		compareAndSetWaitStatus(node, ws, 0);

	/* Normally the node to wake is next. If next is null, or its waitStatus > 0 (CANCELLED),
	   search backwards from tail towards node for the valid node (waitStatus <= 0) closest to node.
	   Searching from tail works because enqueueing sets prev before the tail CAS and next only
	   afterwards, so next may still be null while prev is always set. */
	Node s = node.next;
	if (s == null || s.waitStatus > 0) {
		s = null;
		for (Node t = tail; t != null && t != node; t = t.prev)
			if (t.waitStatus <= 0)
				s = t;
	}
	if (s != null)
		LockSupport.unpark(s.thread);
}

Interestingly, when the next node is invalid, AQS skips it and looks for another valid node. Why does the search start at tail and move toward head, rather than starting at head?

When a node joins the queue, three operations are performed: 1. the node's prev points to the old tail; 2. CAS sets tail to the node; 3. the old tail's next points to the node. If the time slice expires right after step 2, the predecessor's next is still null, so a search starting from head would miss the node: a missed wake-up. Since prev is assigned before the CAS, searching backwards via prev always finds it.
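The selection rule can be isolated in a small model. SuccessorPicker and PNode are hypothetical; pick() reproduces only the choice made by unparkSuccessor(), leaving out the reset of the node's own waitStatus and the actual unpark.

```java
// Hypothetical model of how unparkSuccessor() chooses the node to wake.
class SuccessorPicker {
    static final class PNode {
        final String name;
        volatile int waitStatus;
        volatile PNode prev;
        volatile PNode next;
        PNode(String name, int ws) {
            this.name = name;
            this.waitStatus = ws;
        }
    }

    static PNode pick(PNode node, PNode tail) {
        PNode s = node.next;
        if (s == null || s.waitStatus > 0) {     // missing or cancelled successor
            s = null;
            // Walk back from tail; the last valid node seen is the one closest to `node`
            for (PNode t = tail; t != null && t != node; t = t.prev) {
                if (t.waitStatus <= 0) s = t;
            }
        }
        return s;                                // null means nobody to wake
    }
}
```

With head <-> X (CANCELLED) <-> B <-> C, `pick(head, C)` returns B. If C's next has not been set yet but D's prev already points at C, `pick(C, D)` still finds D through prev.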

When the node is woken, it resumes in parkAndCheckInterrupt(), returns to the loop in acquireQueued(), and tries to acquire the lock again.

Thread A releases the lock and wakes thread B, which competes again. B succeeds and makes its own node the head, so the queue becomes: head (node B, thread cleared, waitStatus = SIGNAL) <-> node C (waitStatus = 0, tail). After running for a while, B releases the lock and wakes C, which then acquires the lock and becomes the head itself. When C releases the lock, its node is not removed but stays as head; it is replaced only when a later thread that had to queue acquires the lock and becomes the new head.
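The FIFO hand-off from A to B to C can be observed with a plain ReentrantLock. In this sketch (WakeOrderDemo is a hypothetical name), B is made to queue before C. When the main thread (A) unlocks, only B sits right behind head, and C does not try to acquire until B's node has become head, so the recorded order should be B, then C.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: queued threads acquire the lock in FIFO order.
class WakeOrderDemo {
    static List<String> acquisitionOrder() {
        ReentrantLock lock = new ReentrantLock();
        List<String> order = new CopyOnWriteArrayList<>();
        Runnable waiter = () -> {
            lock.lock();
            try {
                order.add(Thread.currentThread().getName());
            } finally {
                lock.unlock(); // release() wakes the next queued node
            }
        };
        Thread b = new Thread(waiter, "B");
        Thread c = new Thread(waiter, "C");
        lock.lock(); // the main thread plays thread A
        try {
            b.start();
            while (!lock.hasQueuedThread(b)) Thread.sleep(1); // B queues first
            c.start();
            while (!lock.hasQueuedThread(c)) Thread.sleep(1); // C queues behind B
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock(); // A releases; the successor of head (B) is woken
        }
        try {
            b.join();
            c.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }
}
```

`acquisitionOrder()` should return `[B, C]`.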


Questions

Finally, let me summarize some important questions about AQS.

1. When does the worker thread leave the queue?

When a node in the FIFO queue acquires the resource, it does not leave the queue; instead, it points head at itself. After releasing the lock, it still does not leave on its own. It waits until the next node acquires the lock and moves head to itself, which unlinks the previous head.

2. What rules does AQS follow when waking queued nodes?

If the waitStatus of the current (head) node is 0, its successor is not woken. That is why a new node must set its predecessor's waitStatus to SIGNAL(-1) before it parks: if it parked without doing so, nobody would wake it and it would wait forever.

AQS first wakes up next, the immediate successor to the current node. If next is null, there are two cases:

  1. There really is no successor; the node that next used to point to may have dropped out because of a timeout or a similar reason.
  2. There is a successor, but because of concurrency it has not yet pointed the current node's next at itself.

The first case is easy: there is nobody to wake. In the second case, AQS searches from tail toward head and wakes the valid node it finds.

AQS also skips the immediate successor if its waitStatus is greater than 0. As mentioned earlier, a waitStatus greater than 0 marks an invalid node; for example, CANCELLED(1) marks a node that has given up competing. If the immediate successor is invalid, AQS walks from tail toward head and wakes the valid node closest to the head.

Conclusion: if the immediate successor exists and is valid, it is woken first. Otherwise, AQS walks from tail toward head and wakes the valid node closest to the head.

3. Why does the search for a node to wake start from tail?

This is because, when a new node is enqueued, three steps need to be performed:

  1. The prev of the current node points to the previous tail
  2. The CAS points tail to the current node
  3. The next of the previous tail points to the current node

AQS does not perform these three operations atomically. If the CPU time slice expires after step 2, the new node is already the tail and its prev is set, but the predecessor's next has not been assigned yet. A search from head to tail could therefore miss the node, causing a missed wake-up. Because prev is assigned before the CAS on tail, searching from tail to head avoids the problem.

4. Other questions

This is a placeholder; I will add more questions as they come up. If anything is unclear, leave a comment and let me know. I will write down what I know, and we can discuss it together.


Afterword

I still remember it clearly: at the beginning of the year I was asked about AQS in an interview. I had never sat down to study it properly, and some concepts were still fuzzy to me.

After working overtime last week, I wanted to write an article to unwind, so I decided to tackle my weak spot, AQS. Once I sat down and read the source, I found that concepts that had stayed fuzzy after reading blog posts online are actually spelled out very clearly in the code.

Read the source; understanding will come with time!

