Welcome to the documentation

General documentation: Article directory Github: github.com/black-ant

I. Fundamentals of AQS

AQS in a word:

  • An abstract class called AbstractQueuedSynchronizer
  • It contains two important concepts: CHL queue (CHL queue) + STATE flag
  • Supports two types of locks: exclusive lock and shared lock.

1.1 What is AQS?

Java. Util. Concurrent. The locks. AbstractQueuedSynchronizer abstract classes, referred to as “AQS, queue synchronizer

What it does: A synchronizer for building locks and synchronization containers. It uses a FIFO queue to represent the threads queued for locks

  • The head node of the queue is called a “sentinel node” or a “dummy node” and is not associated with any threads.
  • The other nodes are associated with the wait thread, and each node maintains a wait state, waitStatus

Scenario: AQS addresses a number of details involved in implementing a synchronizer, such as obtaining synchronization status and FIFO synchronization queues


1.2 What does AQS status mean?

AQS uses an int of status to indicate the synchronization state, which is important for tracking whether a thread should block when a node is notified when its predecessor is released. Otherwise, each node in the queue acts as a monitor for a particular notification style that holds a single waiting thread.

  • Status > 0: The lock is obtained
  • Status = 0: The lock is released
  • status < 0 :

1.3 Common methods:

State handling

  • GetState () : Returns the current value of the synchronization status.
  • SetState (int newState) : sets the current synchronization state.
  • CompareAndSetState (int Expect, int Update) : Sets the current state using CAS. This method ensures that the state setting is atomic.

Exclusive lock correlation methods

  • 【 解 override 】#tryAcquire(int arg) : obtain the synchronization status from the other thread. After the synchronization status is successfully obtained, other threads need to wait until this thread releases the synchronization status before obtaining the synchronization status.
  • #tryRelease(int arg) : release the synchronized state exclusively.

Share locking related methods

  • 【 改 编 】#tryAcquireShared(int arg) : obtain the synchronization status by sharing. If the return value is greater than or equal to 0, the process is successful. Otherwise, the capture fails.
  • 【 overwrite 】#tryReleaseShared(int arg)
  • [overridden] #isHeldExclusively() : Indicates whether the current synchronizer is occupied by the thread in exclusive mode.

Obtain the synchronization status exclusively

  • Acquire (int arg) : acquire the synchronization status exclusively.
    • If the current thread succeeded in obtaining the synchronization status, this method returns. Otherwise, the system waits in the synchronization queue. This method will call the overriding #tryAcquire(int arg) method;
  • AcquireInterruptibly (int arg) : Same as #acquire(int arg), but this method responds to an interruption.
    • The current thread enters the synchronization queue in order to obtain the synchronization state
    • Throws InterruptedException() if the current thread is interrupted
    • If there is no interrupt, a tryAcquire call is attempted, and the failing thread is queued, possibly blocking and unblocking repeatedly
  • TryAcquireNanos (int arg, long nanos) : Indicates that the synchronization status is obtained after a timeout.
    • Throws InterruptedException() if the current thread is interrupted
    • Return false if the current thread has not acquired the synchronization status within nanos time, and true if it has.
    • No timeout no access will be a queue, repeatedly blocking

The synchronization status is shared

  • AcquireShared (int ARG) : Obtains the synchronization status in shared mode
    • If the current thread does not obtain the synchronization state, it will enter the synchronization queue waiting. The main difference with exclusive mode is that multiple threads can obtain the synchronization state at the same time.
  • AcquireSharedInterruptibly (int arg) : Shared access to sync, respond to interrupt.
  • TryAcquireSharedNanos (int ARg, Long nanosTimeout) : Obtains the synchronization status in shared mode and adds the timeout limit.

Releasing the Synchronization State

  • Release (int arg) : Releases the synchronization status exclusively
    • This method wakes up the thread contained by the first node in the synchronization queue after the synchronization state is released.
  • ReleaseShared (int arg) : releases the synchronization status in shared mode.
public class SimpleLock extends AbstractQueuedSynchronizer {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    protected boolean tryAcquire(int unused) {
        logger.info("------> try tryAcquire :{} <-------", unused);
        // Use compareAndSetState to control the synchronization variables in AQS
        if (compareAndSetState(0.1)) {
            logger.info("------> cas success ");
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        logger.info("------> try tryRelease :{} <-------", unused);
        setExclusiveOwnerThread(null);
        // Use setState to control synchronization variables in AQS
        setState(0);
        return true;
    }

    public void lock(a) {
        acquire(1);
    }

    public boolean tryLock(a) {
        return tryAcquire(1);
    }

    public void unlock(a) {
        release(1); }}Copy the code

Other related knowledge points:

1 All subclasses of AQS that use either its exclusive lock or its shared lock do not use both of its locks.

2. AQS Principle

2.1 Basic Principles

The core idea of AQS is that if the requested shared resource is idle, the current thread requesting the resource is set as a valid worker thread, and the shared resource is set to the locked state. If the requested shared resource is occupied, then you need a mechanism for threads to block, wait, and allocate locks when they wake up. This mechanism AQS is implemented with CLH queue locking, which queues threads that are temporarily unable to acquire locks.

  • AQS uses an int member variable to represent the synchronization state, and queues resource threads through the built-in FIFO queue.
  • AQS uses CAS to perform atomic operations on the synchronization state to modify its value.
  • After the await, a node is inserted into the condition queue (see code below). When the signal is received, the node is moved to the main queue

CLH (Craig, Landin, and Hagersten) queue

The queue is a virtual bidirectional queue (a virtual bidirectional queue does not have any queue instances but only the association between nodes). AQS encapsulates each thread requesting shared resources into a Node (Node) of a CLH lock queue to realize lock allocation.

AQS defines two resource sharing methods

Exclusive: Only one thread can execute, such as ReentrantLock. Can be divided into fair lock and non-fair lock:

  • Fair lock: The first to arrive in the queue is the first to get the lock
  • Unfair lock: When a thread wants to acquire a lock, it grabs the lock regardless of the queue order

Share: Multiple threads can latch simultaneously, such as Semaphore/CountDownLatch. Semaphore, CountDownLatCh, CyclicBarrier, and ReadWriteLock will be covered later.

2.1 AQS uses the template method pattern at the bottom

2.1.1 Template Method Details

The design of the synchronizer is based on the template method pattern. If you need to customize the synchronizer, the common way is as follows (a classic application of the template method pattern) :

  1. Users inherit AbstractQueuedSynchronizer and overwrite the specified method. (These overrides are as simple as obtaining and releasing the shared resource state)
  2. Combine AQS in an implementation of a custom synchronous component and call its template methods, which in turn call methods overridden by the consumer.

This is very different from the way we used to implement interfaces, which is a classic use of the template method pattern.

AQS using the template method pattern, when the custom synchronizer need to rewrite the following several AQS provide template method: the following methods did not rewrite throw UnsupportedOperationException

  • IsHeldExclusively ()// Whether the thread is monopolizing resources. You only need to implement it if you use the condition.
  • TryAcquire (int) tryAcquire(int) Attempts to obtain resources return true on success or false on failure.
  • TryRelease (int) tryRelease(int) Attempts to release resources return true on success or false on failure.
  • TryAcquireShared (int)// Share mode. Try to obtain resources. Negative numbers indicate failure; 0 indicates success but no remaining resources are available. A positive number indicates success and the remaining resources are available.

2.1.2 Common implementation cases

Semaphore

Function: Allow multiple threads to access at the same time

Synchronized and ReentrantLock allow only one thread to access a resource at a time, while Semaphore allows multiple threads to access a resource at the same time.

CountDownLatch

Function: CountDownLatch is a synchronization utility class that coordinates synchronization between multiple threads.

This tool is usually used to control thread waiting. It allows a thread to wait until the countdown is over before executing.

CyclicBarrier

What it does: CyclicBarrier is very similar to CountDownLatch in that it can also implement technical waits between threads, but it is more complex and powerful than CountDownLatch.

Application scenario: Similar to CountDownLatch. CyclicBarrier literally means a CyclicBarrier.

What it does: causes a group of threads to be blocked when they reach a barrier (also known as a synchronization point), until the last thread reaches the barrier, the barrier opens and all threads blocked by the barrier continue to work. The default constructor for CyclicBarrier is CyclicBarrier(int parties), whose argument is the number of threads that the barrier blocks. Each thread calls await to tell CyclicBarrier that I have reached the barrier, and then the current thread is blocked.

3. Obtain and release AQS synchronization status

Acquire and release synchronization state exclusively

  • Only one thread is in the synchronized state at a time
  • Acquire (int arg) : This method is interrupt insensitive, i.e. a thread is not removed from the CLH synchronization queue upon subsequent interrupt operations on the thread
  • TryAcquire (int arg) : Attempts to acquire synchronization status
    • True: The lock status is set and the thread is returned without blocking. Spin until the synchronization status is successful
    • False: failed, #addWaiter(Node mode) adds the current thread to the end of the CLH synchronization queue
  • AcquireQueued: Spin until the synchronization state succeeds

public final void acquire(int arg) {
	if(! tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }Copy the code

Obtain and release synchronization status in a shared manner


// First call tryacquisharered at least once
// 
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}


// The above call fails, and the thread may be queued repeatedly to block and unblock
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return; }}/ / shouldParkAfterFailedAcquire check and update failed to obtain the node state. Returns true if the thread blocks
               // parkAndCheckInterrupt: interrupt the thread
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true; }}finally {
            if(failed) cancelAcquire(node); }}Copy the code

The role of shouldParkAfterFailedAcquire

  • SIGNAL: If the wait status is node. SIGNAL, the thread of the next Node of pred needs to block and wait. The node thread is awakened when the PREd thread releases the synchronization state
  • When the state is 0 or PROPAGATE, change the state to node.signal by using the CAS setting
  • CANCELLED indicates that the previous NODE of the thread has timed out or been interrupted. In this case, delete the previous NODE from the CLH queue and trace back until the status of the previous NODE is <= 0

Example Query the status of waiting threads in a synchronization queue

 // Spin processing process:
        for (;;) {
            	// Get the current thread's precursor node
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    // The current thread's precursor node is the head node, and the synchronization state is successful
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
            	// The thread waits
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }   



Copy the code

Block and wake up threads

AQS is in the processing2It's worth going into the process of blocking and waking up// Blocking occurs after obtaining the corresponding synchronization method synchronization failure. The process is as follows:Step Start: acquiring the synchronization state - > get failed - > check the state shouldParkAfterFailedAcquire (Node Mr Pred, Node Node) Step2Returns thetrue-> Current thread should be plunger Step3: parkAndCheckInterrupt() Blocks the thread - Suspend the current thread by calling the LockSupport#park(Object Blocker) method, and the thread is blocked and waiting to be woken up// The next step is to wake up the thread. There are two types of wake upFirst, when the preorder node of the current node (thread) releases the synchronization state, the thread is awakened. Second, when the current thread is interrupted, the thread is awakened. Step Start: Wake up the unparksucceeded node of the thread after it releases its synchronization state. Step2: The successor node exists, wake up the successor node locksupport.unpark (s.tehread) Step3: If the successor node isnull(timeout, interrupt), using tail traceback to find the first available thread/ / added:> park() : block the current thread > Park (Object Blocker) : for thread scheduling, use the current thread before the license is available > unpark: make it available if the license for a given thread is not available > parkNanos(longNanos) : To disable the current Thread for Thread scheduling, wait up to the specified wait time, unless permissions are available for -park and unpark(Thread Thread) methods, both of which come in pairs. Must be executed after the park method executesCopy the code

5. CLH synchronization queue

> introduction: CLH synchronization queue is a two-way FIFO queue, which AQS relies on to complete synchronization state management >2Kinds of state: • when thread if failed to get the synchronization status, AQS will wait for the current thread already wait state information structure into a Node (the Node) and add it to the CLH synchronous queue, and blocks the current thread, when sync release, will wake the first Node (fair), make it try again to access sync. C - Node: internal static class AbstractQueuedSynchronizer SF - Node SHARED =new Node();
	SF- Node EXCLUSIVE = null;
	SF- int CANCELLED =  1;
	SF- int SIGNAL    = -1;
	SF- int CONDITION = -2;
	SF- int PROPAGATE = -3;
	F- volatile intWaitStatus -- CANCELLED SIGNAL CONDITION PROPAGATE INITAL total5In which INITAL is the initial state F-volatileNode prev; -- points to the previous node F-volatileNode next; -- points to the latter node F-volatileThread thread; -- Node Thread of the Node Thread f-node nextWaiter; M- tryAcquire: acquire synchronization status M- tryAcquireShared: Acquire synchronization status share M- addWaiter: join the queue M- isShared: Check whether the synchronization state is shared m-fossil skull: The previous Node Node that obtained the Node Node// Attribute details:Node contains items commonly used in linked lists2Concepts: Prev, next, synchronizer included2M -addwaiter - Prepare a new node -> record the tail node -> put the new node into the tail node -> CAS - set a new tail node - fail and try again until successful > Exit: - When the thread of the first Node releases its synchronization state, it wakes up its next Node (node.next). The successor node will set itself to head when it succeeds in obtaining the synchronization status. - setHead - This operation is a single-threaded operation// Principle brief:
        
	
	
Copy the code

Six. AQS source code

9.6.1 Question 1: CLH form

AQS has two properties

  • private transient volatile Node head;
    • Wait for the head of the queue and load lazily. After initialization, only the sehead method can be used to modify.
    • Note: If the head exists, its waitStatus is guaranteed not to exist
  • private transient volatile Node tail;
    • Tail of the waiting queue, lazy loading

AQS has several important approaches

  • private Node addWaiter(Node mode)
    • Creates and enters a node queue for the current thread and the given pattern
  • private void setHead(Node node)
    • Set the queue header to node to exit the queue.
    • Only through the acquire method.
    • Empty unused fields (GC and efficiency)
  • private Node enq(final Node node)
    • The node is inserted into the queue and initialized if necessary

9.6.2 Problem 2: Node

AQS has an inner class Node, which is a Node object with four properties that represent state

  • SIGNAL: This node’s successor is blocked (or about to be blocked)(via park), so the current node must unblock its successor before releasing or canceling the successor
  • CANCELLED: The node is CANCELLED due to timeout or interruption
  • CONDITION: The node is currently in the CONDITION queue. It will not be used as a synchronous queue node until transmission, when the state will be set to 0
  • PROPAGATE: This releaseShared should be propagated to other nodes

It also has several important properties

  • Volatile Node prev: indicates the previous Node
  • Volatile Node Next: indicates the next Node
  • Volatile Thread Thread: Indicates the current Thread

9.6.3 AQS state

  • private volatile int state;

9.6.5 AQS Flow chart

7. AQS use

@ https://github.com/black-ant/case/tree/master/case%20Module%20Thread/case%20AQS

public class SimpleLock extends AbstractQueuedSynchronizer {

    @Override
    protected boolean tryAcquire(int unused) {
        // Use compareAndSetState to control the synchronization variables in AQS
        if (compareAndSetState(0.1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        // Use setState to control synchronization variables in AQS
        setState(0);
        return true;
    }

    public void lock(a) {
        acquire(1);
    }

    public boolean tryLock(a) {
        return tryAcquire(1);
    }

    public void unlock(a) {
        release(1); }}/ / analysis
1	 [           main] try tryAcquire :1  <-------
2	 [           main] cas success 
3	 [      Thread-52] try tryAcquire :1  <-------
4	 [      Thread-52] try tryAcquire :1  <-------
5	 [      Thread-52] try tryAcquire :1  <-------
6	 [      Thread-53] try tryAcquire :1  <-------

16	 [           main] try tryRelease :1 <-------
17	 [      Thread-52] try tryAcquire :1  <-------
18	 [      Thread-52] cas success 
19	 [      Thread-52] c.g.s.thread.aqs.demo.logic.StartLogic   : ------> acquired the lock! <-------
20	 [      Thread-52] try tryRelease :1 <-------
21	 [      Thread-53] try tryAcquire :1  <-------
22	 [      Thread-53] cas success 
23	 [      Thread-53] c.g.s.thread.aqs.demo.logic.StartLogic   : ------> acquired the lock! <-------
24	 [      Thread-53] try tryRelease :1 <-------
    
    
// Line 2: The main thread acquires an exclusive lock, causing the next three to six threads to fail to acquire the lock, queued
// Line 16: main releases the lock, so from lines 17 to 20, is the flow of thread-52 (see the queue flow of 53 later)



// AQS uses tryAcquire and tryRelease, as shown above
TODO 
Copy the code