Introduction

As introduced earlier, ReentrantReadWriteLock holds a sync attribute, and Sync inherits from the AQS synchronizer (AbstractQueuedSynchronizer). AQS uses the template method design pattern: a custom synchronizer overrides the specified methods, and the template methods provided by AQS call them. The template methods cover exclusive acquisition and release of the synchronization state, shared acquisition and release of the synchronization state, and querying the threads waiting in the synchronization queue. In this way we can use AQS to build custom synchronization components.
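To make the template pattern concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. The names SimpleMutex, Sync and isLocked are hypothetical and chosen only for illustration; the AQS methods they call (acquire, release, compareAndSetState, getState, setState) are the real ones.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex built on AQS.
class SimpleMutex {
    // Only the hooks are overridden; AQS supplies queueing and blocking.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // state 0 -> 1 means the calling thread now holds the lock
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write publishes the release
            return true;
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }          // AQS template method
    void unlock()      { sync.release(1); }          // AQS template method
    boolean tryLock()  { return sync.tryAcquire(1); } // single non-blocking attempt
    boolean isLocked() { return sync.isLocked(); }
}
```

Calling lock() goes through the acquire() template method, which in turn calls the overridden tryAcquire(); the queueing and blocking logic is inherited unchanged.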

AQS analysis

Important attributes

    // Head of the synchronization queue
    private transient volatile Node head;
    // Tail of the synchronization queue
    private transient volatile Node tail;
    // Synchronization state
    private volatile int state;

AQS maintains a FIFO queue internally through head and tail. state represents the synchronization state: 0 means that no thread holds the synchronization state (the lock), and a value greater than 0 means that a thread holds it. All three fields are volatile to guarantee memory visibility.
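One way to observe these state semantics is through ReentrantLock, which stores its hold count in the AQS state. The sketch below is only an illustration; the class name StateDemo and the method holdCounts are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: the hold count of a ReentrantLock mirrors the AQS state.
class StateDemo {
    // Returns the hold count before locking, while locked twice, and after unlocking.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int before = lock.getHoldCount();   // nothing held yet
        lock.lock();
        lock.lock();                        // reentrant acquire increments the state
        int during = lock.getHoldCount();
        lock.unlock();
        lock.unlock();                      // state returns to 0, lock is free
        int after = lock.getHoldCount();
        return new int[] { before, during, after };
    }
}
```

In this setup the three values are 0, 2 and 0: the state is 0 while the lock is free and greater than 0 while a thread holds it.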

Important inner class

    static final class Node {
        /** Marker indicating that a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker indicating that a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /**
         * The thread waiting in the synchronization queue timed out or was
         * interrupted, so its wait is cancelled. A node never leaves this state.
         */
        static final int CANCELLED = 1;
        /**
         * The successor's thread is (or will soon be) blocked, so this node
         * must wake it up when it releases the state or is cancelled.
         */
        static final int SIGNAL = -1;
        /**
         * The node is in a condition queue and its thread is waiting on a
         * Condition. When another thread calls signal() on that Condition,
         * the node moves from the condition queue to the synchronization queue.
         */
        static final int CONDITION = -2;
        /** The next shared acquisition should propagate unconditionally */
        static final int PROPAGATE = -3;

        /** Wait status of the current node */
        volatile int waitStatus;
        /** Predecessor node */
        volatile Node prev;
        /** Successor node */
        volatile Node next;
        /** The thread associated with the node */
        volatile Thread thread;
        /**
         * Next node waiting on the condition, or the SHARED constant if the
         * node is in shared mode. The node type (exclusive or shared) and the
         * successor in the condition queue share this one field.
         */
        Node nextWaiter;
    }

Node is the basic building block of the synchronization queue. Its prev and next fields link the nodes into a doubly linked queue, with head referring to the first node and tail to the last.

When one thread succeeds in acquiring the synchronization state (the lock), other threads cannot acquire it. Each of them is wrapped in a node and appended to the synchronization queue.
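The queueing behaviour can be observed from the outside with ReentrantLock, which is built on AQS. The following is a small sketch under that assumption; QueueDemo and queuedWhileHeld are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: a thread that cannot acquire the lock is queued.
class QueueDemo {
    // Returns how many threads are queued while the caller holds the lock.
    static int queuedWhileHeld() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();                        // the calling thread holds the state
        Thread waiter = new Thread(() -> {
            lock.lock();                    // cannot acquire, so it is queued
            lock.unlock();
        });
        waiter.start();
        // Wait until the waiter's node is linked into the synchronization queue.
        while (!lock.hasQueuedThread(waiter)) {
            Thread.yield();
        }
        int queued = lock.getQueueLength();
        lock.unlock();                      // wakes the queued thread
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return queued;
    }
}
```

With a single waiter the reported queue length should be 1, even though getQueueLength() is documented as an estimate in general.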

Exclusive acquisition and release of the synchronization state

  • Exclusive acquisition of the synchronization state
  • acquire() does not respond to interrupts: if a thread fails to obtain the synchronization state and enters the synchronization queue, a later interrupt does not remove it from the queue

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }


    acquire() works in four steps. ① It calls tryAcquire(), which the custom synchronizer implements, to try to obtain the synchronization state (the lock) in a thread-safe way. ② If that fails, it constructs an exclusive node. ③ It appends the node to the tail of the synchronization queue by calling addWaiter(). ④ It calls acquireQueued(), which tries to obtain the synchronization state in an infinite loop. When an attempt fails, shouldParkAfterFailedAcquire() decides whether the thread needs to block. If it returns true, the node's thread is blocked, and it is woken up only when its predecessor leaves the queue or when the blocked thread is interrupted.
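The "does not respond to interrupts" behaviour can be checked with ReentrantLock.lock(), which goes through acquire(). This is only a sketch; UninterruptibleAcquireDemo and run are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: an interrupt does not remove a thread queued by acquire().
class UninterruptibleAcquireDemo {
    // Returns {lock acquired by the waiter, interrupt flag set after acquiring}.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] result = new boolean[2];
        lock.lock();
        Thread waiter = new Thread(() -> {
            lock.lock();                    // keeps waiting despite the interrupt
            try {
                result[0] = true;
                result[1] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        while (!lock.hasQueuedThread(waiter)) {
            Thread.yield();                 // wait until the waiter is queued
        }
        waiter.interrupt();                 // does not abort the acquisition
        lock.unlock();                      // only now can the waiter proceed
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result;
    }
}
```

The waiter still obtains the lock after the interrupt, and the interrupt is not lost: the flag is set again once the acquisition completes, which is the job of selfInterrupt().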

    tryAcquire

    The body of tryAcquire() in AQS only throws an exception. A custom synchronizer that acquires the synchronization state exclusively must override this method.

    
        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
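A quick way to see why the override is required is to call acquire() on a synchronizer that leaves the hook untouched. The class names below are hypothetical.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo: acquire() fails if tryAcquire() is not overridden.
class DefaultHookDemo {
    // A synchronizer that forgets to override tryAcquire().
    static final class NoHooks extends AbstractQueuedSynchronizer { }

    // Returns true if acquire() surfaces the default UnsupportedOperationException.
    static boolean acquireThrows() {
        try {
            new NoHooks().acquire(1);
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }
}
```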


    addWaiter

    addWaiter() wraps the current thread in a node and appends it to the synchronization queue

        private Node addWaiter(Node mode) {
            // Create a new node for the current thread
            Node node = new Node(Thread.currentThread(), mode);
            // Fast path: try once to CAS the node onto the tail
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            // The queue is empty or the CAS failed: retry in enq()
            enq(node);
            return node;
        }

    If the queue is empty or the CAS fails, enq() retries the insertion in a spin loop

        private Node enq(final Node node) {
            // Spin until the node is appended
            for (;;) {
                // Current tail
                Node t = tail;
                if (t == null) { // Must initialize
                    // CAS in an empty head node, then point tail at it
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    // Link backward first, then CAS the node onto the tail
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

    The source shows that if appending the node to the synchronization queue fails, enq() keeps looping until the node has been added successfully
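The retry-until-success pattern used by enq() can be sketched outside AQS with an AtomicReference. This is a simplified model, not the AQS implementation: the class CasQueueSketch is hypothetical, and unlike AQS it creates the dummy head eagerly instead of lazily.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the tail-append loop in enq().
class CasQueueSketch {
    static final class Node {
        final int value;
        volatile Node prev;
        volatile Node next;
        Node(int value) { this.value = value; }
    }

    // Assumption: the dummy head is created eagerly; AQS creates it lazily.
    private final Node head = new Node(-1);
    private final AtomicReference<Node> tail = new AtomicReference<>(head);

    void enqueue(int value) {
        Node node = new Node(value);
        for (;;) {
            Node t = tail.get();
            node.prev = t;                   // link backward before publishing
            if (tail.compareAndSet(t, node)) {
                t.next = node;               // forward link is set afterwards
                return;
            }
            // CAS failed: another thread appended first, so read the tail again
        }
    }

    // Counts nodes by walking the prev links from the tail.
    int size() {
        int n = 0;
        for (Node p = tail.get(); p != head; p = p.prev) {
            n++;
        }
        return n;
    }

    // Appends from several threads at once and returns the final size.
    static int concurrentSize(int threads, int perThread) {
        CasQueueSketch queue = new CasQueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queue.enqueue(j);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return queue.size();
    }
}
```

No append is lost under contention, because a thread whose CAS fails simply rereads the tail and tries again.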

    acquireQueued

    After a node enters the synchronization queue, it starts spinning. It leaves the loop once the condition is met and it has obtained the synchronization state; otherwise it stays in the loop (and may block there)

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                // Records whether the thread was interrupted while waiting
                boolean interrupted = false;
                // Spin
                for (;;) {
                    // Predecessor of the current node
                    final Node p = node.predecessor();
                    // Only a node whose predecessor is the head may try to acquire
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    // Decide whether the thread needs to block
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    The source shows that a node may try to obtain the synchronization state only when its predecessor is the head node, for two reasons: ① The head node is the node that has obtained the synchronization state. When it releases the state it wakes up its successor, and the woken successor must check whether its predecessor is the head node. ② This preserves the FIFO order of the synchronization queue

    blocking

    After joining the queue, the thread spins to obtain the synchronization state, but on each pass it must decide whether the current thread should block

        if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())
            interrupted = true;

        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            // Wait status of the predecessor
            int ws = pred.waitStatus;
            // The predecessor will signal this node, so it is safe to block
            if (ws == Node.SIGNAL)
                return true;
            if (ws > 0) {
                // The predecessor is cancelled: skip cancelled nodes and relink
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                // CAS the predecessor's status to SIGNAL; the caller retries before parking
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }

    If shouldParkAfterFailedAcquire() returns true, parkAndCheckInterrupt() is called. It blocks the thread by calling the park() method of the LockSupport utility class

        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            // Report whether the thread was interrupted, clearing the interrupt flag
            return Thread.interrupted();
        }
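The interplay of park(), interrupts and Thread.interrupted() can be tried out in isolation. The sketch below imitates the record-and-keep-waiting style of acquireQueued(); ParkDemo and its fields are hypothetical names, and polling for the WAITING state is just a convenient way to know that the thread is parked.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo: an interrupt wakes a parked thread; interrupted() clears the flag.
class ParkDemo {
    // Returns {interrupt observed after park, flag still set after the loop}.
    static boolean[] run() {
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        boolean[] flagAfter = new boolean[1];

        Thread waiter = new Thread(() -> {
            while (!released.get()) {
                LockSupport.park();            // may also return spuriously
                if (Thread.interrupted()) {    // reports and clears the flag
                    sawInterrupt.set(true);
                }
            }
            flagAfter[0] = Thread.currentThread().isInterrupted();
        });
        waiter.start();

        // Wait until the waiter is actually parked.
        while (waiter.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        waiter.interrupt();                    // wakes the parked thread
        while (!sawInterrupt.get()) {
            Thread.yield();
        }
        released.set(true);
        LockSupport.unpark(waiter);            // normal wake-up path
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new boolean[] { sawInterrupt.get(), flagAfter[0] };
    }
}
```

The waiter notices the interrupt but keeps waiting until it is released, and because Thread.interrupted() clears the flag, the flag is no longer set when the loop exits. That is why acquire() has to call selfInterrupt() afterwards to restore it.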

    acquire() method flow

  • Exclusive release of the synchronization state

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    // Wake up the successor of the head node
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

        private void unparkSuccessor(Node node) {
            // Wait status of the current node
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
            Node s = node.next;
            // The successor is null or CANCELLED (timed out or interrupted)
            if (s == null || s.waitStatus > 0) {
                s = null;
                // Scan backward from the tail for the closest non-cancelled node
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                // Wake up the thread associated with that node
                LockSupport.unpark(s.thread);
        }


    The source shows that the node to wake up is found by traversing backward from the tail rather than forward from the head. The direct successor of the current node may be null or cancelled (because of a timeout or an interrupt), while the prev links are always set before a node is published as the tail, so the backward traversal is reliable
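The backward scan can be modelled with a few plain objects. This is a single-threaded sketch that copies only the search logic of unparkSuccessor(); the class SuccessorScanSketch and its helpers are hypothetical.

```java
// Hypothetical, single-threaded model of the backward scan in unparkSuccessor().
class SuccessorScanSketch {
    static final class Node {
        final String name;
        final int waitStatus;   // > 0 means cancelled, as in AQS
        Node prev;
        Node next;
        Node(String name, int waitStatus) {
            this.name = name;
            this.waitStatus = waitStatus;
        }
    }

    // Finds the non-cancelled node closest to 'node', scanning from the tail.
    static Node findSuccessor(Node node, Node tail) {
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t;      // keep overwriting: the last hit is the closest
                }
            }
        }
        return s;
    }

    static void link(Node a, Node b) {
        a.next = b;
        b.prev = a;
    }

    // Queue: head -> A (cancelled) -> B -> C; returns the name of the node to wake.
    static String demo() {
        Node head = new Node("head", -1);
        Node a = new Node("A", 1);
        Node b = new Node("B", -1);
        Node c = new Node("C", 0);
        link(head, a);
        link(a, b);
        link(b, c);
        return findSuccessor(head, c).name;
    }
}
```

In this queue the direct successor A is cancelled, so the scan starts at C and keeps moving toward the head, ending on B, the first live node after the head.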

    Shared synchronization state acquisition and release

    The main difference between shared and exclusive acquisition is that in shared mode several threads can hold the synchronization state at the same time. The read lock of ReentrantReadWriteLock, for example, uses shared mode

  • Acquiring the synchronization state in shared mode

        public final void acquireShared(int arg) {
            // A negative result means the acquisition failed
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }

        private void doAcquireShared(int arg) {
            // Add a SHARED node to the queue
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                // Records whether the thread was interrupted while waiting
                boolean interrupted = false;
                // Spin
                for (;;) {
                    // Predecessor of the current node
                    final Node p = node.predecessor();
                    // Only a node whose predecessor is the head may try to acquire
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            // Become the head and propagate to shared successors
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    // Decide whether the thread needs to block
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }


  • Releasing the synchronization state in shared mode
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }

    doReleaseShared() relies on CAS, because several threads may release the shared synchronization state at the same time. If a CAS operation fails, the spin loop runs again
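As an illustration of shared mode, here is a minimal sketch of a permit counter built on the shared hooks, similar in spirit to a non-fair semaphore. SimplePermits and its methods are hypothetical names; only the AQS calls are real. Both hooks retry their CAS in a loop because several threads may change the state concurrently.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a permit counter using the shared-mode hooks.
class SimplePermits {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);              // state = number of free permits
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // Negative: fail, and acquireShared() queues the thread.
                // Otherwise claim the permits with CAS, retrying on contention.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases)) {
                    return true;            // let AQS wake queued threads
                }
            }
        }

        int available() {
            return getState();
        }
    }

    private final Sync sync;

    SimplePermits(int permits) { sync = new Sync(permits); }

    void acquire()       { sync.acquireShared(1); }            // may block
    boolean tryAcquire() { return sync.tryAcquireShared(1) >= 0; }
    void release()       { sync.releaseShared(1); }
    int available()      { return sync.available(); }
}
```

With two permits, two acquisitions succeed without blocking, a third non-blocking attempt fails, and a release makes one permit available again.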

    Acquiring the synchronization state with a timeout

    A thread waiting for the built-in synchronized lock cannot time out, so it may wait indefinitely (for example in a deadlock). AQS also provides timed acquisition, that is, trying to obtain the synchronization state within a specified period of time

  • Exclusive timed acquisition of the synchronization state

        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }

        private void doAcquireInterruptibly(int arg)
                throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    // Throw instead of merely recording the interrupt
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }



    Timeout control

    
        public final boolean tryAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout);
        }
    
        private boolean doAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (nanosTimeout <= 0L)
                return false;
            // Absolute deadline
            final long deadline = System.nanoTime() + nanosTimeout;
            // Add an exclusive node to the queue
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                // Spin
                for (;;) {
                    // Predecessor of the current node
                    final Node p = node.predecessor();
                    // Only a node whose predecessor is the head may try to acquire
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                    // Recompute the remaining time
                    nanosTimeout = deadline - System.nanoTime();
                    if (nanosTimeout <= 0L)
                        return false;
                    // Park only if the remaining time exceeds the spin threshold
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    // Respond to interrupts
                    if (Thread.interrupted())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
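From the caller's side, timed acquisition is usually reached through a lock class. The sketch below uses ReentrantLock.tryLock(timeout, unit), which relies on the timed acquisition of AQS; TimedAcquireDemo and tryWhileHeld are hypothetical names.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: a timed acquisition gives up when the lock stays held.
class TimedAcquireDemo {
    // Returns whether another thread obtained the lock within the timeout
    // while the calling thread kept holding it.
    static boolean tryWhileHeld(long timeoutMillis) {
        ReentrantLock lock = new ReentrantLock();
        boolean[] acquired = new boolean[1];
        lock.lock();
        try {
            Thread other = new Thread(() -> {
                try {
                    acquired[0] = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
                    if (acquired[0]) {
                        lock.unlock();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            other.start();
            other.join();                   // wait for the attempt to time out
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return acquired[0];
    }
}
```

Because the caller holds the lock for the whole attempt, the other thread returns false once the timeout elapses instead of blocking forever.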






  • Shared timed acquisition of the synchronization state
  • Acknowledgments

    The Art of Concurrent Programming in Java