Source: Distributed system architecture

The JDK's JUC package (java.util.concurrent) provides a wide range of Java concurrency tools.

This article starts with the locks and concurrency tools commonly used in the JUC package, describes how they are used and what they do, and then, guided by a set of questions, works step by step from the surface down to the implementation of the underlying AQS abstract class.

Terminology

1 AQS

AQS is an abstract class whose fully qualified name is java.util.concurrent.locks.AbstractQueuedSynchronizer (abstract queued synchronizer). It is an abstract base class for concurrency tools, built on the template method pattern. Concurrency classes implemented on top of AQS include ReentrantLock, ReentrantReadWriteLock, Semaphore, and CountDownLatch, all of which appear later in this article.


2 CAS

CAS, short for Compare And Swap, is an atomic instruction.

The CAS mechanism uses three basic operands: the memory address addr, the expected old value oldVal, and the new value newVal. When a variable is updated, the value at memory address addr is changed to newVal only if the expected value oldVal is the same as the actual value at addr.

Following the idea of optimistic locking, a thread can update a variable safely by repeatedly reading, comparing, and retrying the CAS until it succeeds.
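The retry loop described above can be sketched with AtomicInteger.compareAndSet, which exposes CAS to Java code. This is a minimal illustration, not how any particular JUC class is written; the class name CasCounterSketch and its methods are made up for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class: a counter updated with an explicit CAS retry loop.
class CasCounterSketch {
    private final AtomicInteger value = new AtomicInteger(0);

    // Optimistic update: read the current value, compute the new one,
    // and retry if another thread changed the value in between.
    int increment() {
        for (;;) {
            int oldVal = value.get();
            int newVal = oldVal + 1;
            if (value.compareAndSet(oldVal, newVal)) {
                return newVal;
            }
        }
    }

    int get() {
        return value.get();
    }

    // Runs `threads` threads that each increment `perThread` times
    // and returns the final counter value.
    static int runConcurrently(int threads, int perThread) {
        CasCounterSketch counter = new CasCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }
}
```

No lock is taken anywhere: a thread that loses the race simply re-reads the value and tries again, so no increment is lost.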

3 Thread Interrupt

Thread interruption is a cooperation mechanism between threads: one thread uses it to ask another thread to stop the task it is executing.

When a thread is blocked, for example after calling wait(), join(), or sleep(), and another thread calls interrupt() on it, the thread immediately leaves the blocked state and receives an InterruptedException.

When a thread is running, interrupt() only sets its interrupt flag. The task logic has to check the flag by calling isInterrupted() and then respond to the interrupt itself, usually by throwing InterruptedException.
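The two cases can be sketched as follows. The class and method names are hypothetical; the 60 second sleep is only there to show that interrupt() ends the blocking call early.

```java
// Hypothetical example class showing both ways a thread observes an interrupt.
class InterruptSketch {
    // Blocked case: interrupt() makes sleep() throw InterruptedException.
    static boolean sleepingThreadGetsException() {
        final boolean[] gotException = {false};
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                gotException[0] = true;
            }
        });
        worker.start();
        worker.interrupt();
        joinQuietly(worker);
        return gotException[0];
    }

    // Running case: interrupt() only sets a flag that the task must poll.
    static boolean busyThreadSeesFlag() {
        final boolean[] sawFlag = {false};
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                Thread.yield(); // stand-in for one unit of real work
            }
            sawFlag[0] = true;
        });
        worker.start();
        worker.interrupt();
        joinQuietly(worker);
        return sawFlag[0];
    }

    private static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```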

Object locking features

What are the basic features of object locks and concurrency tools, and how are they implemented?

1 Explicit acquisition

Take ReentrantLock as an example. You can acquire the lock explicitly in the following four ways:

  • (1) Blocking acquisition

    ReentrantLock lock = new ReentrantLock();
    // Block and wait until the lock is acquired
    lock.lock();

  • (2) Non-blocking attempt

    ReentrantLock lock = new ReentrantLock();
    // Try to acquire the lock. If it is already held by another thread,
    // do not block and wait; return false immediately.
    // Returns true  - the lock was acquired
    // Returns false - failed to acquire the lock
    boolean isGetLock = lock.tryLock();

  • (3) Blocking acquisition within a specified time

    ReentrantLock lock = new ReentrantLock();
    try {
        // Try to acquire the lock within the specified time
        // Returns true  - the lock is free and was acquired by this thread, or is already held by this thread
        // Returns false - the lock was not acquired within the specified time
        lock.tryLock(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // The thread was interrupted while waiting for the lock
    }

  • (4) Interruptible acquisition

    ReentrantLock lock = new ReentrantLock();
    try {
        // Acquire the lock while responding to interrupts. If thread.interrupt()
        // is called on this thread, it stops waiting and throws InterruptedException
        lock.lockInterruptibly();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

2 Explicit release

    ReentrantLock lock = new ReentrantLock();
    lock.lock();
    try {
        // ... business logic
    } finally {
        // Release the lock explicitly, even if the business logic throws
        lock.unlock();
    }

3 Reentrant

A thread that already holds a lock can acquire the same lock again without blocking.
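A small sketch of reentrancy, using ReentrantLock.getHoldCount() to observe how many times the current thread holds the lock. The class name ReentrancySketch and the recursive helper are illustrative only.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class: one thread acquires the same lock several times.
class ReentrancySketch {
    // Acquires the same lock `depth` times in the current thread and
    // returns the hold count observed at the deepest level.
    static int nestedHoldCount(int depth) {
        ReentrantLock lock = new ReentrantLock();
        return acquire(lock, depth);
    }

    private static int acquire(ReentrantLock lock, int remaining) {
        lock.lock(); // does not block: the current thread may already be the owner
        try {
            if (remaining <= 1) {
                return lock.getHoldCount();
            }
            return acquire(lock, remaining - 1);
        } finally {
            lock.unlock(); // every lock() needs a matching unlock()
        }
    }
}
```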

4 Shareable

The same resource can be shared by multiple threads. For example, the read lock of a read/write lock can be held by multiple threads at the same time. A shared lock lets multiple threads access data concurrently and safely, which improves execution efficiency.
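As a sketch of sharing, the following starts several reader threads that all hold the read lock of one ReentrantReadWriteLock at the same moment. The class name and the two latches used to line the threads up are assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical example class: several threads hold the read lock simultaneously.
class SharedReadLockSketch {
    // Returns the number of read holds observed while all readers hold the lock.
    static int concurrentReaders(int readers) {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        CountDownLatch allHolding = new CountDownLatch(readers);
        CountDownLatch mayRelease = new CountDownLatch(1);
        Thread[] threads = new Thread[readers];
        for (int i = 0; i < readers; i++) {
            threads[i] = new Thread(() -> {
                rw.readLock().lock(); // readers do not block each other
                try {
                    allHolding.countDown();
                    mayRelease.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    rw.readLock().unlock();
                }
            });
            threads[i].start();
        }
        try {
            allHolding.await();
            return rw.getReadLockCount();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            mayRelease.countDown(); // always let the readers finish
        }
    }
}
```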

5 Fair and unfair

Fair lock: multiple threads compete for the lock on a first-come, first-served basis. Before acquiring the lock, a thread checks whether other threads are already in the wait queue. If there are none, it tries to acquire the lock; otherwise it joins the queue.

Unfair lock: a thread first tries to acquire the lock immediately instead of queueing. Only if that attempt fails does it enter the wait queue.

Because the non-fair mode gives newly arriving threads a chance to acquire the lock directly, fewer threads are suspended and woken up, so its performance is generally better than that of the fair lock.
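With ReentrantLock the policy is chosen in the constructor. A minimal sketch (the class FairnessSketch is hypothetical; the constructors and isFair() are part of ReentrantLock):

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class: selecting the fairness policy of a ReentrantLock.
class FairnessSketch {
    // new ReentrantLock(true) creates a fair lock, new ReentrantLock(false) a non-fair one.
    static ReentrantLock newLock(boolean fair) {
        return new ReentrantLock(fair);
    }

    // The no-argument constructor creates a non-fair lock.
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();
    }
}
```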

AQS implementation principle

1 Basic Concepts

(1) Condition interface

Object methods such as wait(), wait(long timeout), notify(), and notifyAll(), combined with synchronized built-in locks, implement the wait/notify pattern. Locks that implement the Lock interface, such as ReentrantLock and ReentrantReadWriteLock, offer a similar capability:

The Condition interface defines await(), awaitNanos(long), signal(), signalAll(), and other methods, and implements wait/notify together with a lock instance. Condition is implemented by ConditionObject, an inner class of AQS. A thread that calls await() blocks in the condition's wait queue; after another thread calls signal(), its node is transferred to the CLH queue (described below) and waits there to re-acquire the lock.
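The wait/notify pattern with Condition can be sketched as a one-slot buffer. This is an illustrative example, not code from the JDK; the class name and its methods are made up.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class: a one-slot buffer guarded by a lock and two conditions.
class OneSlotBufferSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    private Integer slot; // null means the slot is empty

    void put(int v) throws InterruptedException {
        lock.lock();
        try {
            while (slot != null) {
                notFull.await(); // releases the lock while waiting
            }
            slot = v;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    int take() throws InterruptedException {
        lock.lock();
        try {
            while (slot == null) {
                notEmpty.await();
            }
            int v = slot;
            slot = null;
            notFull.signal();
            return v;
        } finally {
            lock.unlock();
        }
    }

    // A producer thread puts 1..n; the calling thread takes n values and sums them.
    static int transferSum(int n) {
        OneSlotBufferSketch buffer = new OneSlotBufferSketch();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }
}
```

The await() calls sit inside while loops so that the condition is re-checked after every wake-up.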

(2) CLH queue

CLH stands for Craig, Landin, and Hagersten, who came up with the algorithm.

AQS maintains a bidirectional FIFO CLH queue internally and relies on it to manage waiting threads. If a thread fails to acquire the contended resource, it is blocked and added to the CLH synchronization queue. When the resource becomes free, a blocked thread is woken up in queue order and the resource is allocated to it.

The head node of the CLH queue represents the thread currently holding the resource, or no thread at all, while the other nodes hold information about the queued threads.

Figure: CLH queue

The status (waitStatus) of each node in the CLH is as follows:

  • CANCELLED(1): the current node has been cancelled. A node changes to this state on timeout or on interrupt (when acquiring in interruptible mode); once in this state, it never changes again

  • SIGNAL(-1): the successor node is waiting for the current node to wake it up. When a successor joins the queue, it updates its predecessor's status to SIGNAL before going to sleep

  • CONDITION(-2): the node is waiting on a Condition. When another thread calls signal() on that Condition, the node is transferred from the condition wait queue to the synchronization queue, where it waits to acquire the lock

  • PROPAGATE(-3): in shared mode, the predecessor not only wakes up its successor, but may also wake up the successor's successor

  • 0: the default status of a new node when it is added to the queue

(3) Resource sharing

AQS defines two resource sharing modes: Exclusive, in which only one thread can hold the resource at a time (for example ReentrantLock), and Share, in which multiple threads can hold it at the same time (for example Semaphore and CountDownLatch).

(4) How to block/wake up a thread

The park method, provided by the sun.misc.Unsafe class, blocks the thread, and unpark wakes it up. A thread blocked by park can also leave the blocked state in response to interrupt().
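Application code normally reaches park/unpark through java.util.concurrent.locks.LockSupport rather than through Unsafe directly. A minimal sketch, with a hypothetical class name:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical example class: block a thread with park() and wake it with unpark().
class ParkSketch {
    static boolean parkUntilReleased() {
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            // park() may return without a matching unpark(), so the
            // condition is re-checked in a loop
            while (!released.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });
        waiter.start();
        released.set(true);
        // If unpark() runs before park(), the permit is remembered and
        // the next park() returns immediately
        LockSupport.unpark(waiter);
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return resumed.get();
    }
}
```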

2 Basic Design

Core design idea: AQS provides a framework for implementing blocking locks and related synchronizers that rely on a CLH queue. Subclasses implement protected methods that decide whether a resource can be acquired or released, and AQS implements the thread scheduling policy on top of these protected methods.

AQS also provides an int variable that supports thread-safe atomic updates and serves as the synchronization state. Subclasses are free to define the meaning of this variable according to their needs.

The protected methods that subclasses override are as follows:

  • boolean tryAcquire(int): tries to acquire the resource in exclusive mode; returns true on success, false on failure

  • boolean tryRelease(int): tries to release the resource in exclusive mode; returns true on success, false on failure

  • int tryAcquireShared(int): tries to acquire the resource in shared mode. A negative number indicates failure; 0 indicates success with no resources remaining; a positive number indicates success with resources still available

  • boolean tryReleaseShared(int): tries to release the resource in shared mode; returns true if waiting successor nodes may be woken up, false otherwise

These methods are always called by the threads that need scheduling cooperation, and subclasses must implement them in a non-blocking manner.
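As a sketch of the shared-mode pair, the following one-shot latch overrides tryAcquireShared and tryReleaseShared. It is loosely modeled on the kind of example given in the AQS class documentation; the class name BooleanLatchSketch and the demo method are made up here.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: state 0 = closed, state 1 = open.
class BooleanLatchSketch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // Positive: success and later waiters may also succeed; negative: failure
            return getState() != 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true; // true lets AQS wake up the waiting nodes
        }

        boolean isOpen() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    boolean isOpen() { return sync.isOpen(); }

    void open() { sync.releaseShared(1); }

    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

    // Starts `waiters` threads that block on the latch, opens it,
    // and returns how many threads got past await().
    static int releaseWaiters(int waiters) {
        BooleanLatchSketch latch = new BooleanLatchSketch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    latch.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        latch.open();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return passed.get();
    }
}
```

Neither overridden method blocks; all queueing and parking is done by AQS in acquireSharedInterruptibly and releaseShared.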

AQS provides the following methods to obtain/release resources based on the tryXXX method described above:

  • void acquire(int): acquires the resource in exclusive mode. If the resource is acquired, the thread returns immediately; otherwise the thread is queued until it acquires the resource, ignoring interrupts in the meantime

  • boolean release(int): releases the specified amount of the resource in exclusive mode. If the resource is fully released, it wakes up another thread in the wait queue so that it can acquire the resource

  • void acquireShared(int): acquires the resource in shared mode

  • boolean releaseShared(int): releases the resource in shared mode

Taking exclusive mode as an example, the core logic of acquiring and releasing a resource is as follows:

    Acquire:
        while (!tryAcquire(arg)) {
            enqueue the thread if it is not already queued;
        }

    Release:
        if (tryRelease(arg))
            wake up the first queued thread in the CLH queue;

This is a little convoluted, so the following figure summarizes the design described above:

Figure: AQS basic design
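To make the division of labor concrete, here is a minimal non-reentrant mutex built on AQS in exclusive mode. It is a sketch under stated assumptions, not the implementation of any JDK lock: the class name MutexSketch, the meaning given to state (0 = unlocked, 1 = locked), and the demo method are all choices made for this example.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex on top of AQS.
class MutexSketch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Non-blocking: a single CAS from 0 (unlocked) to 1 (locked)
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // fully released: AQS may wake up the next waiter
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }            // queues and blocks if tryAcquire fails
    boolean tryLock() { return sync.tryAcquire(1); }
    void unlock() { sync.release(1); }
    boolean isLocked() { return sync.isHeldExclusively(); }

    // Several threads increment a plain int under the mutex; returns the total.
    static int guardedCount(int threads, int perThread) {
        MutexSketch mutex = new MutexSketch();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }
}
```

The subclass only answers "can the resource be taken or released right now?"; queueing, parking, and waking threads stay inside acquire(int) and release(int).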

Feature implementation

The following describes how the features of AQS-based locks and concurrency tools are implemented.

1 Explicit acquisition

This feature uses ReentrantLock as an example. ReentrantLock is a reentrant lock: each time a thread successfully acquires the lock, the synchronization state value is increased by 1, and each release decreases it by 1.

ReentrantLock supports both fair and non-fair modes. The explicit acquisition features below use the fair lock as the example.

(1) Block waiting for acquisition

The basic implementation is as follows:

  • 1. ReentrantLock implements the tryAcquire(int) method of AQS, which decides whether the current thread can acquire the lock

  • 2. The acquire(int) method of AQS first calls tryAcquire(int). If that fails, the thread joins the CLH queue and blocks; once its node's predecessor is head, it tries tryAcquire(int) again

  • 3. ReentrantLock's lock() method calls the acquire(int) method of AQS to block and wait for the lock

Implementation of tryAcquire(int) in ReentrantLock (fair lock):

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // c == 0: no thread holds the lock
        if (c == 0) {
            // Acquire only if no other thread has been waiting in the CLH queue
            // longer than the current one and the CAS on state succeeds
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                // Record the current thread as the lock holder
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // The current thread already holds the lock: reentrant acquisition
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // Another thread already holds the lock
        return false;
    }

Implementation of the acquire(int) method in AQS:

    public final void acquire(int arg) {
        // Try to acquire the resource; on failure, enqueue and wait
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // acquireQueued returned true: the thread was interrupted while waiting,
            // so set its interrupt flag again
            selfInterrupt();
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // Loop until the resource is acquired
            for (;;) {
                // Get the predecessor node
                final Node p = node.predecessor();
                // If the predecessor is head, this node is next in line and may try to
                // acquire the resource (it may have been woken by a release or by interrupt())
                if (p == head && tryAcquire(arg)) {
                    // Acquired: this node becomes the new head
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // Park if necessary. If the thread is interrupted while waiting, it does not
                // respond to the interrupt; it keeps queueing and only records the fact
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

(2) No blocking attempt to obtain

tryLock() in ReentrantLock always uses the non-fair acquisition path. Its logic is basically the same as tryAcquire, except that it does not call hasQueuedPredecessors() to check whether another thread is already waiting in the CLH queue. As a result, when the resource is released, a thread that requests it at that moment can jump the queue ahead of the waiting threads.

Implementation of tryLock() in ReentrantLock:

    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // c == 0: no thread holds the lock
        if (c == 0) {
            // Set state with CAS (expected old value 0), without checking
            // whether threads are waiting in the CLH queue
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // The current thread already holds the lock: reentrant acquisition
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // Another thread already holds the lock
        return false;
    }

(3) Block waiting for acquisition within a specified time

The basic implementation is as follows:

  • 1. ReentrantLock's tryLock(long, TimeUnit) calls tryAcquireNanos(int, long) of AQS

  • 2. tryAcquireNanos(int, long) of AQS first calls tryAcquire(int) to try to acquire the lock; if that fails, it calls doAcquireNanos(int, long)

  • 3. doAcquireNanos of AQS checks whether the current node's predecessor is head and whether the resource can be acquired through tryAcquire(int). If the resource cannot be acquired and the remaining timeout is greater than 1 microsecond, the thread sleeps for the remaining time and then tries again

The implementation in ReentrantLock is as follows:

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        // Throw if the thread has already been interrupted through interrupt()
        if (Thread.interrupted())
            throw new InterruptedException();
        // First try tryAcquire; fall back to the timed queueing path
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

The implementation in AQS is as follows:

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        // Absolute deadline for the acquisition
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                // Get the predecessor node
                final Node p = node.predecessor();
                // If the predecessor is head, this node is next in line and may try to
                // acquire the resource (it may have been woken by a release or by interrupt())
                if (p == head && tryAcquire(arg)) {
                    // Acquired: this node becomes the new head
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                // Update the remaining time; give up once it has run out
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                // If more than 1 microsecond remains, sleep for the remaining
                // time and then try again
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                // Throw if the thread was interrupted through interrupt()
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

(4) Response to interrupt acquisition

ReentrantLock acquires the lock interruptibly as follows: when a thread sleeping in the park method is woken up by thread.interrupt(), it finds that its interrupt flag is true and actively throws an exception. The core implementation is in the doAcquireInterruptibly(int) method of AQS.

The basic implementation is similar to blocking acquisition, except that the doAcquireInterruptibly(int) method of AQS is used instead of acquire(int):

    private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                // Get the predecessor node
                final Node p = node.predecessor();
                // If the predecessor is head, this node is next in line and may try to
                // acquire the resource (it may have been woken by a release or by interrupt())
                if (p == head && tryAcquire(arg)) {
                    // Acquired: this node becomes the new head
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                // The thread needs to queue and block
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // After waking up from the block, check whether the interrupt flag is true
                    parkAndCheckInterrupt())
                    // Interrupted: throw instead of continuing to wait
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

2 Explicit release

AQS resource sharing is divided into exclusive and shared modes. This section uses ReentrantLock as an example to describe the explicit release of an exclusive resource; the shared mode is described later.

ReentrantLock releases the lock explicitly as follows:

  • 1. ReentrantLock implements the tryRelease(int) method of AQS, which decreases the state variable by 1. If state becomes 0, no thread holds the lock and the method returns true; otherwise it returns false

  • 2. The release(int) method of AQS uses tryRelease(int) to check whether any thread still holds the resource. If not, it wakes up the thread of the first waiting node (the successor of head) in the CLH queue

  • 3. The awakened thread continues the for(;;) loop in acquireQueued(Node, int), doAcquireNanos(int, long), or doAcquireInterruptibly(int) and tries again to acquire the resource

Implementation of tryRelease(int) in ReentrantLock:

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        // Only the thread that holds the lock may release it
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // state has dropped to 0: the lock is fully released
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

The release(int) method in AQS is implemented as follows:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            // head is the node of the thread holding the resource;
            // its successor is the first waiting thread, so wake that one up
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

3 Reentrant

Reentrancy is relatively simple to implement. In ReentrantLock, for example, it is mainly handled in tryAcquire(int): the method checks whether the thread holding the lock is the current thread and, if so, updates the synchronization state value and returns true, indicating that the lock has been acquired.

4 Shareable

Shareable resources are described here using ReentrantReadWriteLock as an example. Unlike ReentrantLock, ReentrantReadWriteLock lets multiple threads share the read lock: when the write lock is released, multiple threads that are blocked waiting for the read lock can acquire it at the same time.

ReentrantReadWriteLock divides the AQS synchronization state as follows: the high 16 bits hold the number of read lock holds, and the low 16 bits hold the number of write lock holds.
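The split of the 32-bit state into two 16-bit counters can be sketched with plain bit operations. The constant and method names below are modeled on the helpers inside the JDK's ReentrantReadWriteLock, but the class itself is a stand-alone illustration.

```java
// Hypothetical example class: packing read and write hold counts into one int.
class ReadWriteStateSketch {
    static final int SHARED_SHIFT = 16;
    static final int SHARED_UNIT = 1 << SHARED_SHIFT;          // adds one read hold
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // low 16 bits

    // Number of read holds: the high 16 bits
    static int sharedCount(int state) {
        return state >>> SHARED_SHIFT;
    }

    // Number of write holds: the low 16 bits
    static int exclusiveCount(int state) {
        return state & EXCLUSIVE_MASK;
    }

    static int withReadAcquired(int state) {
        return state + SHARED_UNIT;
    }

    static int withWriteAcquired(int state) {
        return state + 1;
    }
}
```

For example, a state produced by three read acquisitions and no write acquisition has sharedCount 3 and exclusiveCount 0.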

The tryAcquireShared(int) and tryReleaseShared(int) implementations in ReentrantReadWriteLock are fairly long. They mainly deal with mutual exclusion between reads and writes, reentrancy checks, and read locks yielding to write locks.

Acquiring the read lock (ReadLock.lock()) works as follows:

  • 1. ReentrantReadWriteLock implements the tryAcquireShared(int) method of AQS, which decides whether the current thread can acquire the read lock

  • 2. The acquireShared(int) method of AQS first tries to acquire the resource through tryAcquireShared(int). If that fails, the thread joins the CLH queue and blocks

  • 3. ReentrantReadWriteLock's ReadLock.lock() method calls the acquireShared(int) method of AQS to block and wait for the lock

The implementation of acquiring a resource in shared mode in AQS is as follows:

    public final void acquireShared(int arg) {
        // Try to acquire the shared resource; on failure, enqueue and wait
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    // Once the resource is acquired, the success is propagated to the
    // nodes waiting for the resource in the CLH queue
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    // Acquired: become head and propagate to successors
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // If the thread is interrupted while waiting, it does not respond to the
                // interrupt; it keeps queueing and only records the fact
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                // Wake up successors waiting in shared mode
                doReleaseShared();
        }
    }

Releasing the read lock (ReadLock.unlock()) works as follows:

  • 1. ReentrantReadWriteLock implements the tryReleaseShared(int) method of AQS, which determines whether any thread still holds the read lock after this release

  • 2. The releaseShared(int) method of AQS calls tryReleaseShared(int); if it returns true, doReleaseShared() is called to wake up the waiting nodes in the CLH queue

  • 3. ReentrantReadWriteLock's ReadLock.unlock() method releases the lock through the releaseShared(int) method of AQS

The implementation of releasing a resource in shared mode in AQS is as follows:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            // Wake up the waiting successors
            doReleaseShared();
            return true;
        }
        return false;
    }

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    // CAS failure means another thread is already handling this node; retry
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    unparkSuccessor(h);
                }
                // ws == 0 happens when the current node has just become head and its
                // successor has only just joined the CLH queue and has not yet changed
                // its predecessor's status to SIGNAL before going to sleep
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;
            }
            // head did not change during this pass: done; otherwise loop again
            if (h == head)
                break;
        }
    }

5 Fair and unfair

This feature is relatively simple to implement. Taking ReentrantLock as an example, the fair lock acquires the resource directly through acquire(int) of AQS, whereas the non-fair lock first tries to jump the queue: using CAS, it expects the state variable to be 0 (no thread holds the lock) and updates it to 1. Only if the CAS fails does it call acquire(int) and queue.

    // Fair lock
    final void lock() {
        acquire(1);
    }

    // Non-fair lock
    final void lock() {
        // A state value of 0 means that no thread holds the lock;
        // try to jump the queue by setting it to 1 with CAS
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

Conclusion

The state variable of AQS does not necessarily represent a resource. Different AQS subclasses can define its meaning differently.

For example, in the CountDownLatch class, the value of the state variable is the latch count that still needs to be released (think of it as the number of latches on a door that still need to be opened). Only when every latch has been opened does the door open, and all waiting threads start executing. Each call to countDown() decrements the state variable by one; when it reaches 0, the sleeping threads in the CLH queue are woken up.
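A short usage sketch of this behavior (the class name LatchSketch and the worker logic are made up for the example):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class: the caller waits until every worker has counted down.
class LatchSketch {
    // Starts `workers` threads and returns how many had finished
    // by the time await() returned.
    static int runWorkers(int workers) {
        CountDownLatch latch = new CountDownLatch(workers); // initial state = workers
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                finished.incrementAndGet();
                latch.countDown(); // state = state - 1
            }).start();
        }
        try {
            latch.await(); // returns once state has reached 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();
    }
}
```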

When studying low-level source code like this, it helps to set yourself a few questions first and read with them in mind. Before diving in, make sure you understand the overall design and principles (for example by reading the relevant documentation), and only then study the source details. Plunging straight into the source code makes it easy to get lost and give up.
