Overview

The previous article discussed how locks can be implemented. How does AbstractQueuedSynchronizer (hereinafter "AQS") differ from those implementations?

  1. When a thread cannot acquire the lock, AQS combines spinning with blocking.
  2. To support cancellation and timeouts, AQS extends the CLH lock queue with explicit links to predecessor nodes. If a node's immediate predecessor is cancelled or times out, the node follows these links back to the nearest predecessor that is still waiting.
  3. Because releasing a lock requires notifying the successor node, AQS also keeps a link to the successor. This link is an optimization and is not strictly required.

Functionality

A synchronizer typically contains two operations:

  1. Acquire operation: acquire

Block the calling thread until the synchronization state allows it to continue.

while (the synchronization state does not allow the acquire) {
    enqueue the current thread if it is not already queued;
    block the current thread;
}
dequeue the current thread if it was queued;
  2. Release operation: release

Change the synchronization state so that one or more threads blocked in acquire can continue.

update the synchronization state;
if (the state may allow a blocked thread to acquire) {
    unblock one or more queued threads;
}
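
The two pseudocode loops above can be sketched with a plain queue of threads and LockSupport. The class below follows the first-in-first-out mutex pattern from the LockSupport Javadoc; it is not how AQS itself is implemented, and the names FifoMutexSketch and demo are made up for this illustration.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class FifoMutexSketch {
    // Synchronization state: false = free, true = held
    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();
    private int counter; // only touched while holding the mutex (see demo)

    void acquire() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current); // enqueue the current thread
        // Block while not first in the queue or while the state is taken
        while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
            LockSupport.park(this);
            if (Thread.interrupted()) // remember the interrupt, keep waiting
                wasInterrupted = true;
        }
        waiters.remove(); // dequeue once the acquire succeeded
        if (wasInterrupted)
            current.interrupt(); // re-assert the interrupt on exit
    }

    void release() {
        locked.set(false);                  // update the synchronization state
        LockSupport.unpark(waiters.peek()); // wake the first waiter (no-op if none)
    }

    // Hypothetical helper: several threads increment a plain int under the mutex.
    static int demo(int threads, int perThread) {
        FifoMutexSketch m = new FifoMutexSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    m.acquire();
                    try {
                        m.counter++;
                    } finally {
                        m.release();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return m.counter;
    }
}
```

The loop parks again after every wakeup that does not lead to a successful acquire, so spurious returns from park are harmless.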

We can therefore read the source code starting from these two operations. Lock implementations fall into exclusive locks and shared locks; for simplicity, we focus first on the exclusive-lock code and then look at how shared locks differ.

Source code walkthrough

acquire

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
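  1. Try to acquire the lock. If that succeeds, return immediately without performing the remaining steps.
  2. Otherwise, create a node that represents the current thread and add it to the wait queue.
  3. Spin: check the state of the predecessor node, retry the acquire when the predecessor is the head, and block the thread otherwise. If the thread was interrupted while waiting, selfInterrupt() re-asserts the interrupt afterwards.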
tryAcquire

This method checks whether the synchronization state (the state field) permits the acquire; in plain terms, whether the lock can be obtained. The implementation in AQS only throws an exception, so any lock built on AQS must override this method.

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
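
As an illustration of what a subclass has to supply, here is a minimal non-reentrant mutex modeled on the example in the AbstractQueuedSynchronizer Javadoc. The class name MutexSketch and the convention that state 0 means unlocked and 1 means locked are choices made for this sketch.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class MutexSketch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Succeeds only if the state moves from 0 (unlocked) to 1 (locked)
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0); // only the holder gets here, so a plain write is enough
            return true;
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }          // queues and blocks via AQS
    boolean tryLock()  { return sync.tryAcquire(1); } // single attempt, never blocks
    void unlock()      { sync.release(1); }
    boolean isLocked() { return sync.isLocked(); }
}
```

Everything about queuing and blocking stays inside acquire and release; the subclass only decides whether the state can be taken or given back.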
Node

When the synchronization state does not permit the acquire, the thread has to wait in the queue, so a node representing the thread is created and enqueued. Here is the definition of the node, with the fields we do not need yet omitted.

static final class Node {
    static final Node EXCLUSIVE = null;

    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    
    volatile int waitStatus;
    volatile Thread thread;
}
  • EXCLUSIVE: marks the node as waiting in exclusive mode.
  • waitStatus: the state of the node. For now we only care about two values: CANCELLED (the node gave up waiting because of a timeout or a thread interrupt) and SIGNAL (the successor of this node is waiting to be woken, so this node must notify it when it releases the lock).
  • thread: the thread that the node represents.
addWaiter

The next step is to add the node to the wait queue. New nodes are always inserted at the tail of the queue.

Enqueueing has two cases: the tail is null (the queue has not been initialized yet) or it is not. AQS assumes that the tail is non-null in most cases, so addWaiter first attempts a fast-path insert for that case and falls back to enq if the tail is null or the CAS fails.

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
enq

enq loops until the node is in the queue. If the queue is empty, it first initializes the head and tail with a dummy node. Otherwise it inserts the new node at the tail, using the same logic as the fast path in addWaiter:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
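
The same enqueue loop can be tried outside AQS. The sketch below is a simplified stand-in that uses AtomicReference in place of the internal compareAndSetHead and compareAndSetTail; the names EnqSketch, countFromTail and concurrentDemo are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

class EnqSketch {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Inserts node at the tail and returns its predecessor, like enq.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Empty queue: install a dummy head, then publish it as the tail
                Node dummy = new Node("dummy");
                if (head.compareAndSet(null, dummy))
                    tail.set(dummy);
            } else {
                node.prev = t;                    // prev is set before the CAS
                if (tail.compareAndSet(t, node)) {
                    t.next = node;                // next is set after the CAS
                    return t;
                }
                // CAS lost to another thread: loop and retry with the new tail
            }
        }
    }

    // Counts real nodes by walking prev links from the tail back to the head.
    int countFromTail() {
        int n = 0;
        for (Node t = tail.get(); t != null && t != head.get(); t = t.prev)
            n++;
        return n;
    }

    static int concurrentDemo(int threads, int perThread) {
        EnqSketch q = new EnqSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++)
                    q.enq(new Node("n"));
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return q.countFromTail();
    }
}
```

Because prev is written before the CAS and next only after it, the prev chain from the tail is always complete, while a next link may briefly be missing.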
acquireQueued

Once the node is enqueued, the thread repeatedly checks whether the lock has become available. It first checks whether its predecessor is the head node (only the successor of the head may try to acquire, because it can only get the lock after the head releases it), and if so calls tryAcquire. If the predecessor is not the head node, or the attempt fails, it sets the predecessor's waitStatus to SIGNAL (telling the predecessor that its successor is waiting to be woken) and then blocks the thread.

When the successor of the head node acquires the lock, the old head leaves the queue:

  1. head is changed to point to the new node (the successor of the original head);
  2. the prev link of the new head is set to null (its predecessor was the original head).

To help the GC reclaim the original head, its next link is set to null as well.

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
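
shouldParkAfterFailedAcquire is not shown in this article. The sketch below is modeled on the JDK 8 logic as a single-threaded simplification: a plain field write stands in for the CAS on waitStatus, and the class name ParkDecisionSketch and the link helper are hypothetical.

```java
class ParkDecisionSketch {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;

    static final class Node {
        int waitStatus;
        Node prev;
        Node next;
    }

    // Returns true only when pred is already marked SIGNAL, i.e. it is safe to park.
    static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == SIGNAL)
            return true; // pred has promised to wake us on release
        if (ws > 0) {
            // pred was cancelled: skip back to the nearest live predecessor
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // Ask pred for a signal; the caller retries the acquire before parking
            pred.waitStatus = SIGNAL; // assumption: plain write instead of CAS
        }
        return false;
    }

    // Hypothetical helper that links two nodes and returns the second.
    static Node link(Node pred, Node node) {
        node.prev = pred;
        pred.next = node;
        return node;
    }
}
```

Returning false after setting SIGNAL forces one more pass through the loop in acquireQueued, so the thread retries the acquire once before it actually blocks.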

Next comes releasing the lock. As the enqueue process shows, a release has to do more than update the synchronization state: it also has to wake the successor node.

release

The whole implementation mainly involves the following three things:

  • Update the synchronization state
  • Check whether a successor node needs to be woken
  • Wake the successor node
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
unparkSuccessor

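unparkSuccessor(h) wakes the first waiting thread after node h. It clears a negative waitStatus on h, then takes h.next as the candidate. If that node is null or cancelled, it scans backwards from the tail and keeps the non-cancelled node closest to h: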

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
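
The backward scan is easier to follow on a small hand-built queue. The sketch below isolates that search in a single-threaded form; SuccessorScanSketch, findSuccessor and link are hypothetical names, and the node carries only the fields the scan needs.

```java
class SuccessorScanSketch {
    static final int CANCELLED = 1;

    static final class Node {
        final String name;
        int waitStatus;
        Node prev;
        Node next;
        Node(String name) { this.name = name; }
    }

    // Links the nodes in order and returns the last one (the tail).
    static Node link(Node... nodes) {
        for (int i = 1; i < nodes.length; i++) {
            nodes[i].prev = nodes[i - 1];
            nodes[i - 1].next = nodes[i];
        }
        return nodes[nodes.length - 1];
    }

    // Same search as in unparkSuccessor, returning the node instead of unparking it.
    static Node findSuccessor(Node node, Node tail) {
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Walk from the tail towards node; the last match is the one closest to node
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        return s;
    }
}
```

The scan relies on prev links because addWaiter sets node.prev before the CAS on the tail and pred.next only afterwards, so a next link can still be null while the prev chain is already complete.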