1. The Lock interface

A lock controls how multiple threads access a shared resource. Generally, a lock prevents multiple threads from accessing a shared resource at the same time (although some locks, such as read/write locks, allow concurrent access by multiple threads). Before the Lock interface existed, Java programs relied on the synchronized keyword for locking. Java SE 5 added the Lock interface (and related implementation classes) to the concurrency package (java.util.concurrent.locks). It provides synchronization similar to the synchronized keyword, except that locks must be acquired and released explicitly. Although this gives up the convenience of implicit acquisition and release (provided by synchronized blocks and methods), it gains explicit control over acquiring and releasing locks, interruptible lock acquisition, timed lock acquisition, and other synchronization features that the synchronized keyword does not have.

Releasing the lock in a finally block ensures that the lock is eventually released once it has been acquired.

Do not put the lock acquisition inside the try block: if acquiring the lock throws an exception (for example, in a custom lock implementation), the finally block would still run and release a lock that was never acquired.
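The two rules above can be sketched as follows. This is a minimal illustration, assuming a ReentrantLock as the Lock implementation; the class name LockUsageSketch and its methods are made up for this example. It also shows a timed acquisition with tryLock, one of the features the synchronized keyword lacks.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class, not part of the JDK.
public class LockUsageSketch {
    private final Lock lock = new ReentrantLock();
    private int counter = 0;

    // Standard pattern: acquire outside the try block, release in finally.
    public void increment() {
        lock.lock();
        try {
            counter++;
        } finally {
            lock.unlock();
        }
    }

    // Timed acquisition: give up if the lock is not available within the timeout.
    public boolean tryIncrement(long timeout, TimeUnit unit) throws InterruptedException {
        if (!lock.tryLock(timeout, unit)) {
            return false; // lock not acquired, so there is nothing to release
        }
        try {
            counter++;
            return true;
        } finally {
            lock.unlock();
        }
    }

    public int get() {
        lock.lock();
        try {
            return counter;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockUsageSketch sketch = new LockUsageSketch();
        sketch.increment();
        boolean ok = sketch.tryIncrement(100, TimeUnit.MILLISECONDS);
        System.out.println(ok + " " + sketch.get()); // prints "true 2"
    }
}
```

Because the lock is only released on paths where it was actually acquired, a failed or interrupted acquisition never reaches unlock().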

2. Queue synchronizer AQS

2.1 Internal Class Node

The synchronizer relies on an internal synchronization queue (a doubly linked FIFO queue) to manage the synchronization state. When the current thread fails to acquire the synchronization state, the synchronizer wraps the current thread, its wait status, and related information in a node (Node), adds the node to the synchronization queue, and blocks the current thread. When the synchronization state is released, the thread in the first node is woken up so that it can try to acquire the synchronization state again.

The synchronizer holds two node references, one pointing to the head node and the other to the tail node. Imagine that one thread has succeeded in acquiring the synchronization state (or lock): other threads cannot acquire it and are instead wrapped in nodes and appended to the synchronization queue, and this appending must be thread-safe.

For this purpose the synchronizer provides a CAS-based method for setting the tail node, compareAndSetTail(Node expect, Node update), which takes the tail node that the current thread “thinks” is current and the new node. The new node is formally linked to the previous tail node only after the CAS succeeds.

static final class Node {

    // Markers for the two waiting modes: shared and exclusive
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    // Possible waitStatus values
    static final int CANCELLED =  1;  // the thread has cancelled (timeout or interrupt)
    static final int SIGNAL    = -1;  // the successor's thread needs to be woken up
    static final int CONDITION = -2;  // the thread is waiting on a condition
    static final int PROPAGATE = -3;  // the next shared acquire should propagate unconditionally

    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;

    // Next node waiting on a condition, or the SHARED marker
    Node nextWaiter;

    // Returns true if the node is waiting in shared mode
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // Returns the predecessor node, or throws NullPointerException if it is null
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

2.2 Some references:

AQS (AbstractQueuedSynchronizer) is the basic framework for building locks and other synchronization components. It uses an int member variable to represent the synchronization state and a built-in FIFO queue to queue the threads competing for the resource.


    // References to the head and tail nodes of the synchronization queue
    private transient volatile Node head;
    private transient volatile Node tail;

    // The synchronization state is declared volatile
    private volatile int state;

    // Get the current synchronization state
    protected final int getState() { return state; }

    // Set a new synchronization state
    protected final void setState(int newState) { state = newState; }

    // Change the synchronization state atomically through CAS of the Unsafe class
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

2.3 Template method

    // Overridable by subclasses: acquires the synchronization state exclusively. An implementation
    // must query the current state, check whether it is the expected one, and then set it with CAS
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    // Overridable by subclasses: releases the synchronization state exclusively, giving waiting
    // threads a chance to acquire it
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    // Acquires the synchronization state in shared mode. A return value greater than or equal
    // to 0 means success; a negative value means failure
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    // Releases the synchronization state in shared mode
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    // Whether the synchronizer is held exclusively by the current thread
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

    // Acquires the synchronization state exclusively. If the acquisition succeeds, the method
    // returns; otherwise the thread enters the synchronization queue and waits
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    // Same as acquire, but responds to interruption
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    // Same as acquireInterruptibly, with a timeout
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

    // Releases the synchronization state exclusively and wakes up the thread of the first
    // waiting node in the synchronization queue
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    // Acquires the synchronization state in shared mode. The main difference from exclusive
    // acquisition is that multiple threads can hold the synchronization state at the same time
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    // Same as acquireShared, but responds to interruption
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    // Same as acquireSharedInterruptibly, with a timeout
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

    // Releases the synchronization state in shared mode
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
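As an illustration of how a subclass plugs into these template methods in shared mode, here is a minimal sketch of a one-shot latch, modeled on the BooleanLatch example in the AbstractQueuedSynchronizer Javadoc. The class name BooleanLatchSketch is hypothetical. Only tryAcquireShared and tryReleaseShared are overridden; queuing and blocking are left to AQS.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a latch that starts closed and can be opened once.
public class BooleanLatchSketch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() {
            return getState() != 0;
        }

        // A non-negative result means the shared acquire succeeded.
        @Override
        protected int tryAcquireShared(int ignore) {
            return isSignalled() ? 1 : -1;
        }

        // Opening the latch: state 1 means "signalled"; returning true lets AQS wake waiters.
        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();

    public boolean isSignalled() {
        return sync.isSignalled();
    }

    // Opens the latch and wakes up all waiting threads.
    public void signal() {
        sync.releaseShared(1);
    }

    // Blocks until the latch is opened; returns immediately if it already is.
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
}
```

Any number of threads may pass await() once signal() has been called, which is exactly the difference between shared and exclusive acquisition.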

2.4 Joining a Queue and Setting a Head Node

How the synchronizer adds a node to the synchronization queue: enqueuing must be thread-safe, so the synchronizer provides a CAS-based method for setting the tail node, compareAndSetTail(Node expect, Node update). It takes the tail node that the current thread “thinks” is current and the new node; only after the CAS succeeds is the new node formally linked to the previous tail node.

    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

How the head node is set: the synchronization queue follows FIFO order. The head node is the node that has successfully acquired the synchronization state. When the thread of the head node releases the synchronization state, it wakes up its successor, and the successor sets itself as the head node once it has acquired the synchronization state.

The head node is set by the thread that has successfully acquired the synchronization state. Since only one thread can acquire the synchronization state, no CAS is needed to set the head node: it is enough to make the successor of the original head node the new head and to clear the original head node’s next reference.
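The enqueue and head-advance steps described above can be imitated outside AQS with an AtomicReference. The following is only a sketch under that assumption: CasTailSketch and its Node are hypothetical stand-ins, not the JDK implementation, and the dummy head is created eagerly rather than lazily as AQS does.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical example class illustrating CAS-based tail insertion.
public class CasTailSketch {

    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final AtomicReference<Node> tail;
    private volatile Node head;

    public CasTailSketch() {
        Node dummy = new Node("head");
        head = dummy;
        tail = new AtomicReference<>(dummy);
    }

    // Many threads may call this concurrently: retry until the CAS on tail succeeds.
    public Node enqueue(String name) {
        Node node = new Node(name);
        for (;;) {
            Node t = tail.get();
            node.prev = t;                    // link backwards before publishing
            if (tail.compareAndSet(t, node)) {
                t.next = node;                // forward link is set only after the CAS succeeds
                return node;
            }
        }
    }

    // Intended for the single thread that "owns" the state: no CAS required.
    public void advanceHead() {
        Node first = head.next;
        if (first != null) {
            head.next = null;                 // disconnect the old head
            first.prev = null;
            head = first;
        }
    }

    // Counts queued nodes by walking prev links from the tail back to the head.
    // Only meaningful while no other thread is modifying the queue.
    public int size() {
        int n = 0;
        for (Node p = tail.get(); p != head; p = p.prev) {
            n++;
        }
        return n;
    }
}
```

A thread that loses the race on tail simply re-reads the tail and retries, so no node is lost even under contention, while the head is advanced with plain writes.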

2.5 Obtaining and Releasing the Exclusive Synchronization Status

The synchronization state can be acquired by calling the synchronizer’s acquire(int arg) method. This method is insensitive to interrupts: once a thread has failed to acquire the synchronization state and entered the synchronization queue, a later interrupt of that thread does not remove it from the queue.

    public final void acquire(int arg) {
        // tryAcquire must acquire the synchronization state in a thread-safe way
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire first calls the tryAcquire(int arg) method implemented by the custom synchronizer, which must acquire the synchronization state in a thread-safe way. If that fails, a node in exclusive mode (Node.EXCLUSIVE, meaning that only one thread at a time can hold the synchronization state) is constructed and appended to the tail of the synchronization queue by addWaiter(Node mode). Finally, acquireQueued(Node node, int arg) is called, which makes the node try to acquire the synchronization state in an “infinite loop”. If it cannot, the thread in the node is blocked; a blocked thread is woken up mainly when its predecessor leaves the queue or when the blocked thread is interrupted.

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // compareAndSetTail(pred, node) ensures that the node is appended thread-safely
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // Several threads that failed to acquire the state may append concurrently; without CAS
        // nodes could be lost or end up out of order, so enq serializes the requests through CAS
        enq(node);
        return node;
    }

    private Node enq(final Node node) {
        for (;;) { // The "infinite loop" guarantees that the node is eventually appended
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                // The current thread returns only after the CAS has set the node as the tail;
                // otherwise it keeps retrying
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

After a node enters the synchronization queue, it enters a spin process. Each node (or rather, each thread) watches its own situation: when the condition is met and the synchronization state is acquired, it leaves the spin; otherwise it stays in the spin (with the node’s thread blocked):

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // The current thread tries to acquire the synchronization state in an "infinite loop";
            // only a node whose predecessor is the head node may try
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

Why may only a node whose predecessor is the head node try to acquire the synchronization state? There are two reasons. First, the head node is the node that has successfully acquired the synchronization state; after its thread releases the synchronization state it wakes up its successor, and the woken thread of the successor needs to check whether its predecessor is the head node.

Second, this maintains the FIFO principle of the synchronization queue. The behavior of nodes spinning to acquire the synchronization state in this method is shown as follows:

Acquire method call flow:

In that flow, the check that the predecessor is the head node and that the synchronization state can be acquired, together with the thread entering the waiting state, makes up the spin process of acquiring the synchronization state. When the synchronization state is acquired, the current thread returns from acquire(int arg); for concurrency components such as locks, this means the current thread has acquired the lock.

After the current thread has acquired the synchronization state and executed the corresponding logic, it needs to release the synchronization state so that the following nodes can continue to acquire it. The synchronization state is released by calling the synchronizer’s release(int arg) method, which wakes up the successor node after releasing the state (so that the successor tries again to acquire it). The code of this method is shown below.

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // Wake up a waiting thread from the queue (cancelled nodes are skipped)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

To summarize: when acquiring the synchronization state, the synchronizer maintains a synchronization queue. A thread that fails to acquire the state is added to the queue and spins there. The condition for leaving the queue (or stopping the spin) is that the predecessor is the head node and the synchronization state has been acquired. When releasing the synchronization state, the synchronizer calls tryRelease(int arg) to release it and then wakes up the successor of the head node.
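To make the exclusive acquire and release flow concrete, here is a minimal sketch of a non-reentrant mutex built on AbstractQueuedSynchronizer, in the spirit of the Mutex example in the AQS Javadoc. The class name MutexSketch is hypothetical, and state 0 is assumed to mean "unlocked" and 1 "locked".

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a simple non-reentrant exclusive lock.
public class MutexSketch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        // Succeeds only if the state changes from 0 (free) to 1 (held).
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Only the holder calls this, so a plain setState is sufficient.
        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    // Blocks (via the AQS synchronization queue) until the lock is acquired.
    public void lock() {
        sync.acquire(1);
    }

    // Single attempt, never blocks.
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    // Releases the lock and lets AQS wake up the successor of the head node.
    public void unlock() {
        sync.release(1);
    }
}
```

The sketch only decides whether the state may change; enqueuing, spinning, parking, and waking are all inherited from acquire(int arg) and release(int arg).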

Fair and nonfair lock implementation


ReentrantLock reentrantLock = new ReentrantLock(true); // fair lock

static final class FairSync extends Sync {

    final void lock() {
        acquire(1);
    }

    // acquire is inherited from AbstractQueuedSynchronizer (it is final there and is not
    // redeclared in FairSync); it is repeated here only for reference:
    //
    //     public final void acquire(int arg) {
    //         if (!tryAcquire(arg) &&
    //             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    //             selfInterrupt();
    //     }

    // The fair lock overrides tryAcquire
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // Acquire only if no other thread is queued ahead of the current thread
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            // Reentrant acquire by the owning thread
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

Nonfair lock (the default):

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock. Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    // Defined in the parent class Sync. Unlike the fair version, it does not check
    // hasQueuedPredecessors() before attempting the CAS
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
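From the caller's side, the choice between the two Sync variants and the reentrant state counting (nextc = c + acquires) can be observed through the public ReentrantLock API. A small sketch, in which FairnessSketch and holdCountAfter are hypothetical names introduced only for this example:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class.
public class FairnessSketch {

    // Locks the given lock `times` times, reads the hold count, then fully unlocks.
    public static int holdCountAfter(ReentrantLock lock, int times) {
        for (int i = 0; i < times; i++) {
            lock.lock();
        }
        try {
            return lock.getHoldCount();
        } finally {
            for (int i = 0; i < times; i++) {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        ReentrantLock fair = new ReentrantLock(true);
        ReentrantLock nonfair = new ReentrantLock(); // nonfair by default

        System.out.println(fair.isFair());              // prints "true"
        System.out.println(nonfair.isFair());           // prints "false"
        System.out.println(holdCountAfter(nonfair, 2)); // prints "2"
    }
}
```

Each reentrant lock() adds 1 to the synchronization state, so the lock is only free again after the same number of unlock() calls.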

Implementation of read/write locks

Read/write locks allow multiple reader threads to access at the same time, but while a writer thread is accessing, all reader threads and all other writer threads are blocked. A read/write lock maintains a pair of locks, a read lock and a write lock. By separating the two, it greatly improves concurrency compared with an ordinary exclusive lock.

Read/write lock features:

Common methods:


    // Returns the number of times the read lock is currently held. This is not the number of
    // threads holding it: a single thread that acquired the read lock n times counts n times
    public int getReadLockCount() {
        return sync.getReadLockCount();
    }

    // Returns the number of times the current thread has acquired the read lock
    // (tracked per thread with a ThreadLocal)
    public int getReadHoldCount() {
        return sync.getReadHoldCount();
    }

    // Determines whether the write lock is held
    public boolean isWriteLocked() {
        return sync.isWriteLocked();
    }

    // Returns the number of times the current thread has acquired the write lock
    public int getWriteHoldCount() {
        return sync.getWriteHoldCount();
    }

Design of read and write states:

The custom synchronizer of the read/write lock has to maintain the state of multiple reader threads and one writer thread in the single synchronization state (one integer variable), which makes the design of that state the key to implementing a read/write lock. To maintain several states in one integer variable, the variable must be split bitwise. The read/write lock splits the variable into two parts: the high 16 bits represent the read state and the low 16 bits represent the write state.

For example, a synchronization state of 0x00020003 would mean that a thread has acquired the write lock and re-entered it twice (write state 3) and has also acquired the read lock twice in a row (read state 2). How does the read/write lock quickly determine the read and write states? The answer is bit operations. Let the current synchronization state be S. The write state is S & 0x0000FFFF (the high 16 bits are cleared), and the read state is S >>> 16 (unsigned right shift by 16 bits, filling with zeros). Incrementing the write state by 1 gives S + 1; incrementing the read state by 1 gives S + (1 << 16), that is, S + 0x00010000.
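The bit operations above can be checked with a few lines of code. This is a standalone sketch: the class ReadWriteStateSketch and its helper methods are hypothetical, and it only mimics the arithmetic on a plain int rather than touching a real lock.

```java
// Hypothetical example class: arithmetic on a 32-bit read/write state.
public class ReadWriteStateSketch {
    static final int SHARED_SHIFT = 16;
    static final int SHARED_UNIT = 1 << SHARED_SHIFT;          // 0x00010000
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 0x0000FFFF

    // Read state: the high 16 bits.
    public static int readCount(int s) {
        return s >>> SHARED_SHIFT;
    }

    // Write state: the low 16 bits.
    public static int writeCount(int s) {
        return s & EXCLUSIVE_MASK;
    }

    public static int addRead(int s) {
        return s + SHARED_UNIT;
    }

    public static int addWrite(int s) {
        return s + 1;
    }

    public static void main(String[] args) {
        int s = 0;
        // Write lock acquired once and re-entered twice.
        s = addWrite(addWrite(addWrite(s)));
        // Read lock acquired twice.
        s = addRead(addRead(s));
        // prints "0x00020003 read=2 write=3"
        System.out.println(String.format("0x%08X read=%d write=%d",
                s, readCount(s), writeCount(s)));
    }
}
```

Because the two counters live in disjoint bit ranges, a single CAS on the int updates either of them atomically without disturbing the other.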

Write lock acquisition and release:

A write lock is an exclusive lock that supports reentry. If the current thread already holds the write lock, the write state is incremented. If, when the current thread tries to acquire the write lock, the read lock is already held (the read state is not 0) or the thread holding the write lock is not the current thread, the current thread enters the waiting state.

    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
            // A read lock is held (c != 0 and w == 0), or the current thread is not the
            // thread that holds the write lock
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            setState(c + acquires);
            return true;
        }
        if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) {
            return false;
        }
        setExclusiveOwnerThread(current);
        return true;
    }

If a read lock is held, the write lock cannot be acquired. The reason is that the read/write lock must ensure that the operations of the write lock are visible to the read lock. If the write lock could be acquired while read locks are held, the other running reader threads would be unaware of the current writer thread’s operations. Therefore the write lock can only be acquired by the current thread after all other reader threads have released the read lock, and once the write lock is held, all subsequent access by other reader and writer threads is blocked. Releasing the write lock is similar to releasing a ReentrantLock: each release decrements the write state, and when the write state reaches 0 the write lock is released, so that waiting reader and writer threads can continue to access the read/write lock.

Lock downgrading

Lock downgrading means downgrading a write lock to a read lock. If the current thread holds the write lock, releases it, and only then acquires the read lock, this step-by-step process is not lock downgrading. Lock downgrading is the process of holding the (currently owned) write lock, acquiring the read lock, and then releasing the (previously owned) write lock.

Is acquiring the read lock necessary in lock downgrading? Yes. It is needed to guarantee data visibility: if the current thread released the write lock directly without acquiring the read lock, and another thread (call it thread T) then acquired the write lock and modified the data, the current thread would be unable to perceive thread T’s update. If the current thread acquires the read lock, that is, follows the steps of lock downgrading, thread T is blocked until the current thread has used the data and released the read lock. ReentrantReadWriteLock does not support lock upgrading (holding the read lock, acquiring the write lock, and finally releasing the read lock). The purpose is again to guarantee data visibility: if the read lock is held by multiple threads and any one of them succeeded in acquiring the write lock and updated the data, its update would not be visible to the other threads holding the read lock.
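The downgrading order described above (hold the write lock, acquire the read lock, release the write lock) can be sketched as follows. This is a minimal illustration loosely following the CachedData example in the ReentrantReadWriteLock Javadoc; the class LockDowngradeSketch and its methods are hypothetical.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical example class demonstrating lock downgrading.
public class LockDowngradeSketch {
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private int value;

    // Writes a new value, then downgrades to the read lock before using it.
    public int updateAndRead(int newValue) {
        rwl.writeLock().lock();
        try {
            value = newValue;
            // Downgrade: acquire the read lock while still holding the write lock.
            rwl.readLock().lock();
        } finally {
            // Release the write lock; the read lock is still held.
            rwl.writeLock().unlock();
        }
        try {
            // No other writer can change value until the read lock is released.
            return value;
        } finally {
            rwl.readLock().unlock();
        }
    }

    // True when neither the write lock nor any read lock is held.
    public boolean isIdle() {
        return !rwl.isWriteLocked() && rwl.getReadLockCount() == 0;
    }
}
```

Between releasing the write lock and releasing the read lock, other readers may proceed, but any writer stays blocked, so the value read is the one just written.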

The LockSupport utility

LockSupport defines a set of methods whose names start with park to block the current thread, and unpark(Thread thread) to wake up a blocked thread.
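A minimal sketch of park and unpark working together is shown below. The class ParkSketch and its fields are hypothetical. Since park may return without a matching unpark, the waiter re-checks a flag in a loop.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical example class.
public class ParkSketch {
    private volatile boolean ready = false;
    private volatile boolean resumed = false;

    // Starts a thread that blocks in park() until release() is called.
    public Thread startWaiter() {
        Thread t = new Thread(() -> {
            while (!ready) {             // re-check: park may return spuriously
                LockSupport.park(this);  // `this` is recorded as the blocker for diagnostics
            }
            resumed = true;
        });
        t.start();
        return t;
    }

    // Sets the flag first, then wakes up the given thread.
    public void release(Thread waiter) {
        ready = true;
        LockSupport.unpark(waiter);
    }

    public boolean isResumed() {
        return resumed;
    }

    public static void main(String[] args) throws InterruptedException {
        ParkSketch sketch = new ParkSketch();
        Thread waiter = sketch.startWaiter();
        sketch.release(waiter);
        waiter.join();
        System.out.println(sketch.isResumed()); // prints "true"
    }
}
```

If unpark happens before park, the permit is remembered and the next park returns immediately, so the order of the two calls does not cause a lost wakeup here.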

The Condition interface

Usage: a Condition object is generally used as a member variable. After calling await(), the current thread releases the lock and waits. When another thread calls signal() on the same Condition object, the waiting thread is notified, returns from await(), and has re-acquired the lock before returning.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionUseCase {

    public Lock lock = new ReentrantLock();
    public Condition condition = lock.newCondition();
    // Guarded by lock; lets the waiter proceed even if the signal was sent first
    private boolean signalled = false;

    public static void main(String[] args) {
        ConditionUseCase useCase = new ConditionUseCase();
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                useCase.conditionWait();
            }
        });
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                useCase.conditionSignal();
            }
        });
        // Let the two submitted tasks finish, then allow the JVM to exit
        executorService.shutdown();
    }

    public void conditionWait() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " got the lock.");
            System.out.println(Thread.currentThread().getName() + " is waiting for the signal.");
            while (!signalled) { // re-check the condition to guard against spurious wakeups
                condition.await();
            }
            System.out.println(Thread.currentThread().getName() + " got the signal.");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt status
        } finally {
            lock.unlock();
        }
    }

    public void conditionSignal() {
        lock.lock();
        try {
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() + " got the lock.");
            signalled = true;
            condition.signal();
            System.out.println(Thread.currentThread().getName() + " sent the signal.");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt status
        } finally {
            lock.unlock();
        }
    }
}

Implementation: the wait queue

        // The await() process is equivalent to moving the head node of the synchronization queue
        // (the node that holds the lock) to the Condition's wait queue
        public final void await() throws InterruptedException {
            // Check whether the current thread has been interrupted
            if (Thread.interrupted())
                throw new InterruptedException();
            // Add the current thread to the wait queue
            Node node = addConditionWaiter();
            // Release the synchronization state
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // Loop until the node has been moved to the synchronization queue
            while (!isOnSyncQueue(node)) {
                // Block the current thread
                LockSupport.park(this);
                // Exit the loop if the thread was interrupted
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // Compete for the synchronization state
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

If the thread has been interrupted, an exception is thrown; otherwise addConditionWaiter() is called to wrap the thread in a node and add it to the wait queue.


        private Node addConditionWaiter() {
            // Get the tail node of the wait queue
            Node t = lastWaiter;
            // If the tail node is not in CONDITION status, clean it out
            if (t != null && t.waitStatus != Node.CONDITION) {
                // Remove all nodes in the wait queue that are not in CONDITION status
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // Wrap the current thread in a Node
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            // Append the node after the tail
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            // Make the node the new tail
            lastWaiter = node;
            return node;
        }

    // Adding a node to the wait queue does not use CAS, because the thread calling await() must
    // be the thread that holds the lock; in other words, the lock itself guarantees thread safety.
    // After the node has joined the wait queue, fullyRelease() is called to release the
    // synchronization state

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            // Get the synchronization state
            int savedState = getState();
            // Release the lock
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

fullyRelease() calls the AQS template method release() to release the synchronization state and wake up the thread referenced by the successor of the head node in the synchronization queue. If the release succeeds, it returns normally; otherwise an exception is thrown. isOnSyncQueue() is then called to determine whether the node is in the synchronization queue:

    final boolean isOnSyncQueue(Node node) {
        // Not on the synchronization queue if the status is CONDITION or the predecessor is null
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        // Otherwise search for the node from the tail of the synchronization queue
        return findNodeFromTail(node);
    }

If the node is not in the synchronization queue, the thread stays in the body of the while loop, blocked by LockSupport.park(). The loop ends when the thread is interrupted or when the node associated with the thread is moved to the synchronization queue (that is, when another thread calls the condition’s signal() or signalAll() method); the thread then calls acquireQueued() to compete for the synchronization state.

signal()/signalAll()

signal()

    public final void signal() {
        // Check whether the current thread is the thread that holds the lock
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        // Wake up the first node in the condition (wait) queue
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

The isHeldExclusively() method must be overridden by the subclass to determine whether the current thread is the one holding the lock:

protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

doSignal

signalAll()

    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        // Starting from the head, move each node out of the wait queue one by one
        // and add it to the synchronization queue
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            transferForSignal(first);
            first = next;
        } while (first != null);
    }

The signalAll() method is equivalent to executing signal() once for each node in the wait queue: it moves all nodes in the wait queue to the synchronization queue and wakes up the thread of each node.
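From the user's point of view, the effect of signalAll() can be seen in a small gate that several threads wait on. This is a sketch only; the class GateSketch and its methods are hypothetical names for this example.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class: threads block in pass() until open() is called.
public class GateSketch {
    private final Lock lock = new ReentrantLock();
    private final Condition opened = lock.newCondition();
    private boolean open = false; // guarded by lock
    private int passed = 0;       // guarded by lock

    // Waits until the gate is open, then counts the caller as passed.
    public void pass() throws InterruptedException {
        lock.lock();
        try {
            while (!open) {
                opened.await(); // releases the lock and joins the wait queue
            }
            passed++;
        } finally {
            lock.unlock();
        }
    }

    // Opens the gate and moves every waiter to the synchronization queue.
    public void open() {
        lock.lock();
        try {
            open = true;
            opened.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public int passed() {
        lock.lock();
        try {
            return passed;
        } finally {
            lock.unlock();
        }
    }
}
```

With signal() instead of signalAll(), only the first waiter would be transferred per call, so the remaining threads would keep waiting until they were signalled individually.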