📚

When there is a higher mountain to climb in your heart, you will not care about the mire at your feet

🐉 Introduction

  1. This chapter touches on CAS, which is essentially an atomic operation at the machine-instruction level: CMPXCHG, with a LOCK prefix added depending on the number of processor cores;

  2. Any discussion of concurrency has to cover AQS: many concurrency classes in the JDK source implement their functionality through an inner class named Sync (a synchronizer) that extends AQS;

🐉 A brief introduction to AQS

🐉 What is AQS?

  1. AQS is an abstract class whose full name is AbstractQueuedSynchronizer. It provides common fields and methods, but it does not itself implement any synchronization interface;

  2. AQS defines the methods for acquiring and releasing the synchronization state, for custom synchronizer components to use or override;

  3. If you look at the subclasses of AQS, most of them are a static inner class called Sync that extends AQS and implements a custom synchronizer by overriding some of its methods;

  4. AQS defines two resource-sharing modes: EXCLUSIVE (only one thread can hold the resource at a time) and SHARED (multiple threads can hold the resource at the same time);

  5. AQS maintains a FIFO queue implemented as a CLH-style linked list; this queue does not support priority-based synchronization strategies.

🐉 The AQS state field

  1. AQS maintains a single field, private volatile int state, which is the core of AQS;

  2. Its value is read and updated through the getState, setState, and compareAndSetState methods;

  3. This field acts as a bond between different concurrent classes. We’ll continue to look at some of the scenarios where the state field is used.
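
To make the role of state concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. The class name SimpleMutex and its methods are hypothetical and not part of the JDK; the sketch assumes state 0 means unlocked and state 1 means locked.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical illustration class, not part of the JDK
class SimpleMutex {
    // Convention assumed for this sketch: state == 0 is unlocked, state == 1 is locked
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Only one thread can win the 0 -> 1 transition
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0); // a plain write is enough: only the holder gets here
            return true;
        }

        boolean isLocked() { return getState() != 0; }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }          // queues and parks on failure
    boolean tryLock()  { return sync.tryAcquire(1); } // single attempt, never blocks
    void unlock()      { sync.release(1); }
    boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println(mutex.tryLock()); // false: this sketch is not reentrant
        mutex.unlock();
        System.out.println(mutex.isLocked()); // false
    }
}
```

Everything about queueing and parking comes from AQS itself; the subclass only decides what the state values mean.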

🐉 The waitStatus field of Node

  1. The default status value is 0;

  2. On release, a node is responsible for waking up its successor. (In essence, this duty is recorded in the predecessor's waitStatus.)

  3. If the waitStatus of the current node's predecessor is > 0, that predecessor is CANCELLED and should be removed from the queue;

  4. If the waitStatus of the predecessor is 0, it is changed to SIGNAL (-1);

  5. If the waitStatus of the predecessor is -1 (SIGNAL), the predecessor will wake the current node when it releases, so the current node can enter the waiting state;

  6. A waitStatus of -2 (CONDITION) means the node is waiting in a condition queue.

🐉 CLH queue

🐉 queue model

      +------+  prev +------+  prev +------+
      |      | <---- |      | <---- |      |  
 head | Node |  next | Node |  next | Node |  tail
      |      | ----> |      | ----> |      |  
      +------+       +------+       +------+


🐉 linked list structure

🐉 head node

Of the head and tail nodes, the head deserves special attention: it is an empty (dummy) node. You can think of the head node as not taking part in the queueing, because it has already acquired the synchronization state, which means the thread associated with it is already running its business logic. Once that logic has finished and the synchronization state has been released, the old head node should be garbage collected so that memory is not wasted. An object that is still reachable through references will not be collected, so the references held by the head node need to be set to null. The Thread is one of those references, so the head node's thread field is set to null as well.

🐉 doubly linked list

Each Node maintains a pointer to its predecessor (prev) and a pointer to its successor (next). Linked together, the nodes form a doubly linked list.

🐉 Traversal notification mechanism

When a node leaves the queue, its successor has to be woken up. If the successor is null or has waitStatus > 0, the queue is traversed backwards from the tail, and the blocked thread of the node closest to the head with waitStatus <= 0 is woken up;

🐉 Simple examples of how AQS uses state

  1. CountDownLatch: one thread waits until a group of tasks running in other threads has finished.

    • state is initialized to N (say), and each countDown() decrements state by 1 via CAS.

    • Once all child threads have finished (i.e., state=0), the calling thread is woken with unpark(), returns from await(), and continues with its remaining work.

  2. CyclicBarrier: a group of threads wait for one another, and none of them continues until every thread in the group has reached the barrier. Unlike the other classes in this list, CyclicBarrier does not extend AQS itself; it is built on a ReentrantLock and a Condition and keeps its own count field.

    • count is initialized to N (say), and each await() decrements count by 1 while holding the lock.

    • When count reaches 0, the last thread to arrive wakes all the waiting threads, which then return from await() and continue with their remaining work.

  3. ReentrantLock: an exclusive lock;

    • state is initialized to 0, meaning unlocked; each lock() calls tryAcquire(), which increases state by 1.

    • Other threads fail tryAcquire() until thread A has called unlock() often enough to bring state back to 0.

  4. Semaphore: threads A, B, C, and D compete for a resource at the same time, but only 2 permits are available. While A and B hold the permits and have not finished, C and D wait outside the door.

    • state is initialized to N (say). Each successful acquisition attempt (tryAcquire()) decrements state by 1 via CAS. When state is 0, other threads have to wait.

    • Once a permit is released and state > 0 again, a waiting thread can acquire it.
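
The countdown use of state described in the CountDownLatch item can be sketched in shared mode as follows. SimpleLatch and runWorkers are hypothetical names used only for illustration; the sketch assumes state holds the remaining count and that waiting threads may proceed once it reaches 0.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical illustration class; it only mirrors the state handling described above
class SimpleLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) { setState(count); }

        int count() { return getState(); }

        @Override
        protected int tryAcquireShared(int ignored) {
            // Acquisition succeeds only once the count has reached zero
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            // Decrement state with a CAS retry loop; report true on the transition to zero
            for (;;) {
                int c = getState();
                if (c == 0) return false;
                int next = c - 1;
                if (compareAndSetState(c, next)) return next == 0;
            }
        }
    }

    private final Sync sync;

    SimpleLatch(int count) { sync = new Sync(count); }

    void countDown() { sync.releaseShared(1); }
    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
    int getCount() { return sync.count(); }

    // Starts n workers that each count down once, waits for all of them, returns the final count
    static int runWorkers(int n) {
        SimpleLatch latch = new SimpleLatch(n);
        for (int i = 0; i < n; i++) new Thread(latch::countDown).start();
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(3)); // prints 0
    }
}
```

The same state field that counted lock holds in exclusive mode counts remaining events here; only the tryAcquireShared and tryReleaseShared rules differ.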

🐉 commonly used important methods


protected boolean isHeldExclusively()

To be overridden by subclasses. Reports whether the calling thread holds the lock exclusively; it usually only needs to be implemented when a Condition is used.


protected boolean tryAcquire(int arg)

To be overridden by subclasses. Attempts to acquire the lock in exclusive mode; returns true on success and false on failure.


protected boolean tryRelease(int arg)

To be overridden by subclasses. Attempts to release the lock in exclusive mode; returns true on success and false on failure.


protected int tryAcquireShared(int arg)

To be overridden by subclasses. Attempts to acquire in shared mode: returns a negative value on failure, zero if the acquisition succeeded but no further shared acquisition can succeed, and a positive value if it succeeded and further shared acquisitions may also succeed.


protected boolean tryReleaseShared(int arg)

To be overridden by subclasses. Attempts to release in shared mode; returns true on success and false on failure.


final boolean acquireQueued(final Node node, int arg)

For a node that has already been appended to the tail of the queue: checks whether the thread may rest; if so, the predecessor is set to SIGNAL and the thread blocks in park() until it is woken up and acquires the lock.


private Node addWaiter(Node mode)

Creates a node for the current thread and appends it to the tail of the queue.


private Node enq(final Node node)

If the fast-path attempt in addWaiter fails, enq is called to spin until the node has been appended to the tail of the queue.


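private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)

Checks the predecessor's status to decide whether the thread may rest, setting the predecessor's waitStatus to SIGNAL if necessary. The actual blocking through LockSupport.park happens afterwards, in parkAndCheckInterrupt.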


private void unparkSuccessor(Node node)

Wakes up the successor of the given node when the lock is released.

🐉 Design and implementation of pseudocode

🐉 Obtain an exclusive lock:


    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    acquire {
        If the attempt to acquire the exclusive lock fails (how the attempt is made is implemented by AQS subclasses),
        a new exclusive-mode node is appended to the tail of the queue by spinning,
        and the thread rests via LockSupport.park depending on waitStatus
    }

🐉 Release an exclusive lock:


    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    release {
        If the attempt to release the exclusive lock succeeds (how the attempt is made is implemented by AQS subclasses),
        take the head node and decide from its waitStatus whether it is obliged to wake up its successor
    }

🐉 Obtain the shared lock:


    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    acquireShared {
        If the attempt to acquire the shared lock fails (how the attempt is made is implemented by AQS subclasses),
        a new shared-mode node is appended to the tail of the queue by spinning,
        and the thread rests via LockSupport.park depending on waitStatus
    }

🐉 Release the shared lock:


    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    releaseShared {
        If the attempt to release the shared lock succeeds (how the attempt is made is implemented by AQS subclasses),
        the blocked threads are woken up in a spin loop
    }

🐉 source code analysis ReentrantLock

🐉 ReentrantLock constructor

🐉 constructor source code

 
    /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
	

🐉 The default constructor creates a non-fair lock

The constructor that takes a parameter lets the caller choose between a fair and a non-fair lock by passing in a boolean;
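
As a quick check of the two constructors, the following sketch asks each lock which policy it uses through ReentrantLock.isFair(). The class FairnessDemo and the helper policyOf are hypothetical names introduced for this example.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class
class FairnessDemo {
    // Returns a readable name for the lock's ordering policy
    static String policyOf(ReentrantLock lock) {
        return lock.isFair() ? "fair" : "non-fair";
    }

    public static void main(String[] args) {
        System.out.println(policyOf(new ReentrantLock()));      // non-fair
        System.out.println(policyOf(new ReentrantLock(false))); // non-fair
        System.out.println(policyOf(new ReentrantLock(true)));  // fair
    }
}
```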

🐉 Sync synchronizer

    AQS --> Sync --> FairSync      // fair lock
              |
              +----> NonfairSync   // non-fair lock

ReentrantLock performs its operations through the abstract Sync class; if you look closely, you'll see that most of its methods are simply calls of the form sync.xxx.

🐉 lock source

    public void lock() {
        sync.lock();
    }

    // FairSync
    final void lock() {
        acquire(1);
    }

    // NonfairSync
    final void lock() {
        // If state is 0, no thread currently holds the lock: try to grab it directly with CAS
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // If the CAS fails, fall back to the normal exclusive acquire
            acquire(1);
    }

This is where the non-fair lock differs from the fair lock: when lock() is called, the non-fair lock first tries a CAS. If the lock happens to be free, the calling thread takes it on the spot, regardless of the wait queue. Only when that first CAS fails does the thread go and queue up like everyone else;

🐉 acquire (int)

  • This method is the entry point for a thread to acquire the shared state resource in exclusive mode. If the thread acquires the resource, the method returns; otherwise an exclusive-mode node is created and added to the wait queue, where it stays until the resource is acquired.

  • The selfInterrupt call is needed because a thread that is interrupted while waiting does not respond to the interrupt at that point. The interrupt is only recorded, and once the thread has acquired the resource it re-asserts the interrupt on itself.

  • In exclusive mode, under normal circumstances, the method blocks for as long as the lock has not been obtained, and returns only once it has;

    public final void acquire(int arg) {
        if (!tryAcquire(arg) && // tryAcquire is implemented by a concrete subclass of AQS
            // If the resource cannot be acquired, the thread enters the queue
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
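
The deferred interrupt handling of acquire can be observed from the outside with ReentrantLock. In the sketch below (InterruptWhileWaiting and interruptFlagAfterLock are hypothetical names), a thread is interrupted while it waits in lock(); it keeps waiting, and after it finally acquires the lock its interrupt flag is set again.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class
class InterruptWhileWaiting {
    // Returns the interrupt flag a waiter sees right after lock() returns,
    // when it was interrupted while still queued for the lock
    static boolean interruptFlagAfterLock() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] flagSeen = new boolean[1];

        lock.lock(); // the calling thread holds the lock first
        Thread waiter = new Thread(() -> {
            lock.lock(); // uninterruptible: waits in the queue until the lock is free
            try {
                flagSeen[0] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });
        try {
            waiter.start();
            // Wait until the waiter is actually in the queue
            while (!lock.hasQueuedThread(waiter)) Thread.yield();
            waiter.interrupt(); // the waiter does not leave the queue because of this
        } finally {
            lock.unlock(); // now the waiter can acquire the lock
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return flagSeen[0];
    }

    public static void main(String[] args) {
        System.out.println(interruptFlagAfterLock()); // true
    }
}
```

Code that needs to react to the interrupt while still waiting would use lockInterruptibly() instead of lock().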

🐉 tryAcquire (int)

    // FairSync: tryAcquire for the fair lock
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // Read the current value of the lock state
        int c = getState();
        // state == 0 means no thread currently holds the lock
        if (c == 0) {
            // Only attempt the CAS if no other thread is queued ahead of this one
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                // Nobody is queued ahead and the CAS succeeded: record the current thread as the exclusive owner
                setExclusiveOwnerThread(current);
                // Tell the caller that the thread has acquired the lock
                return true;
            }
        }
        // state != 0: the lock is held, so check whether the holder is the current thread
        else if (current == getExclusiveOwnerThread()) {
            // The holder is the current thread: this is a reentrant acquisition, so state is increased by 1
            int nextc = c + acquires;
            // A value below 0 means the int counter has overflowed
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            // Tell the caller that the thread has acquired the lock
            return true;
        }
        // Tell the caller that the thread did not acquire the lock
        return false;
    }

🐉 tryAcquire source for the non-fair lock:


    // NonfairSync: tryAcquire for the non-fair lock
    protected final boolean tryAcquire(int acquires) {
        // Delegate to the non-fair acquisition method of the parent class
        return nonfairTryAcquire(acquires);
    }

    // Sync: nonfairTryAcquire, used by NonfairSync
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // Read the current value of the lock state
        int c = getState();
        // state == 0 means no thread currently holds the lock
        if (c == 0) {
            // Try to take the lock with CAS, without looking at the queue
            if (compareAndSetState(0, acquires)) {
                // The CAS succeeded: record the current thread as the exclusive owner
                setExclusiveOwnerThread(current);
                // Tell the caller that the thread has acquired the lock
                return true;
            }
        }
        // state != 0: the lock is held, so check whether the holder is the current thread
        else if (current == getExclusiveOwnerThread()) {
            // The holder is the current thread: this is a reentrant acquisition, so state is increased by 1
            int nextc = c + acquires;
            // A value below 0 means the int counter has overflowed
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true; // Tell the caller that the thread has acquired the lock
        }
        return false; // Tell the caller that the thread did not acquire the lock
    }

The tryAcquire method is implemented by subclasses of AQS, here the two static inner classes of ReentrantLock (FairSync and NonfairSync). Its purpose is to try to obtain the lock resource through CAS, or to increase the hold count if the current thread already owns the lock.
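
The reentrant branch of tryAcquire is visible through ReentrantLock.getHoldCount(), which reports how many times the current thread holds the lock. A small sketch, with the hypothetical names ReentrancyDemo and holdCounts:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class
class ReentrancyDemo {
    // Records the hold count after the first lock(), after a nested lock(),
    // and after the nested unlock()
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[3];
        lock.lock();
        try {
            counts[0] = lock.getHoldCount();
            lock.lock(); // same thread: takes the reentrant branch
            try {
                counts[1] = lock.getHoldCount();
            } finally {
                lock.unlock();
            }
            counts[2] = lock.getHoldCount();
        } finally {
            lock.unlock();
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] c = holdCounts();
        System.out.println(c[0] + " " + c[1] + " " + c[2]); // prints 1 2 1
    }
}
```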

🐉 addWaiter (Node)


    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        // Create a new node in the given mode: Node.EXCLUSIVE or Node.SHARED
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail; // Keep the current tail in a local variable
        if (pred != null) { // pred is not null, so the queue already has nodes
            node.prev = pred;
            if (compareAndSetTail(pred, node)) { // Try to make node the new tail with CAS
                pred.next = node;
                return node;
            }
        }
        // The queue is uninitialized or the CAS lost a race: spin in enq until the node is appended
        enq(node);
        return node;
    }

addWaiter creates a new node in the given mode and tries to append it to the tail of the queue. If that fast path fails, because of contention or because the queue has not been initialized yet, enq spins until the node has been appended;

🐉 enq (Node)

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        // Spin: an infinite loop
        for (;;) {
            Node t = tail;
            // On the first pass the tail is null if the queue has never been used; once nodes exist it is not null
            if (t == null) { // Must initialize
                // The queue is empty: create an empty node and install it as the head
                if (compareAndSetHead(new Node()))
                    // With a single node, head and tail point to the same object
                    tail = head;
            } else {
                // Reaching the else branch means the queue already has nodes
                node.prev = t;
                // Other threads may be appending too, so the new tail is set with CAS
                if (compareAndSetTail(t, node)) {
                    // The CAS succeeded: node.prev already points to t, now link t.next to node
                    t.next = node;
                    return t;
                }
            }
            // If either CAS failed, nothing is given up: the loop simply tries again
        }
    }

enq guarantees that the node ends up at the tail of the queue by spinning in an infinite loop. If the queue is empty, a dummy node is first installed as the head through CAS, and the node is then appended behind it on a later pass of the loop.
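
The spin-and-CAS pattern of enq can be tried outside AQS with AtomicReference standing in for the Unsafe-based CAS on head and tail. This is a simplified sketch, not the JDK code: EnqSketch, its Node class, and the accessors are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the AQS queue; not the JDK implementation
class EnqSketch {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    Node head() { return head.get(); }
    Node tail() { return tail.get(); }

    // Appends node to the tail, creating the dummy head first if needed;
    // returns the node's predecessor, as enq does
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Queue not initialized: only one thread wins the CAS on head
                if (head.compareAndSet(null, new Node("dummy")))
                    tail.set(head.get());
            } else {
                node.prev = t;
                // Only one thread can move the tail from t to its own node
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return t;
                }
            }
            // A lost CAS just means another pass through the loop
        }
    }

    public static void main(String[] args) {
        EnqSketch queue = new EnqSketch();
        Node pred = queue.enq(new Node("A"));
        System.out.println(pred.name);         // dummy
        System.out.println(queue.tail().name); // A
    }
}
```

Note that prev is set before the CAS and next only after it, which is why prev links are the reliable direction for traversal.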

🐉 compareAndSetHead/compareAndSetTail

    /** * CAS head field. Used only by enq. */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
	
    /** * CAS tail field. Used only by enq. */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }	


These two methods use CAS to set the head node and the tail node;

🐉 acquireQueued (Node, int)

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // Spin: an infinite loop
            for (;;) {
                // Get the predecessor of the node
                final Node p = node.predecessor();
                // If the predecessor is the head, this node is second in line, right behind the head, so it may try to acquire the lock.
                // The head may have just released the lock, so the attempt is worth making
                if (p == head && tryAcquire(arg)) {
                    // The lock has been acquired: this node becomes the new head
                    setHead(node);
                    // Clear the old head's next pointer so that the old head can be garbage collected
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // Decide, based on the predecessor, whether the thread should rest.
                // Normally a thread that has not acquired the lock blocks here until it is woken up
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
                // Reaching this point means the thread did not park or has been woken up; the loop goes round again
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

acquireQueued also works in a spin loop. Only the node directly behind the head may attempt to acquire the lock; all the other nodes wait in line until their status is modified, they are woken up, or they become the head. A newly added node is no exception: it first checks whether its predecessor is the head and then checks whether it may rest.

🐉 shouldParkAfterFailedAcquire (Node, Node)

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // Read the status of the predecessor
        int ws = pred.waitStatus;
        // If the predecessor is already SIGNAL, it will wake this node on release, so this node can rest
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

shouldParkAfterFailedAcquire mainly checks the status of the predecessor. If the predecessor is SIGNAL, the new node can safely rest;

  • If the predecessor's waitStatus is greater than zero, the predecessor is CANCELLED. Starting from pred, the method walks backwards through the prev links until it finds a node that is not cancelled, and links the current node behind it.

  • Otherwise (waitStatus is 0 or PROPAGATE), the predecessor is set to SIGNAL and the method returns false; the spin loop in acquireQueued then checks again whether the thread should rest.
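
The three outcomes above can be traced on a tiny hand-built queue. The sketch below is single-threaded and uses plain field writes where the JDK uses CAS; ParkDecisionSketch, its Node class, and the link helper are hypothetical names.

```java
// Hypothetical single-threaded model of the decision logic; the JDK version uses CAS
class ParkDecisionSketch {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;

    static final class Node {
        final String name;
        int waitStatus;
        Node prev, next;
        Node(String name, int waitStatus) { this.name = name; this.waitStatus = waitStatus; }
    }

    // Links node behind prev and returns node
    static Node link(Node prev, Node node) {
        node.prev = prev;
        prev.next = node;
        return node;
    }

    static boolean shouldPark(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == SIGNAL)
            return true;                 // predecessor will wake us: safe to park
        if (ws > 0) {
            do {                         // skip cancelled predecessors
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            pred.waitStatus = SIGNAL;    // plain write here; a CAS in the real code
        }
        return false;                    // caller retries before parking
    }

    public static void main(String[] args) {
        Node head = new Node("head", 0);
        Node a = link(head, new Node("A", CANCELLED));
        Node b = link(a, new Node("B", CANCELLED));
        Node c = link(b, new Node("C", 0));
        System.out.println(shouldPark(c.prev, c)); // false: cancelled nodes skipped
        System.out.println(shouldPark(c.prev, c)); // false: head set to SIGNAL
        System.out.println(shouldPark(c.prev, c)); // true: now safe to park
    }
}
```

It takes up to three passes before the node may park, which matches the retry behaviour of the spin loop in acquireQueued.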

🐉 parkAndCheckInterrupt ()

    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // block wait
        return Thread.interrupted(); // Check whether there is any interruption after waking up.
    }



  • parkAndCheckInterrupt first calls park to put the thread into the waiting state. When the blocked thread is woken up, it checks whether it was interrupted in the meantime.

  • There are two ways to be woken up deliberately: with unpark and with interrupt. (park may also return for no reason at all, which is why it is always called inside a loop.)
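
Both wake-up paths can be exercised directly with LockSupport. In this sketch (ParkDemo and parkThenWake are hypothetical names), a worker parks in a loop until a flag allows it to proceed and then reports what Thread.interrupted() returned, which is the same check parkAndCheckInterrupt performs.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class
class ParkDemo {
    // Parks a worker, wakes it with interrupt() or unpark(), and returns
    // the value of Thread.interrupted() that the worker saw after waking
    static boolean parkThenWake(boolean useInterrupt) {
        AtomicBoolean mayProceed = new AtomicBoolean(false);
        boolean[] sawInterrupt = new boolean[1];
        Thread worker = new Thread(() -> {
            // park() can return spuriously, so the condition is rechecked in a loop
            while (!mayProceed.get()) LockSupport.park();
            sawInterrupt[0] = Thread.interrupted(); // reads and clears the flag
        });
        worker.start();
        if (useInterrupt) {
            worker.interrupt();     // interrupt first, so the flag is set before the loop can exit
            mayProceed.set(true);
        } else {
            mayProceed.set(true);
            LockSupport.unpark(worker);
        }
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sawInterrupt[0];
    }

    public static void main(String[] args) {
        System.out.println(parkThenWake(false)); // false: woken by unpark
        System.out.println(parkThenWake(true));  // true: woken by interrupt
    }
}
```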

🐉 unlock ()


    public void unlock() {
        sync.release(1);
    }

unlock releases the lock. It is usually called in a finally block, so that the lock is still released if the critical section throws an exception. Unlike acquisition, releasing does not come in several flavours: there is no fair or non-fair release, just a straightforward release of the resource;
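
The usual lock/try/finally shape looks like this. UnlockInFinally, LOCK, and guarded are hypothetical names; the point of the sketch is that the lock is free again even when the critical section throws.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class
class UnlockInFinally {
    static final ReentrantLock LOCK = new ReentrantLock();

    // Runs the critical section while holding LOCK and always releases it afterwards
    static void guarded(Runnable criticalSection) {
        LOCK.lock();
        try {
            criticalSection.run();
        } finally {
            LOCK.unlock(); // runs on normal return and on exception
        }
    }

    public static void main(String[] args) {
        try {
            guarded(() -> { throw new IllegalStateException("failure inside the critical section"); });
        } catch (IllegalStateException e) {
            System.out.println(LOCK.isLocked()); // false
        }
    }
}
```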

🐉 release (int)


    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) { // Try to release the lock; tryRelease is implemented by a concrete subclass of AQS
            Node h = head;
            if (h != null && h.waitStatus != 0) // Wake up the successor, starting from the head node
                unparkSuccessor(h); // Skip CANCELLED nodes and wake up the next waiting successor
            return true;
        }
        return false;
    }

release attempts to release the lock. If that succeeds, it is responsible for skipping CANCELLED nodes and waking up the next waiting successor, so that it can continue running and obtain the lock.

🐉 tryRelease (int)

    // Sync, the parent class of NonfairSync and FairSync
    protected final boolean tryRelease(int releases) {
        // Subtract the released count from state
        int c = getState() - releases;
        // Only the thread that holds the lock may release it; any other thread gets an exception
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // state is back to 0: the lock is fully released, so clear the owner
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        // Write back the new state; if it is not 0 yet, the lock is still held (reentrant)
        setState(c);
        return free;
    }

tryRelease subtracts 1 from the state lock resource. No CAS is needed here: only the thread that owns the lock gets past the ownership check, so a plain setState is enough.
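
Two consequences of this method are easy to observe through ReentrantLock: a thread that does not own the lock gets an IllegalMonitorStateException from unlock(), and a lock acquired twice stays locked after a single unlock(). ReleaseDemo and its methods are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class
class ReleaseDemo {
    // Returns true if unlock() by a non-owner throws IllegalMonitorStateException
    static boolean unlockWithoutOwningThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock(); // the current thread never acquired the lock
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Locks twice, unlocks once, and reports whether the lock is still held at that point
    static boolean stillLockedAfterOneUnlock() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();
        lock.unlock();                    // state goes from 2 to 1
        boolean locked = lock.isLocked();
        lock.unlock();                    // state goes from 1 to 0, owner is cleared
        return locked;
    }

    public static void main(String[] args) {
        System.out.println(unlockWithoutOwningThrows()); // true
        System.out.println(stillLockedAfterOneUnlock()); // true
    }
}
```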

🐉 unparkSuccessor (Node)

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0) // Reset the node's status to 0
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next; // Take the successor; node is usually the head, so this is usually the first waiting node
        if (s == null || s.waitStatus > 0) { // The successor is null or already CANCELLED
            s = null;
            // Walk backwards from the tail and keep the last non-cancelled node found, i.e. the one closest to node
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); // Wake up the thread
    }

unparkSuccessor mainly skips CANCELLED nodes and wakes up the next waiting successor. If the direct successor is null or cancelled, it walks backwards from the tail of the queue, picks the node closest to the head whose waitStatus is less than or equal to zero, and calls unpark to wake it up.

Why do we search from the back of the queue?

  • Any node in this queue may be interrupted. It does not have to happen, but when it does, the interrupted node has its status set to CANCELLED, which means the node will be removed from the queue at some point in the future. (It is not removed immediately.)
  • The rule for removal is simple: the node's predecessor no longer points to it, but to a non-cancelled node behind it.
    • The node that is being removed gets its next pointer set to point to itself;
    • If we walked from the head through the next pointers and ran into a CANCELLED node, the for loop would follow next back to the same node forever;
    • The prev pointers of all these nodes, however, are left untouched, so it is safest to traverse from the tail node.
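
The backward search can be tried on a small hand-built queue in which a cancelled node already has its next pointer looped back to itself. This is a single-threaded sketch with hypothetical names (SuccessorSearchSketch, findSuccessor, link), not the JDK code.

```java
// Hypothetical single-threaded model of the successor search
class SuccessorSearchSketch {
    static final int CANCELLED = 1;

    static final class Node {
        final String name;
        int waitStatus;
        Node prev, next;
        Node(String name, int waitStatus) { this.name = name; this.waitStatus = waitStatus; }
    }

    // Links node behind prev
    static void link(Node prev, Node node) {
        node.prev = prev;
        prev.next = node;
    }

    // Returns the node to wake for the given node, or null if there is none
    static Node findSuccessor(Node node, Node tail) {
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Only prev links are followed, so a self-linked next cannot trap the loop
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t; // keep overwriting: the last hit is the one closest to node
        }
        return s;
    }

    public static void main(String[] args) {
        Node head = new Node("head", -1);
        Node a = new Node("A", CANCELLED);
        Node b = new Node("B", 0);
        Node c = new Node("C", 0);
        link(head, a);
        link(a, b);
        link(b, c);
        a.next = a; // a cancelled node's next points to itself
        System.out.println(findSuccessor(head, c).name); // B
    }
}
```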

🐉 summary

  1. After all this analysis, does it suddenly click? The AQS that everyone talks about as if it were magic is not as hard to understand as one might imagine;

  2. Here is a brief summary of some characteristics of the AQS process:

    • The key lock acquisition and lock release operations are implemented by AQS subclasses and driven through acquire/release and acquireShared/releaseShared;
    • A FIFO linked-list queue is maintained, and new nodes are appended to the tail of the queue by spinning;
    • When a node is added, it walks backwards from its predecessor and skips the nodes that are CANCELLED;
    • On release, the node after the head is checked first. If it is null or CANCELLED, the queue is walked backwards from the tail, the CANCELLED nodes are skipped, and the non-cancelled node closest to the head is woken up; otherwise the head's direct successor is woken up. All of this happens only if the waitStatus of the head is not 0, of course.
  3. With ReentrantLock as the vehicle for understanding AQS, analyzing CountDownLatch, Semaphore, ReentrantReadWriteLock, and other classes that build on AQS to provide different functionality will go a lot more smoothly;