You can skip the source code if you don’t want to see it

1. The concept

AQS (AbstractQueuedSynchronizer) is used to construct a lock and synchronizer (synchronous, refers to the communication between threads, collaboration) framework, Various locks in Lock packages (such as the common ReentrantLock, ReadWriteLock), various synchronizers in concurrent packages (such as CountDownLatch, Semaphore, Cyclicbarriers are based on AQS

2. Lock principle

In concurrent programming, the two most important concepts are mutual exclusion and synchronization, and these two problems can be solved by semaphores and routines

Semaphore: P and V are operated by a shared variable S and two atoms. S can only be changed by P and V. P: requests resources, s-1, if S < 0, the thread enters the wait queue, V: releases resources, S+1, if S≤0, the queue has threads that need to be woken up

The introduction of semaphore mechanism solves the problem of process synchronization and mutual exclusion, but a large number of semaphore synchronization operations scattered in each process is not easy to manage, and may lead to system deadlock. Hence the tube process

Management process: The synchronization operations of all processes on a critical resource are gathered together to form a so-called secretary process. All threads accessing this critical point need to inform the secretary, which is composed of four parts:

  • Shared variables within a pipe.
  • Condition variables inside the pipe.
  • A process that executes in parallel within a pipe.
  • A statement that sets initial values for shared data locally and internally within a pipe.

3. Achieve synchronization and mutual exclusion

Mutually exclusive: now there is a patient (thread), for example, from the entrance into the Shared variables (the doctor), because only a doctor, so this thread, release the lock, after see another patient to enter, so essentially monitor is by sharing resources and to the operation of the Shared resources safely get and release (thread) encapsulated to ensure mutual exclusion.

Synchronous: Condition variables, namely, when the patient (threads) to see a doctor, the doctor (Shared variables) and cannot be confirmed immediately, need to have a blood test to confirm what is the situation, this time is equivalent to a blood test condition variables, and release the doctor resources at the same time, another patient, after patients blood test to check blood need to queue, waiting for the notify, Once it gets the shared variable (doctor), it can enter the entry (critical section) for processing.

4. Realization principle of AQS

It maintains a shared resource state and a FIFO wait queue (i.e., the entry wait queue of the pipe procedure above). The bottom layer uses CAS mechanism to ensure the atomicity of the operation. The specific principle is as follows: Taking the exclusive lock as an example, when a thread occupies the shared resource, the state will perform +1 operation. At this point, if other threads want to request shared resources, they will enter the FIFO queue and wait until state-1 is released. Then other threads can preempt resources. State is a shared variable of multiple threads, so it must be defined as volatile. In order to ensure the visibility of state, while volatile can guarantee visibility, but not atomicity, so AQS provides atomic manipulation of state to ensure thread safety.

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

Copy the code

AQS FIFO queue is actually bidirectional linked list implementation, from the source can be clearly seen, head node represents the current lock

Private TRANSIENT volatile Node head; Private TRANSIENT volatile Node tail; private transient volatile Node tail;Copy the code

5. Source code analysis

Acquiring a lock

Preempt the lock: As you can see, the unfair lock inherits Sync() execution lock, preempts the resource through CAS, successfully preempts state to 1, and sets setExclusiveOwnerThread to the current thread. Otherwise, if acquire is executed (acquire the exclusive lock), other threads will inevitably fail to acquire the lock change value through CAS, and locksuport.park () will be used to suspend the thread

Execute the lock() method:

— — — — — — — — — — — — — — — — — start already the lock () method — — — — — — — — — — — — — — — — — — — —

Sync – Sync can clearly see the inner class inheritance in AbstractQueuedSynchronizer (AQS)

public void lock() { sync.lock(); } //ReentrantLock private final Sync Sync; / / that is executed after the lock is an abstract method of inner class Sync lock () the abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ Void lock(); void lock(); void lock();Copy the code

— — — — — — — — — — — — — — — the following is the default not how fair lock preemption lock approach — — — — — — — — — — — — — — — –

Active locking methods in ReentrantLook:

Static final class NonfairSync extends Sync {// Final void lock() {// Static final class NonfairSync extends Sync {// Final void lock() {// So the CAS method is going to determine if your expected value is the same as the value in memory, and if it is, it's going to change it. So it's going to change the state, If (compareAndSetState(0, 1)) setExclusiveOwnerThread(thread.currentThread ()); If (acquire(1), acquire(1); if (acquire(1), acquire(1); if (acquire(1); }}Copy the code

Protected final Boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }Copy the code
// Protected final void setExclusiveOwnerThread(Thread Thread) {exclusiveOwnerThread = Thread; }Copy the code

Failure to preempt the lock through the CAS method leads to this code

public final void acquire(int arg) { if (! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }Copy the code

After entering the tryAcquire method:

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
Copy the code

Can see here didn’t perform its function implementation, there is only one throws an exception statements, users can rewrite the fairness to its lock, unfair, reentrant lock, lock non-reentrant lock, concrete in concrete ways, such as ReentrantLook on implementation, this is typical of the template design pattern, is that all subclasses must implement this method, Otherwise, throw an exception

ReentrantLook non-fair lock implementation:

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
Copy the code

—- The following is the CAS method failed to preempt the lock in tryAcquire ReentrantLook

Concrete implementation of tryAcquire:

If it is equal to 1, it will continue to determine whether the current thread is equal to itself. If it is equal to 1, the state will be +1 and the state value will be accumulated. This is the specific implementation method of reentrant lock, and the lock will be released in descending order. ReentrantLock source code can be found that the entire method of locking is integrated with AQS, that is, operations on the basis of AQS

final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); Int c = getState(); int c = getState(); // if the state value is 0,0 means that the CAS can be used to obtain the lock. // if the state value is 1, the CAS can be used to obtain the lock. // if the CAS value is 1, the CAS value is not equal. So the state must be equal to 0, and you can grab the lock successfully. For example, when you just enter the toilet door, someone is in the pit you want to go to, but when you get to the pit door, he just comes out, and you can successfully occupy the pit. if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; }} // If the lock is not equal to 0, determine whether it is the same thread to achieve the reentrant lock, the accumulated state value // at this time the lock is occupied by thread A, B, not equal, Else if (current == getExclusiveOwnerThread()) {int nexTC = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // return false; }Copy the code

Return false then in acquire method above! TryAcquire (ARG) acquireQueued(addWaiter(Node.exclusive), ARG

——- The following is how the CAS method fails to preempt the lock again and enters the wait queue ——-

TryAcquire returns false and does the following:

If the lock fails to join the queue addWaiter, the AQS queue is a bidirectional linked list with the head being head and tail being tail. If the lock fails to join the queue from the tail, the AQS will try to join the queue quickly in advance. If the lock fails, the AQS will join the queue through the CAS spin lock until it joins the queue successfully

This code starts by creating a Node bound to the current thread. Node is a bidirectional list. If the tail pointer in the wait is empty, enq(Node) is directly called to add the current thread to the end of the wait queue

Private Node addWaiter(Node mode) {private Node addWaiter(Node mode) {// create a Node that is bound to the currentThread. // Try the fast path of enq; // backup to full enq on failure // backup to full enq on failure // If it is not null, the new pointer points to the previous node, and the last pointer points to the new thread. If (pred! = null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; }} // Spinlock method enq(node); return node; }Copy the code

Enq method: For the first loop, the tail pointer is null, t is assigned by the tail pointer, and the if logic is entered. The CAS operation is used to set the head pointer, and new a Node (this Node is the underlying new Node, which can be used as a sentinel Node when the wait queue is executed for the first time). Point head to a newly created Node Node. After execution, head, tail, and t all point to the first Node element. And then we do the second loop, and t is no longer empty, else logic, and now we have the head node, and then after we do that, the precursor node of the thread node points to the sentinel node, and the rear node of the sentinel node points to the thread node

private Node enq(final Node node) { for (;;) {//t t = tail; T == null; t == null; t == null; If (t == null) {// Must initialize if (compareAndSetHead(new Node())) Tail = head; } else {// Set the thread's front pointer to the sentinel node node.prev = t; If (compareAndSetTail(t, node)) {// Put the sentinel pointer to the current thread. return t; }}}}Copy the code

AcquireQueued method:

AcquireQueued () this method determines whether the Node corresponding to the currently passed thread is head and attempts to lock it if it is. If the lock is successful, the current node will be set as head node, and then the previous head node will be vacant for subsequent garbage collection. After joining the end of queue DAIL, check whether waitStatus of the previous node is equal to SINGAL (-1). If so, the current node ends its spin, enters the blocking state, and ends its spin. If the precursor nodes are not equal, assign waitStatus of the precursor node to SINGAL. If lock failure or a forward Node is not the head Node, the Node would pass shouldParkAfterFailedAcquire methods will head Node waitStatus into SIGNAL = 1, the final execution parkAndChecknIterrupt method, Call locksupport.park () to suspend the current thread.

Final Boolean acquireQueued(final Node Node, int arg) {Boolean failed = true; try { boolean interrupted = false; for (;;) Final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // If the lock fails again, Until the thread here really to wait in the queue if (shouldParkAfterFailedAcquire (p, node) && parkAndCheckInterrupt ()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}Copy the code

— — — — — — — — — — — — — — — — — — trying to get in the queue lock after failure shouldParkAfterFailedAcquire method — — — — — — — — — — — — — — — — — — — — — — — —

For the current thread lead node waitStatus value, the value of the judgment waitStatus if equal to zero, will pass the CAS waitStatus value changes into SIGNAL (1), the code above shouldParkAfterFailedAcquire conditions, Execute the parkAndCheckInterrupt method later to do thread-blocking wait through locksupport. park

ShouldParkAfterFailedAcquire method:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
Copy the code

ParkAndCheckInterrupt method:

Private final Boolean parkAndCheckInterrupt() {// Suspend the thread locksupport.park (this); return Thread.interrupted(); }Copy the code

Release the lock

The tryRelease() method is implemented in ReentrantLock. If the tryRelease is successfully executed and the node with the head of the linked list is obtained, the waitStatus of the node is not equal to null and waitStatus is not equal to 0. Execute the unparksucceeded to wake up the behind node. If the value passed in by the unparksucceeded method is less than 0, that is, waitStatus is less than 0, then CAS attempts to obtain the lock, the next node is obtained, and unpark liberates the node

// State is set to 0 after reentrantLock. tryRelease(), and the Lock object's exclusive Lock is set to null. Protected Final Boolean tryRelease(int releases) {int c = getState() -releases; if (Thread.currentThread() ! = getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } // setState to 0 to implement release logic setState(c); return free; Public final Boolean release(int arg) {if (tryRelease(arg)) {Node h = head; if (h ! = null && h.waitStatus ! = 0) unparkSuccessor(h); return true; } return false; } // How to wake up the succeeded nodes private void unparkprecursor (Node Node) {int ws = node.waitstatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t ! = null && t ! = node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s ! = null) LockSupport.unpark(s.thread); }Copy the code

After waking up thread 2:

The for loop obtains the front node and emptying it. Then the empty node is unreachable. If it is unreachable, GC mechanism will be triggered

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}Copy the code