Preface

AQS (AbstractQueuedSynchronizer) is the core of Java's concurrency utilities (java.util.concurrent). Looking at its implementation classes, we can see that many synchronizers are built on it:

So how does AQS define and manage resources? How do synchronizers extend AQS? These are the questions this article discusses.

My own understanding is limited, so the analysis may contain flaws and mistakes. Please point them out so that we can learn and improve together.

AQS Resource Management

AQS has a state field. Its source comment, "The synchronization state", says what it is for: it represents the current state of the resource.

Each synchronizer gives the resource its own meaning. For the common synchronizers:

  • ReentrantLock: the resource is an exclusive lock. State 0 means the lock is not held, 1 means it is held, and N means the owning thread has acquired it N times (reentrancy).
  • ReentrantReadWriteLock: the resource is a shared read lock plus an exclusive write lock. State is logically split into two unsigned 16-bit halves: the high 16 bits record the number of read-lock holds, and the low 16 bits record how many times the write lock has been re-entered.
  • CountDownLatch: the resource is a count. State 0 means the count has reached zero and threads calling await() pass straight through. State N > 0 means the count is not yet zero and threads calling await() block.
  • CyclicBarrier: conceptually similar to CountDownLatch, except that the count is reset to its initial value once it reaches zero, so the barrier can be reused. (It is built on a ReentrantLock and a Condition rather than subclassing AQS directly.)
  • Semaphore: the resource is a set of permits (tokens). State 0 means no permit is available and requesting threads block. If state is greater than 0, a permit is available: the requesting thread takes one and state decreases by 1. Each permit a thread releases increases state by 1.
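The meaning of state listed above can be observed indirectly through public accessors. The following is a minimal sketch, not JDK code: the class name StateDemo and its helper methods are made up for illustration, and readHolds/writeHolds only mirror the 16-bit split described for ReentrantReadWriteLock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

class StateDemo {
    // Hypothetical helpers that mirror the 16/16-bit split of the read-write lock state.
    static final int SHARED_SHIFT = 16;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    static int readHolds(int state)  { return state >>> SHARED_SHIFT; }  // high 16 bits
    static int writeHolds(int state) { return state & EXCLUSIVE_MASK; }  // low 16 bits

    // Semaphore: taking one permit lowers the number of available permits by 1.
    static int permitsLeftAfterOneAcquire(int permits) {
        Semaphore semaphore = new Semaphore(permits);
        semaphore.tryAcquire();
        return semaphore.availablePermits();
    }

    // CountDownLatch: each countDown() lowers the count by 1 until it reaches 0.
    static long countAfterOneCountDown(int count) {
        CountDownLatch latch = new CountDownLatch(count);
        latch.countDown();
        return latch.getCount();
    }

    public static void main(String[] args) {
        int packed = (2 << SHARED_SHIFT) | 1; // two read holds and one write hold
        System.out.println(readHolds(packed) + " read, " + writeHolds(packed) + " write");
        System.out.println(permitsLeftAfterOneAcquire(3));
        System.out.println(countAfterOneCountDown(2));
    }
}
```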

Now that we know what the resource state means, let's look at how a synchronizer defines, acquires, and modifies it. AQS uses the template method pattern: it declares a set of methods for subclasses to override. Called directly on AQS, each of them throws UnsupportedOperationException. They are:

  • tryAcquire: tries to acquire the state in exclusive mode
  • tryRelease: tries to release the state in exclusive mode
  • tryAcquireShared: tries to acquire the state in shared mode
  • tryReleaseShared: tries to release the state in shared mode
  • isHeldExclusively: reports whether the state is held exclusively by the current thread
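To make the template methods concrete, here is a minimal sketch of a non-reentrant mutex that overrides the exclusive-mode hooks. SimpleMutex and its method names are hypothetical; only the AbstractQueuedSynchronizer calls are real JDK API. State 0 is taken to mean "unlocked" and 1 "locked".

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // CAS 0 -> 1; only one thread can win.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0); // plain write is enough: only the owner gets here
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }

        boolean locked() { return getState() != 0; }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }      // queues and parks on contention
    boolean tryLock()  { return sync.tryAcquire(1); }
    void unlock()      { sync.release(1); }      // wakes the next queued thread
    boolean isLocked() { return sync.locked(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println("locked: " + mutex.isLocked());
        mutex.unlock();
        System.out.println("locked: " + mutex.isLocked());
    }
}
```

The subclass only decides what state means; queuing, parking, and waking are all inherited from AQS through acquire and release.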

Now that we know how AQS manages resources, the next question is who handles the waiting threads: AQS itself or the synchronizer subclass? And how? This leads to an important concept in concurrent programming: the wait queue.

Looking at the AQS source, we can see that it has two inner classes:

Node is the node type of the wait queue, and ConditionObject is the inner class that implements Condition.

For the Object.wait() family of methods, the calling thread must own the monitor lock of the object, so wait() must be called inside a synchronized block or method. These methods belong to the Object class because every object can act as a lock, so waiting is managed through the object rather than the thread.

ConditionObject, by contrast, is implemented by AQS. It works on threads directly and schedules them itself (parking and unparking them), without relying on an object's monitor.

So AQS provides two functions for developing synchronizers:

  1. A mechanism for managing resources
  2. The use of the Condition

The synchronizer only needs to focus on solving:

  1. What exactly is a resource?
  2. Use of blocking queues

Waiting queue

Earlier we saw that AQS has a Node class, the node type of the wait queue. It looks long because of its comments, but it contains little code: a little over 60 lines once the comments are removed.

Since this article is mainly about sharing design ideas (well, mostly because I'm not good enough), I'll skip the code and focus on how it works.

The comment at the top of the class says that Node is a variant of the CLH queue node. CLH queues are normally used for spin locks; in AQS each node instead stores the information of a blocked thread. When a node's predecessor releases the lock, the predecessor's waitStatus field tells it that a successor needs waking, so it wakes the current node. The predecessor then leaves the queue and the current node tries to acquire the lock. New waiting threads are appended to the tail of the queue, so waiters are served in first-in, first-out order.
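For comparison, the classic CLH spin lock that the Node comment refers to can be sketched as follows. This is the textbook form, not the AQS variant: threads spin on the predecessor's flag instead of parking. ClhSpinLock, QNode, and ClhDemo are names chosen for this sketch, and Thread.yield() inside the spin loop is just one possible choice.

```java
import java.util.concurrent.atomic.AtomicReference;

class ClhSpinLock {
    private static final class QNode { volatile boolean locked; }

    // The tail starts as a dummy node whose flag is already false.
    private final AtomicReference<QNode> tail = new AtomicReference<>(new QNode());
    private final ThreadLocal<QNode> myNode = ThreadLocal.withInitial(QNode::new);
    private final ThreadLocal<QNode> myPred = new ThreadLocal<>();

    void lock() {
        QNode node = myNode.get();
        node.locked = true;                 // announce that we want (or hold) the lock
        QNode pred = tail.getAndSet(node);  // enqueue at the tail, FIFO order
        myPred.set(pred);
        while (pred.locked) {               // spin on the predecessor's flag
            Thread.yield();
        }
    }

    void unlock() {
        QNode node = myNode.get();
        node.locked = false;                // lets our successor stop spinning
        myNode.set(myPred.get());           // recycle the predecessor's node for next time
    }
}

class ClhDemo {
    // Two threads increment a shared counter under the lock.
    static int countWithTwoThreads(int perThread) {
        ClhSpinLock lock = new ClhSpinLock();
        int[] counter = new int[1];
        Runnable work = () -> {
            for (int i = 0; i < perThread; i++) {
                lock.lock();
                try { counter[0]++; } finally { lock.unlock(); }
            }
        };
        Thread a = new Thread(work), b = new Thread(work);
        a.start(); b.start();
        try { a.join(); b.join(); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithTwoThreads(10_000));
    }
}
```

AQS keeps the queue idea but replaces the spin with blocking, which is why its nodes need the waitStatus field described next.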

There are five states for the waitStatus field:

Node status | waitStatus | Description
SIGNAL | -1 | The successor of this node is (or soon will be) blocked, so this node must wake its successor when it releases or is cancelled. (A thread sets its predecessor's status to SIGNAL after joining the queue but before blocking.)
CANCELLED | 1 | This node timed out or was interrupted and needs to be removed from the queue
CONDITION | -2 | This node is in a condition queue, blocked and waiting on a Condition
PROPAGATE | -3 | Used in shared mode: a release should be propagated to other nodes (for example, consecutive read nodes can enter the critical section one after another)
(none) | 0 | None of the above. A node is in this state when it is first created

Node has prev and next fields, which make the CLH variant a doubly linked list. Both links are needed because when a node leaves the queue (for example after cancellation), its predecessor has to be relinked to its successor.

If you are interested in how a wait queue can be implemented, see the article "CLH Lock Queue and Java implementation".

ConditionObject

ConditionObject reuses Node to build its own list, the condition queue. We will focus on its await and signal methods.

await()

public final void await() throws InterruptedException {
    if (Thread.interrupted()) // Respond to a pending interrupt before waiting
        throw new InterruptedException();
    Node node = addConditionWaiter(); // Append a node to the condition queue
    // Fully release the lock held by the current thread
    // and remember the state so it can be restored later
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) { // Park until the node has been moved to the wait queue
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // Re-acquire the lock with the saved state
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

signal()

public final void signal() {
    // Check whether the current thread holds the lock.
    // isHeldExclusively() must be implemented by the synchronizer
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    // Signal the first node of the condition queue
    if (first != null)
        doSignal(first);
}

signal() then calls the doSignal(Node first) method:

private void doSignal(Node first) {
    do {
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // Disconnect this node from the condition queue
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

doSignal in turn calls transferForSignal, which resets the condition node to the initial state and inserts it into the wait queue.

final boolean transferForSignal(Node node) {
    // Change the node status from CONDITION (-2) to the initial value 0
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false; // The node was cancelled

    // Insert the node into the wait queue; enq returns its predecessor
    Node p = enq(node);
    int ws = p.waitStatus;
    // If the predecessor is cancelled or cannot be set to SIGNAL, wake the thread directly
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

signalAll() calls doSignalAll() internally, which transfers every node in the condition queue to the wait queue.

Condition queues are mainly used in the producer/consumer pattern. The blocking queue section below describes a concurrent blocking queue built on the AQS ConditionObject.
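The two behaviours walked through above, the isHeldExclusively() check in signal() and the release-then-reacquire in await(), can be observed from the outside with a ReentrantLock and its Condition. This is a small sketch; ConditionDemo and its method names are invented for illustration.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionDemo {
    // signal() without holding the lock fails the isHeldExclusively() check.
    static boolean signalWithoutLockThrows() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        try {
            ready.signal();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // The waiter releases the lock inside await, so the producer can get in.
    static String handOff() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        String[] box = new String[1]; // guarded by lock

        Thread producer = new Thread(() -> {
            lock.lock();
            try {
                box[0] = "hello";
                ready.signalAll(); // moves waiters to the lock's wait queue
            } finally {
                lock.unlock();
            }
        });

        lock.lock();
        try {
            producer.start(); // blocks on lock.lock() until we await
            while (box[0] == null) {
                ready.awaitUninterruptibly();
            }
            return box[0];
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(signalWithoutLockThrows());
        System.out.println(handOff());
    }
}
```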

Exclusive lock

This section explains the exclusive lock mechanism of AQS by analyzing ReentrantLock locking and unlocking.

Before we start, let's look at how fair and unfair locks are implemented. A fair lock grants the lock to threads in the order in which they requested it. Comparing the FairSync and NonfairSync classes in ReentrantLock, the difference in the code is that the fair lock calls hasQueuedPredecessors to check whether any thread is queued ahead of the current one. If none is, the current thread is at the head of the queue and may try to acquire the lock. With an unfair lock, the current thread tries to acquire the lock whether or not it is at the head of the queue.
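From the caller's side, the choice between the two Sync variants is made through the ReentrantLock constructor. A short sketch (FairnessDemo and its helpers are hypothetical names):

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessDemo {
    // The no-argument constructor selects the unfair policy.
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();
    }

    // Passing true selects the fair policy, which consults hasQueuedPredecessors().
    static boolean explicitFairIsFair() {
        return new ReentrantLock(true).isFair();
    }

    public static void main(String[] args) {
        System.out.println("default fair? " + defaultIsFair());
        System.out.println("new ReentrantLock(true) fair? " + explicitFairIsFair());
    }
}
```

Unfair locks usually give higher throughput because a running thread can take a free lock without a wake-up hand-off, at the cost of possible starvation of queued threads.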

Then let’s look at the acquisition and release of an exclusive lock under a fair lock:

Acquiring a lock

ReentrantLock.lock() actually calls the AQS acquire method:

// First try to acquire the lock. If that fails, wrap the current thread in a wait node,
// add it to the queue, and suspend the thread
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire is a template method. Here is its fair lock implementation in ReentrantLock:

protected final boolean tryAcquire(int acquires) { // acquires = 1
    final Thread current = Thread.currentThread(); // The current thread
    int c = getState(); // Get the synchronization state
    if (c == 0) { // The lock is not held
        if (!hasQueuedPredecessors() && // No thread is queued ahead of this one
            compareAndSetState(0, acquires)) { // Try to take the lock with CAS
            // Lock acquired: record the current thread as the exclusive owner
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { // The current thread already holds the lock
        int nextc = c + acquires; // New reentrancy count
        if (nextc < 0) // The count overflowed
            throw new Error("Maximum lock count exceeded");
        setState(nextc); // Plain write, no CAS needed: only the owner reaches this branch
        return true;
    }
    return false;
}
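The branches of tryAcquire can be observed through the public ReentrantLock API. The sketch below uses hypothetical names (ReentrancyDemo and its methods). Note that tryLock() takes the non-fair acquisition path rather than the fair tryAcquire shown above, but while the lock is held by another thread both paths fail in the same way.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    // Hold counts seen by the owner: lock, lock again, unlock, unlock.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock(true);
        int[] seen = new int[4];
        lock.lock();
        seen[0] = lock.getHoldCount(); // state 0 -> 1 via CAS
        lock.lock();
        seen[1] = lock.getHoldCount(); // reentrant branch: 1 -> 2
        lock.unlock();
        seen[2] = lock.getHoldCount(); // still held once
        lock.unlock();
        seen[3] = lock.getHoldCount(); // fully released
        return seen;
    }

    // A different thread falls through both branches and gets false.
    static boolean otherThreadAcquiresWhileHeld() {
        ReentrantLock lock = new ReentrantLock(true);
        boolean[] acquired = new boolean[1];
        lock.lock();
        try {
            Thread other = new Thread(() -> {
                acquired[0] = lock.tryLock();
                if (acquired[0]) lock.unlock();
            });
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(holdCounts()));
        System.out.println(otherThreadAcquiresWhileHeld());
    }
}
```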

Release the lock

ReentrantLock.unlock() actually calls the AQS release method:

public final boolean release(int arg) {
    if (tryRelease(arg)) { // Try to release the lock: state - arg
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); // Fully released: wake the successor of the head node
        return true;
    }
    return false;
}

The tryRelease implementation in ReentrantLock is also simple:

protected final boolean tryRelease(int releases) { // releases = 1
    int c = getState() - releases; // Lock state minus 1
    // Throw an exception if the current thread is not the owner
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // State 0 means the lock is no longer held
    if (c == 0) {
        free = true;
        // Clear the exclusive owner thread
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

After the lock is released successfully, unparkSuccessor is called to wake the successor of the head node:

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus; // Typically -1 (SIGNAL) at this point
    if (ws < 0)
        // Reset the status to 0: the successor is about to be woken up
        node.compareAndSetWaitStatus(ws, 0);

    Node s = node.next;
    // If the successor is missing or CANCELLED (waitStatus = 1), scan backwards
    // from the tail for the non-cancelled node closest to the head
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    // Wake up the thread
    if (s != null)
        LockSupport.unpark(s.thread);
}
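The blocking and waking in the code above rest on LockSupport.park and LockSupport.unpark. A minimal sketch of that primitive on its own, with hypothetical names (ParkDemo, parkUntilReleased), looks like this:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    static boolean parkUntilReleased() {
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park() may return spuriously, so the condition is re-checked in a loop,
            // just as AQS re-checks the queue state after every wake-up.
            while (!released.get()) {
                LockSupport.park(released);
            }
            resumed.set(true);
        });
        waiter.start();

        released.set(true);          // publish the condition first
        LockSupport.unpark(waiter);  // then hand the waiter its permit

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println(parkUntilReleased());
    }
}
```

Because unpark grants a permit, it does not matter whether it runs before or after the waiter reaches park: in both cases the waiter continues.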

Shared lock

This section analyzes how CountDownLatch acquires and releases to explain the shared lock mechanism of AQS.

Both await() and countDown() of CountDownLatch delegate to the inner class Sync, so we will focus on that class and its related methods.

Acquiring a lock

CountDownLatch.await() actually calls the AQS acquireSharedInterruptibly method:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // If the thread has been interrupted, throw an exception
    if (Thread.interrupted())
        throw new InterruptedException();
    // Try to acquire in shared mode. A value below 0 means the attempt failed
    if (tryAcquireShared(arg) < 0)
        // The acquire failed: join the queue and wait
        doAcquireSharedInterruptibly(arg);
}

tryAcquireShared in CountDownLatch is very simple. A state of 0 means the count has reached zero and all waiting threads may proceed:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

Release the lock

CountDownLatch.countDown() calls the AQS releaseShared method:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { // Try to release
        doReleaseShared(); // Wake up the waiting threads
        return true;
    }
    return false;
}

tryReleaseShared in CountDownLatch decrements the count:

protected boolean tryReleaseShared(int releases) { // releases = 1
    // Decrement the count by one with CAS.
    // Return true only when this call brings the count to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

If tryReleaseShared returns true, doReleaseShared is called:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // Reset the head's status to 0 before waking its successor
                if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                    continue; // CAS failed: loop and recheck
                unparkSuccessor(h); // Wake up the successor node
            }
            else if (ws == 0 &&
                     !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                continue; // CAS failed: loop and recheck
        }
        if (h == head) // The head did not change: done
            break;
    }
}

unparkSuccessor was analyzed earlier and is not repeated here.
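Seen from the caller, the acquire and release paths above correspond to await() and countDown(). The following sketch (LatchDemo and runWorkers are hypothetical names) has one thread wait until several workers have counted down:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {
    // Starts n workers and waits until all of them have called countDown().
    static int runWorkers(int n) {
        CountDownLatch done = new CountDownLatch(n);   // state = n
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < n; i++) {
            new Thread(() -> {
                finished.incrementAndGet();
                done.countDown();                      // releaseShared(1): state - 1
            }).start();
        }

        try {
            done.await();                              // blocks while state != 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(4));
    }
}
```

Only the countDown() that moves the state to zero triggers doReleaseShared, so the waiting thread is woken once all workers are done.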

Blocking queue

Blocking queues are divided into bounded blocking queues and unbounded blocking queues.

  • Bounded blocking queue: capacity is finite. Producers block when the number of elements in the queue equals the maximum, and consumers block when the queue is empty.
  • Unbounded blocking queue: capacity is unlimited, so producers never block. Only consumers block, when the queue is empty.

Bounded blocking queues

Common bounded blocking queues are:

  1. ArrayBlockingQueue: the capacity is specified at creation time and cannot be changed afterwards.
  2. LinkedBlockingQueue: the capacity can optionally be specified at construction; the default is Integer.MAX_VALUE. LinkedBlockingQueue maintains two locks, takeLock and putLock, so one thread can enqueue while another thread dequeues at the same time.
  3. LinkedBlockingDeque: the capacity defaults to Integer.MAX_VALUE. It is a double-ended queue implemented on a doubly linked list.
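The capacity rules in the list above can be checked with the non-blocking methods of the queues. A brief sketch with made-up names (BoundedQueueDemo and its helpers):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BoundedQueueDemo {
    // offer() does not block: it returns false once the fixed capacity is reached.
    static boolean[] offerThreeIntoCapacityTwo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        return new boolean[] { queue.offer("a"), queue.offer("b"), queue.offer("c") };
    }

    // Without an explicit capacity, LinkedBlockingQueue is bounded by Integer.MAX_VALUE.
    static int defaultLinkedCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(offerThreeIntoCapacityTwo()));
        System.out.println(defaultLinkedCapacity() == Integer.MAX_VALUE);
    }
}
```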

We use ArrayBlockingQueue as an example of how a bounded blocking queue works.

Its put(E e) method:

public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // If the queue is full, wait on the notFull condition
        while (count == items.length)
            notFull.await();
        // The queue has room: enqueue the element
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E e) {
    final Object[] items = this.items;
    items[putIndex] = e;
    // Wrap the index back to 0 when it reaches the end of the array
    if (++putIndex == items.length) putIndex = 0;
    count++;
    // Wake up one waiting consumer thread
    notEmpty.signal();
}

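Why use a while loop to check whether the queue is full?

This is the "Guarded Suspension" multi-threaded design pattern: a thread woken from await() must re-check the condition, because the condition may no longer hold by the time the thread re-acquires the lock, and await() can also return spuriously. For details, see the blog post "Multi-threaded design Mode": "Guarded Suspension".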
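The pattern itself can be sketched independently of ArrayBlockingQueue. The example below uses the built-in monitor (synchronized, wait, notifyAll) and hypothetical names (GuardedBox, GuardedBoxDemo); the point is only the while loop around the wait.

```java
class GuardedBox<T> {
    private T value; // guarded by the monitor of this object

    synchronized void put(T newValue) {
        value = newValue;
        notifyAll(); // wake every waiter; each one re-checks the guard
    }

    synchronized T take() throws InterruptedException {
        // Guard: keep waiting until the condition really holds.
        // With "if" instead of "while", a spurious wake-up or a faster
        // competing taker could let this thread continue with value == null.
        while (value == null) {
            wait();
        }
        T result = value;
        value = null;
        return result;
    }
}

class GuardedBoxDemo {
    static String run() {
        GuardedBox<String> box = new GuardedBox<>();
        new Thread(() -> box.put("ready")).start();
        try {
            return box.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```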

For dequeuing methods such as take():

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // If the queue is empty, wait on the notEmpty condition
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E e = (E) items[takeIndex];
    items[takeIndex] = null;
    // Wrap the index back to 0 when it reaches the end of the array
    if (++takeIndex == items.length) takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // Wake up one waiting producer thread
    notFull.signal();
    return e;
}

Unbounded blocking queue

Common unbounded blocking queues are:

  1. PriorityBlockingQueue: a priority queue that orders elements by priority, backed by an array-based binary heap.
  2. SynchronousQueue: implemented internally with a stack and a queue. Its capacity is 0, and data is handed directly from producer to consumer. It supports fair and unfair policies.
  3. DelayQueue: an unbounded blocking queue of delayed elements. Internally it is based on a PriorityQueue.
  4. LinkedTransferQueue: when a producer calls its transfer method and a consumer is already blocked waiting for data, the element is handed to that consumer directly. Otherwise the element is enqueued and the producer blocks until a consumer takes it.

In the previous section we saw that a bounded blocking queue schedules threads with two Conditions, one for producers and one for consumers. From that it is easy to see that an unbounded blocking queue needs only one Condition, to schedule consumers.

The implementation process is similar to a bounded blocking queue, so I won’t write it here…
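As a rough illustration of the single-Condition idea only, and not the code of any JDK queue, an unbounded queue can be sketched like this. SimpleUnboundedQueue and its demo class are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SimpleUnboundedQueue<E> {
    private final ArrayDeque<E> items = new ArrayDeque<>(); // grows as needed
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // the only Condition

    // Producers never wait: there is no "full" state to guard against.
    void put(E e) {
        lock.lock();
        try {
            items.addLast(e);
            notEmpty.signal(); // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    // Consumers wait on notEmpty while the queue is empty.
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            return items.removeFirst();
        } finally {
            lock.unlock();
        }
    }
}

class SimpleUnboundedQueueDemo {
    static int takeFromOtherThreadPut() {
        SimpleUnboundedQueue<Integer> queue = new SimpleUnboundedQueue<>();
        new Thread(() -> queue.put(42)).start();
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(takeFromOtherThreadPut());
    }
}
```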