Fair lock

  • The basic concept
    • AQS lock
    • CLH queue
    • CAS function
  • ReentrantLock
  • Get a fair lock
    • lock
    • acquire
      • tryAcquire
        • hasQueuedPredecessors
          • Node
        • compareAndSetState
        • setExclusiveOwnerThread
        • state
      • addWaiter
        • compareAndSetTail
      • acquireQueued
        • shouldParkAfterFailedAcquire
        • parkAndCheckInterrupt
        • tryAcquire
      • selfInterrupt
  • Release fair lock
    • unlock
    • release
      • tryRelease
      • unparkSuccessor
  • conclusion

The basic concept

AQS lock

  • AQS: AbstractQueuedSynchronizer

    • AQS is an abstract class in Java for managing locks; it contains the methods that lock implementations have in common
    • AQS is the common parent class of both exclusive and shared locks
  • Categories of AQS locks:

    • Exclusive locks:

      • The lock can be held by only one thread at a time

      • Depending on how the lock is acquired, exclusive locks are divided into fair locks and unfair locks

        • Fair lock: threads acquire the lock strictly in the first-come-first-served order of the CLH wait queue
        • Unfair lock: a thread that needs the lock tries to acquire it immediately, regardless of the threads already waiting in the CLH queue
      • Exclusive lock instances: ReentrantLock, ReentrantReadWriteLock.WriteLock

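    • Shared locks:

      • The lock can be held by multiple threads at the same time
      • Shared lock instances: ReentrantReadWriteLock.ReadLock, CountDownLatch, Semaphore (CyclicBarrier is often listed here too, but it is built on ReentrantLock and Condition rather than directly on shared mode)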

CLH queue

  • CLH: Craig, Landin, and Hagersten lock queue

    • The CLH queue is the queue in AQS of threads waiting for the lock

      • Locks protect contended resources from errors caused by multiple threads operating on them at the same time
      • With an exclusive lock, a contended resource can be accessed by only one thread at a time, and the remaining threads have to wait
      • The CLH queue is the queue that manages these waiting threads
    • CLH is a non-blocking FIFO queue. Inserting a node into or removing a node from the CLH wait queue never blocks under concurrency; the atomicity of insertion and removal is guaranteed by spinning and CAS

CAS function

  • CAS: Compare And Swap

    • A compare-and-swap function performs its operation atomically. That is, data modified through CAS is updated atomically
    • Examples are compareAndSetHead(), compareAndSetTail(), and compareAndSetNext(). What these functions have in common is that the actions they perform are atomic
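
The CAS contract can be tried out with the public AtomicInteger.compareAndSet() method. This is only a minimal sketch: the class name CasDemo and the helper casOnce are made up for illustration, and AQS itself applies the same idea to its own state, head, and tail fields.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; only AtomicInteger.compareAndSet is real JDK API.
final class CasDemo {
    // Moves the value from `expect` to `update` in one atomic step, or does nothing.
    static boolean casOnce(AtomicInteger value, int expect, int update) {
        return value.compareAndSet(expect, update);
    }

    public static void main(String[] args) {
        AtomicInteger state = new AtomicInteger(0);
        System.out.println(casOnce(state, 0, 1)); // true: the value was 0
        System.out.println(casOnce(state, 0, 1)); // false: the value is already 1
        System.out.println(state.get());          // 1
    }
}
```

A failed CAS changes nothing, which is why callers such as addWaiter() retry in a loop.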

ReentrantLock

  • UML diagram for ReentrantLock:

  • ReentrantLock implements the Lock interface

  • ReentrantLock and Sync are related by composition:

    • ReentrantLock contains a Sync object
    • Sync is a subclass of AQS
    • Sync has two subclasses, FairSync and NonfairSync
    • ReentrantLock is an exclusive lock, and whether it is a fair or unfair lock depends on whether the sync object is an instance of FairSync or NonfairSync
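
From the caller's side, the choice between FairSync and NonfairSync is made through the ReentrantLock constructor. The sketch below uses only public API (the ReentrantLock(boolean) constructor and isFair()); the class FairnessChoiceDemo and its helper are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class showing how the fairness mode is selected.
final class FairnessChoiceDemo {
    // true asks for the fair variant, false for the unfair one.
    static ReentrantLock newLock(boolean fair) {
        return new ReentrantLock(fair);
    }

    public static void main(String[] args) {
        System.out.println(newLock(true).isFair());        // true
        System.out.println(new ReentrantLock().isFair());  // false: the no-arg constructor is unfair
    }
}
```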

Get a fair lock

lock

  • lock() is implemented in the FairSync class in ReentrantLock
final void lock() {
    acquire(1);
}
  • The current thread acquires the lock through acquire(1)

  • The argument 1 is the amount by which the lock state is changed

    • For an exclusive lock, the state value is 0 when the lock is available
    • For an exclusive lock, the state value is 1 when the lock is first acquired by a thread
  • Because ReentrantLock is reentrant, an exclusive lock can be acquired multiple times by the same thread, and the state value is incremented by 1 for each acquisition
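
The increment per acquisition can be observed through the public getHoldCount() method, which reports how many times the current thread holds the lock. A small sketch, with HoldCountDemo and holdCountAfter as hypothetical names:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: observes reentrancy through the hold count.
final class HoldCountDemo {
    // Locks `times` times on one thread, reads the hold count, then unlocks fully.
    static int holdCountAfter(int times) {
        ReentrantLock lock = new ReentrantLock(true);
        for (int i = 0; i < times; i++) {
            lock.lock();
        }
        try {
            return lock.getHoldCount();
        } finally {
            // Every lock() needs a matching unlock() before other threads can acquire.
            for (int i = 0; i < times; i++) {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(holdCountAfter(3)); // 3
    }
}
```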

acquire

  • acquire() is implemented in AQS:
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  • The current thread first attempts to acquire the lock through tryAcquire() and returns if it succeeds. If the attempt fails, the thread enters the wait queue, because other threads may already be waiting for the lock

  • On failure, the current thread is first added to the end of the non-blocking FIFO CLH queue via addWaiter(Node.EXCLUSIVE)

  • After addWaiter(Node.EXCLUSIVE) completes, the lock is acquired through acquireQueued(). Since the lock here is a fair lock, it is acquired in queue order

  • While executing acquireQueued(), the current thread sleeps in the CLH queue and does not return until it has acquired the lock

    • acquireQueued() returns true if the current thread was interrupted while it was waiting
    • In that case the current thread calls selfInterrupt() to interrupt itself again

tryAcquire

  • tryAcquire() for fair locks is implemented in the FairSync class in ReentrantLock
protected final boolean tryAcquire(int acquires) {
    // Get the current thread
    final Thread current = Thread.currentThread();
    // Get the state of the exclusive lock
    int c = getState();
    if (c == 0) {
        /*
         * If the lock is not held by any thread, check whether any thread
         * has been waiting in the CLH queue longer than the current thread.
         * If not, acquire the lock: set its state and make the current
         * thread the owner of the lock
         */
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // If the exclusive lock is already held by the current thread, update the lock state
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
  • tryAcquire() attempts to acquire the lock

    • Returns true on success

    • Returns false on failure

      • The caller then falls back to queuing through addWaiter() and acquireQueued()
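
The same pattern can be reproduced outside the JDK by subclassing AbstractQueuedSynchronizer. The following is a simplified sketch, not the FairSync source: SimpleFairMutex and its members are hypothetical names, and unlike ReentrantLock this mutex is deliberately non-reentrant, so a thread must not call lock() twice without unlocking.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical, simplified, NON-reentrant fair mutex built on AQS.
final class SimpleFairMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            // Take a free lock only if no thread has been waiting longer than this one.
            if (getState() == 0 && !hasQueuedPredecessors()
                    && compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write: makes the lock available again
            return true;
        }

        boolean locked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }          // queues and parks on failure
    boolean tryLock()  { return sync.tryAcquire(1); } // single attempt, never queues
    void unlock()      { sync.release(1); }
    boolean isLocked() { return sync.locked(); }
}
```

Dropping the hasQueuedPredecessors() check from tryAcquire() would turn this into an unfair mutex, which is the only difference between the two acquisition policies at this level.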

hasQueuedPredecessors

  • hasQueuedPredecessors() is implemented in AQS:
public final boolean hasQueuedPredecessors() {
    /*
     * Correctness depends on head being initialized before tail, and on
     * head.next being accurate if the current thread is first in the queue
     */
    // Read the fields in reverse initialization order
    Node t = tail;
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
  • hasQueuedPredecessors() reports whether any thread in AQS has been waiting longer than the current thread. It does so by checking whether the current thread is at the front of the CLH queue, that is, in the node right after head

Node

  • Node is the node type of the CLH queue
  • Node is implemented in AQS with the following data structure:
    /** Head of the wait queue. If head exists, its waitStatus is guaranteed not to be CANCELLED */
    private transient volatile Node head;

    /** Tail of the wait queue, lazily initialized. A new wait node can be added only via the enq method */
    private transient volatile Node tail;

    static final class Node {
        /** Marker indicating that a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker indicating that a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value indicating that the thread has been cancelled */
        static final int CANCELLED =  1;
        /**
         * waitStatus value indicating that the successor's thread needs unparking:
         * the successor is (or will soon be) blocked, so the current node must
         * wake it up when it releases or cancels
         */
        static final int SIGNAL    = -1;
        /** waitStatus value indicating that the thread is waiting on a Condition */
        static final int CONDITION = -2;
        /** waitStatus value indicating that the next acquireShared should propagate unconditionally */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking only the following values:
         *   SIGNAL:    The successor of this node is (or will soon be) blocked, so
         *              the current node must unpark its successor when it releases
         *              or cancels. To avoid races, acquire methods must first
         *              indicate that they need a signal, then retry the atomic acquire
         *   CANCELLED: This node is cancelled due to timeout or interrupt
         *   CONDITION: This node is currently on a condition queue. It will not be
         *              used as a sync queue node until transferred, at which time
         *              the status will be set to 0
         *   PROPAGATE: A releaseShared should be propagated to other nodes. This is
         *              set in doReleaseShared
         *   0:         None of the above
         * Non-negative values mean that a node does not need to signal, so most
         * code only needs to check the sign, not the particular value
         */
        volatile int waitStatus;

        /** Previous node */
        volatile Node prev;

        /** Next node */
        volatile Node next;

        /** The thread corresponding to this node */
        volatile Thread thread;

        /**
         * Distinguishes whether this node is waiting in exclusive or shared mode:
         * nextWaiter == EXCLUSIVE for an exclusive lock,
         * nextWaiter == SHARED for a shared lock
         */
        Node nextWaiter;

        /** Returns true for a shared lock, false for an exclusive lock */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns the previous node, or throws NullPointerException if it is null
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        /** Constructor. Used to establish the initial head or the SHARED marker */
        Node() {
        }

        /**
         * Constructor. Used by addWaiter
         *
         * @param thread the thread for this node
         * @param mode whether the lock is exclusive or shared
         */
        Node(Thread thread, Node mode) {
            this.nextWaiter = mode;
            this.thread = thread;
        }

        /**
         * Constructor. Used by Condition
         *
         * @param thread the thread for this node
         * @param waitStatus the wait status of the thread
         */
        Node(Thread thread, int waitStatus) {
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
  • Node: a node of the CLH wait queue, representing one thread waiting for the lock

    • Each Node has a thread

    • Each Node points to the previous Node via prev and to the next Node via next. These represent the previous waiting thread and the next waiting thread respectively

    • Node uses waitStatus to hold the wait state of the thread

    • Node uses nextWaiter to distinguish between exclusive and shared locks

      • For a thread waiting on an exclusive lock, nextWaiter is EXCLUSIVE
      • For a thread waiting on a shared lock, nextWaiter is SHARED

compareAndSetState

  • compareAndSetState() is implemented in AQS:
/**
 * Atomically sets the synchronization state to the given update value if the
 * current state value equals the expected value.
 * This operation has the memory semantics of a volatile read and write
 *
 * @param expect the expected value
 * @param update the new value
 * @return true if the update succeeds. A false return indicates that the
 *         current state value was not equal to the expected value
 */
protected final boolean compareAndSetState(int expect, int update) {
    return STATE.compareAndSet(this, expect, update);
}
  • compareAndSetState() is an atomic operation on the synchronization state: if the current state equals expect, it sets the state to update

setExclusiveOwnerThread

  • setExclusiveOwnerThread() is implemented in AbstractOwnableSynchronizer:
    /** The current owner thread of the exclusive lock */
    private transient Thread exclusiveOwnerThread;

    /**
     * Sets the thread that currently owns exclusive access.
     * A null argument indicates that no thread owns access.
     * This method does not otherwise impose any synchronization or volatile field accesses
     *
     * @param thread the owner thread
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
  • setExclusiveOwnerThread() records the given thread as the thread that currently owns the exclusive lock

state

  • getState() and setState() are both implemented in AQS:
    /** The synchronization state */
    private volatile int state;

    /**
     * Returns the current value of the synchronization state.
     * This operation has the memory semantics of a volatile read
     *
     * @return the current state value
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of the synchronization state.
     * This operation has the memory semantics of a volatile write
     *
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }
  • state indicates the lock status:

    • For an exclusive lock, state == 0 means that the lock is available, that is, it is not held by any thread
    • ReentrantLock is reentrant, so state can be greater than 1

addWaiter

  • addWaiter() adds the current thread to the CLH queue, that is, to the queue of threads waiting to acquire the lock

  • addWaiter(Node.EXCLUSIVE):

    • Creates a Node for the current thread
    • Records in the Node that the lock is of the exclusive type
    • Adds the node to the end of the CLH queue
  • addWaiter() is implemented in AQS:

    /**
     * Creates and enqueues a node for the current thread and the given mode
     *
     * @param mode Node.EXCLUSIVE for an exclusive lock, Node.SHARED for a shared lock
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        // Create a new Node whose thread is the current thread and whose lock mode is mode
        Node node = new Node(mode);

        for (;;) {
            Node oldTail = tail;
            // If the CLH queue is not empty, append the current thread's node to the end of the queue
            if (oldTail != null) {
                node.setPrevRelaxed(oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return node;
                }
            } else {
                // If the CLH queue is empty, initialize it with a new head node, then loop again to append the current node
                initializeSyncQueue();
            }
        }
    }
  • For a fair lock, addWaiter(Node.EXCLUSIVE) first creates a Node, marks it as waiting for an exclusive lock, and then adds it to the end of the CLH queue

compareAndSetTail

  • compareAndSetTail() is implemented in AQS:
    /** Compares and sets the tail node */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return TAIL.compareAndSet(this, expect, update);
    }
  • compareAndSetTail() is a CAS function

  • compareAndSetTail() operates atomically:

    • It checks whether the tail of the CLH queue equals expect and, if so, sets the tail to the update node
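
The enqueue loop of addWaiter() combined with compareAndSetTail() can be imitated with an AtomicReference. This is a simplified sketch under stated assumptions: CasTailQueue, QNode, and concurrentEnqueue are hypothetical names, the queue starts with a pre-built dummy head instead of initializing lazily, and it omits everything AQS does beyond linking nodes.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical simplified queue; it mirrors only the tail-CAS idea, not AQS itself.
final class CasTailQueue {
    static final class QNode {
        final String name;
        volatile QNode prev;
        volatile QNode next;
        QNode(String name) { this.name = name; }
    }

    private final QNode head = new QNode("dummy-head");
    private final AtomicReference<QNode> tail = new AtomicReference<>(head);

    // Appends a node: link prev first, CAS the tail, then publish next.
    QNode enqueue(String name) {
        QNode node = new QNode(name);
        for (;;) {
            QNode oldTail = tail.get();
            node.prev = oldTail;
            if (tail.compareAndSet(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
            // Another thread won the race for the tail; retry against the new tail.
        }
    }

    // Counts nodes by walking backwards from the tail via prev links.
    int size() {
        int n = 0;
        for (QNode t = tail.get(); t != head; t = t.prev) {
            n++;
        }
        return n;
    }

    // Enqueues from several threads at once and returns the final size.
    static int concurrentEnqueue(int threads, int perThread) {
        CasTailQueue q = new CasTailQueue();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) q.enqueue("n");
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return q.size();
    }
}
```

Because prev is set before the CAS and next only afterwards, a backward walk from the tail always sees every enqueued node, which is why unparkSuccessor() traverses from the tail when next is not reliable.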

acquireQueued

  • acquireQueued() is implemented in AQS:
    /**
     * Acquires in exclusive uninterruptible mode for a thread already in the queue.
     * Used by condition wait methods as well as acquire
     *
     * @param node the node
     * @param arg the acquire argument
     * @return true if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        // interrupted records whether the current thread was interrupted while sleeping in the CLH queue
        boolean interrupted = false;
        try {
            for (;;) {
                // Get the predecessor node. Since node belongs to the current thread, this is the thread queued immediately ahead of it
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node))
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            if (interrupted)
                selfInterrupt();
            throw t;
        }
    }
  • acquireQueued() acquires the lock for a thread that is already in the queue

shouldParkAfterFailedAcquire

  • shouldParkAfterFailedAcquire() is implemented in AQS:
    /**
     * Checks and updates the status of a node that failed to acquire.
     * Returns true if the thread should block. This is the main signal
     * control in all acquire loops. Requires that pred == node.prev
     *
     * @param pred the predecessor of node, which holds the status
     * @param node the current node
     * @return true if the current thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // The status of the predecessor node
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            // The predecessor is already set to SIGNAL, so it will unpark the current thread when it releases: the current thread can safely block
            return true;
        if (ws > 0) {
            // The predecessor was cancelled: skip over cancelled predecessors and link the current node to the nearest non-cancelled one
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // The predecessor is in state 0 or PROPAGATE: set it to SIGNAL so that it will wake the current thread later
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
  • Wait status waitStatus:

    • CANCELLED (1): the current thread has been cancelled
    • SIGNAL (-1): the successor of the current thread needs to be woken up by unpark. The successor is blocked, so when the current thread releases or cancels, it must wake the successor
    • CONDITION (-2): the current thread is sleeping on a Condition and needs to wait for the Condition to wake it up
    • PROPAGATE (-3): shared lock state. The release should be propagated so that other threads can acquire the shared lock
    • 0: the current thread does not belong to any of the above states
  • shouldParkAfterFailedAcquire() decides whether the current thread should block by the following rules:

    • If the status of the predecessor node is SIGNAL, the predecessor will wake the current thread with unpark when it releases, so the current thread may block: return true
    • If the status of the predecessor node is CANCELLED, the predecessor has been cancelled. Walk backwards to the nearest valid (non-cancelled) node, that is, one whose status is less than or equal to 0, make it the predecessor of the current node, and return false
    • If the status of the predecessor is 0 or PROPAGATE, that is, neither SIGNAL nor CANCELLED, set the predecessor's status to SIGNAL and return false
  • If shouldParkAfterFailedAcquire() determines that the current thread should block, parkAndCheckInterrupt() is called to block it

parkAndCheckInterrupt

  • parkAndCheckInterrupt() is implemented in AQS:
    /**
     * Blocks the current thread, then returns the thread's interrupted status
     *
     * @return true if the thread was interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        // Block the current thread with the park() method of LockSupport
        LockSupport.park(this);
        // After waking up, return the thread's interrupted status (this also clears it)
        return Thread.interrupted();
    }
  • A blocked thread can be woken up in two ways:

    • unpark wake-up: after the thread of the preceding node has finished using the lock, it wakes the current thread with unpark()
    • Interrupt wake-up: another thread interrupts the current thread with interrupt()
  • park() and unpark() in LockSupport are similar to wait() and notify() in Object: both pairs block and wake up a thread

    • park() and unpark() are lightweight and can be called without holding any lock
    • wait() and notify() can only be called after acquiring the object's monitor through synchronized
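
Both wake-up paths can be tried directly with LockSupport. The sketch below is illustrative only: ParkDemo and its two helpers are hypothetical names, while LockSupport.park(), LockSupport.unpark(), and Thread.interrupted() are the real JDK calls.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class for the two ways a parked thread wakes up.
final class ParkDemo {
    // unpark wake-up: the worker parks until the flag is set and it is unparked.
    static boolean wakeWorker() {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicBoolean woke = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            // park() may return spuriously, so the condition is re-checked in a loop.
            while (!ready.get()) {
                LockSupport.park();
            }
            woke.set(true);
        });
        worker.start();
        ready.set(true);
        LockSupport.unpark(worker); // safe even if the worker has not parked yet
        try {
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return woke.get();
    }

    // Interrupt wake-up: park() returns because of the interrupt, and
    // Thread.interrupted() reports the interrupt once and then clears it.
    static boolean[] interruptedIsClearedAfterRead() {
        Thread.currentThread().interrupt();
        LockSupport.park();                    // does not block: interrupt is pending
        boolean first = Thread.interrupted();  // reads and clears the flag
        boolean second = Thread.interrupted(); // the flag is already cleared
        return new boolean[] { first, second };
    }
}
```

The second helper shows why acquire() needs selfInterrupt(): once parkAndCheckInterrupt() has read the flag, the interrupt would be lost unless it is set again.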

tryAcquire

  • In the for loop of acquireQueued():
// Get the predecessor node of node
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null;
    return interrupted;
}
  • p == head && tryAcquire(arg):

    • Checks whether the predecessor node is the head of the CLH queue and, if so, tries to acquire the lock through tryAcquire()

    • The p == head condition is there to keep lock acquisition by the current thread fair:

      • First, shouldParkAfterFailedAcquire() decides whether the current thread should block
      • If it should, parkAndCheckInterrupt() is called to block the current thread. When the thread is unblocked, its interrupted status is returned. A thread is unblocked either because another thread called unpark() on it or because it was interrupted
      • If the current thread was woken by another thread calling unpark(), then according to the lock release process it was woken by the thread of the preceding node. That is, the preceding node called unpark() to wake the current thread
      • p == head: the predecessor of the current node is the head node of the CLH queue. After the predecessor releases the lock, it is the current node's turn to acquire it. If tryAcquire() succeeds, setHead(node) makes the current node the head node and the method returns
      • Conclusion: if the current thread was woken by its predecessor's unpark() and that predecessor is the head of the CLH queue, p == head holds and acquiring the lock is fair. If the thread was instead woken by an interrupt, p == head may not hold; letting it acquire the lock then would be unfair, so it goes back to waiting
  • acquireQueued(): the current thread blocks and waits in queue order until it acquires the lock, then returns whether it was interrupted while waiting
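
The first-come-first-served behaviour described above can be observed with a fair ReentrantLock. In this sketch the class FairOrderDemo and the method acquisitionOrder are hypothetical; the main thread holds the lock while the workers queue up one by one, using the public getQueueLength() method to wait until each worker is enqueued before starting the next.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: records the order in which queued threads get a fair lock.
final class FairOrderDemo {
    static List<Integer> acquisitionOrder(int threads) {
        ReentrantLock lock = new ReentrantLock(true); // fair mode
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        Thread[] workers = new Thread[threads];

        lock.lock(); // hold the lock so that every worker has to queue
        try {
            for (int i = 0; i < threads; i++) {
                final int id = i;
                workers[i] = new Thread(() -> {
                    lock.lock();
                    try {
                        order.add(id);
                    } finally {
                        lock.unlock();
                    }
                });
                workers[i].start();
                // Wait until this worker is in the queue before starting the next one.
                while (lock.getQueueLength() < i + 1) {
                    Thread.yield();
                }
            }
        } finally {
            lock.unlock(); // wakes the first queued worker
        }

        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(acquisitionOrder(3)); // [0, 1, 2]
    }
}
```

With new ReentrantLock(false) the queued workers are still woken in queue order, but a thread that arrives at the moment of release may take the lock ahead of them.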

selfInterrupt

  • selfInterrupt() is implemented in AQS:
    /** Interrupts the current thread */
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
  • selfInterrupt() makes the current thread interrupt itself. In acquire(), selfInterrupt() is executed only if acquireQueued() reports that the current thread was interrupted while waiting

    • In acquireQueued(), a blocked thread that is woken by an interrupt gets to run on the CPU again. If other threads are still waiting for the lock ahead of it, fairness forbids it from acquiring the lock, so it blocks again until it is woken by the thread ahead of it releasing the lock. Only then does it acquire the lock and actually proceed
    • Until the thread has acquired the lock, the interrupt is effectively ignored and the interrupt flag is cleared. This is because parkAndCheckInterrupt() reads the interrupt status with Thread.interrupted(), which not only returns the current interrupt status but also clears the flag. Since the flag was cleared, selfInterrupt() must be called to set it again
  • Summary of acquiring a fair lock with acquire():

    • First try to acquire the lock through tryAcquire(), and return if this succeeds. If it fails, acquire the lock through acquireQueued()
    • When tryAcquire() fails, the current thread is first added to the end of the CLH queue by addWaiter(). Then acquireQueued() is called, and the thread waits in the CLH queue for its turn to acquire the lock. The thread sleeps during this time and does not return until it has the lock. If it was interrupted while sleeping, selfInterrupt() is called afterwards to restore the interrupt
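
The effect of selfInterrupt() is visible from outside: a thread that is interrupted while queued in lock() keeps waiting, and once it holds the lock its interrupt flag is set. A sketch using only public ReentrantLock API (hasQueuedThread() included); the names InterruptRestoredDemo and interruptedAfterLock are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: an interrupt delivered during lock() is restored afterwards.
final class InterruptRestoredDemo {
    static boolean interruptedAfterLock() {
        ReentrantLock lock = new ReentrantLock(true);
        AtomicBoolean flagSeen = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            lock.lock(); // uninterruptible: keeps waiting despite the interrupt
            try {
                flagSeen.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });

        lock.lock(); // make the waiter queue up behind the main thread
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            waiter.interrupt(); // delivered while the waiter is still queued
        } finally {
            lock.unlock();
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return flagSeen.get();
    }

    public static void main(String[] args) {
        System.out.println(interruptedAfterLock()); // true
    }
}
```

Code that needs to abandon the wait on interrupt would use lockInterruptibly() instead of lock().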

Release fair lock

unlock

  • unlock() is implemented in ReentrantLock:
public void unlock() {
    sync.release(1);
}
  • unlock() is the lock release function; it is implemented through the release() function in AQS

  • The argument 1 is the amount by which the lock state is changed

    • For an exclusive lock, the state value is 0 when the lock is available
    • For an exclusive lock, the state value is 1 when the lock is first acquired by a thread
  • Because fair locks are reentrant, each release by the owning thread decreases the lock state by 1
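
In application code, lock() and unlock() are normally paired with try/finally so that the lock is released even if the protected code throws. A minimal sketch; FairCounter and runConcurrently are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example: a counter protected by a fair ReentrantLock.
final class FairCounter {
    private final ReentrantLock lock = new ReentrantLock(true);
    private int count;

    void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock(); // always release, even if the body throws
        }
    }

    int get() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // Increments from several threads at once and returns the final count.
    static int runConcurrently(int threads, int perThread) {
        FairCounter counter = new FairCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) counter.increment();
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }
}
```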

release

  • release() is implemented in AQS:
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
  • release() first calls tryRelease() to try to release the lock held by the current thread. On success, it wakes up the next waiting thread and returns true. On failure, it returns false

tryRelease

  • tryRelease() is implemented in the Sync class of ReentrantLock:
        protected final boolean tryRelease(int releases) {
            // Compute the state after this release
            int c = getState() - releases;
            // If the current thread is not the lock holder, throw an exception
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                // The current thread has released the lock completely: set the lock holder to null. The lock is now available
                free = true;
                setExclusiveOwnerThread(null);
            }
            // Store the lock state after the release
            setState(c);
            return free;
        }
  • tryRelease() attempts to release the lock

    • If the current thread is not the lock holder, an exception is thrown
    • If the state reaches 0 after the release, the current thread has released the lock completely: the owner of the lock is set to null, meaning the lock is available, and the state is updated to 0. Otherwise only the state is decremented and the thread still owns the lock
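
Both rules can be checked through the public API: unlock() by a thread that does not own the lock throws IllegalMonitorStateException, and the lock only becomes free when the hold count drops back to 0. A small sketch with hypothetical names (ReleaseDemo and its helpers):

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for the release rules of ReentrantLock.
final class ReleaseDemo {
    // Returns true if unlock() without owning the lock is rejected.
    static boolean unlockWithoutLockThrows() {
        ReentrantLock lock = new ReentrantLock(true);
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Hold counts after two lock() calls, after one unlock(), and after the second unlock().
    static int[] holdCountsWhileReleasing() {
        ReentrantLock lock = new ReentrantLock(true);
        lock.lock();
        lock.lock();
        int afterTwoLocks = lock.getHoldCount();
        lock.unlock();
        int afterOneUnlock = lock.getHoldCount(); // still owned by this thread
        lock.unlock();
        int afterTwoUnlocks = lock.getHoldCount(); // fully released
        return new int[] { afterTwoLocks, afterOneUnlock, afterTwoUnlocks };
    }

    public static void main(String[] args) {
        System.out.println(unlockWithoutLockThrows()); // true
        System.out.println(java.util.Arrays.toString(holdCountsWhileReleasing())); // [2, 1, 0]
    }
}
```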

unparkSuccessor

  • After the current thread successfully releases the lock in release(), it wakes up its successor
  • By the first-in, first-out (FIFO) principle of the CLH queue, the current thread must be the head node. If the CLH queue is not empty, the next waiting thread is woken up
    private void unparkSuccessor(Node node) {
        // Get the wait status of the given node
        int ws = node.waitStatus;
        if (ws < 0)
            // If the status is negative, reset it to 0
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Get the valid successor of the current node. If node.next is null
         * or cancelled, traverse backwards from the tail to find the valid
         * node closest to this one. Valid means waitStatus <= 0
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // Wake up the thread of the successor node
        if (s != null)
            LockSupport.unpark(s.thread);
    }
  • unparkSuccessor() wakes up the successor of the current thread
  • After the successor thread wakes up, it can go on to acquire the lock and resume running
conclusion

  • Releasing the lock means updating the lock state held by the current thread

    • If the current thread has released the lock completely, the thread holding the lock is set to null
    • Then the lock state is set to 0
    • Then the successor thread is woken up