A:

ReentrantLock is a locking mechanism provided in the java.util.Concurrent package. ReentrantLock is a reentrant, mutually exclusive lock that supports both fair and unfair implementations. Based on java8, this article analyzes the implementation principle of concurrency tool ReentrantLock.

ReentrantLock class diagram

Three: process diagram

Four: source code analysis

We take lock() method and unlock() method as the entrance to ReentrantLock source analysis.

Lock ()

ReentrantLock creates different Sync objects based on the fair argument passed by the constructor when the constructor creates an object (default is an implementation of an unfair lock). Reentrantlock. lock() calls sync.lock(). The Lock () method in the Sync class is an abstract method implemented in FairSync(fair) and NonfairSync(unfair), respectively.

    public ReentrantLock(a) {
        // Default is unfair lock
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
Copy the code
    public void lock(a) {
        sync.lock();
    }
Copy the code

Unfair realization:


       final void lock(a) {
            // state indicates the number of lock reentries. 0 indicates that there is no lock
            // Try to preempt the lock once. Replace the state flag with CAS to indicate that the lock is successfully preempt
            if (compareAndSetState(0.1))
		// Preemption success sets the thread that preempted the lock to the current thread
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
Copy the code

Fair lock implementation:

	final void lock(a) {
            acquire(1);
        }
Copy the code

It can be seen that an unfair lock will attempt to preempt the lock once at the start of the lock() method, i.e. cut the queue once at this point. The acquire() method is then called. The acquire() method calls three methods, tryAcquire(), addWaiter(), and acquireQueued(), which are the core methods of ReentrantLock. We will focus on these three methods next.

public final void acquire(int arg) {
        if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))AcquireQueued () returns a thread interrupt flag step by step. If true, the thread has been interrupted and an interrupt flag is set to the client
	// Since locksupport.park () does not respond to interrupts after that, the interrupt marker needs to be passed back step by step
            selfInterrupt();
        }
Copy the code

TryAcquire () method

Flow chart:

The tryAcquire() method is a lock preemption method. If the tryAcquire() method is a lock preemption method, return ture to indicate that the thread preemption the lock, if the lock is preemption, do nothing, directly execute the synchronization code, if the thread is not preemption, store the thread information, and block the thread. That is, call the addWaiter() and acquireQueued() methods.

TryAcquire () also has both fair and unfair lock implementations.

Source code analysis:

Fair lock implementation:


	protected final boolean tryAcquire(int acquires) {
	    // Get the current thread
            final Thread current = Thread.currentThread();
	    // Get the value of state
            int c = getState();
            if (c == 0) {
	    // If state is 0, the lock can be preempted
	    // Only if there is no element in the AQS list, try to preempt the lock. Otherwise, queue the list
                if(! hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
		// If the cas is replaced successfully, the lock is preempted successfully
                    setExclusiveOwnerThread(current);
                    return true; }}// Determine if the thread that acquired the lock is the current thread. If so, increase the reentrant count (i.e. increase the value of state).
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false; }}Copy the code

Unfair lock implementation:

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
    }
Copy the code
	final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
		// If state is 0, the lock is now unlocked and not fair
		// If there is a thread in the AQS list that is already waiting for the lock, it will not attempt to preempt the lock.
                if (compareAndSetState(0, acquires)) {	
		// If the cas is replaced successfully, the lock is preempted successfully
                    setExclusiveOwnerThread(current);
                    return true; }}// Determine if the thread that acquired the lock is the current thread. If so, increase the reentrant count (i.e. increase the value of state).
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // Reset the value of state
                setState(nextc);
                return true;
            }
            return false;
        }
Copy the code

Next, analyze the addWaiter() method

addWaiter()

AddWaiter role () method is the threads will not have access to lock in a Node object, and then stored in the queue AbstractQueuedSynchronizer synchronizer (to lazy AQS). Let’s first look at the structure of the Node object.

    static final class Node {
    
        static final Node SHARED = new Node();
 
        static final Node EXCLUSIVE = null;
        
        static final int CANCELLED =  1;

        static final int SIGNAL    = -1;

        static final int CONDITION = -2;
   
        static final int PROPAGATE = -3;

       
        volatile int waitStatus;


        volatile Node prev;


        volatile Node next;


        volatile Thread thread;


        Node nextWaiter;


        final boolean isShared(a) {
            return nextWaiter == SHARED;
        }


        final Node predecessor(a) throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread; }}Copy the code

According to the structure of Node object, we can see that it is a bidirectional linked list structure, which stores prev and next references, thread member variables are used to store blocked thread references, and Node is stateful, which are CANCELLED, SIGNAL, CONDITION, PROPAGATE, respectively. The states involved in ReentrantLock are “SIGNAL” and “CANCELLED”. AQS also stores the head Node and tail Node of the linked list. Therefore, in fact, AQS stores the data structure of blocked threads as a Node bidirectional linked list. The addWaiter() method encapsulates the blocking thread as a Node and stores it in the linked list of AQS.

Flow chart:

Source code analysis:

private Node addWaiter(Node mode) {
	// Encapsulate the thread without the lock into a node
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
	// If the AQS tail is not null, the AQS list has been initialized to try to add the constructed node to the end of the list
        if(pred ! =null) {
            node.prev = pred;
	    //cas replaces the tail of AQS
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                returnnode; }}// There is no initial call to enq()
        enq(node);
        return node;
    }
Copy the code
private Node enq(final Node node) {
	    / / spin
        for (;;) {
            Node t = tail;
	    // If the end node is empty, the AQS list has not been initialized yet
            if (t == null) { // Must initialize
	    //cas initializes the head of AQS
            // Note that the head node does not store thread information, which means that the head node is a virtual node
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
		// If the end node is not empty, add it to the end of the list
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    returnt; }}}}Copy the code

Next analyze acquireQueued()

acquireQueued()

The acquireQueued() method blocks the threads stored in the AQS linked list using the locksupport.park () method.

Flow chart:

Source code analysis:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
	    // Enter spin
            for (;;) {
		// Get the previous node of the current node
                final Node p = node.predecessor();
		// If the previous node is head and a second attempt to acquire the lock succeeds, remove the node from the AQS queue and replace the head with an interrupt flag
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // Note that the for loop is broken only when the lock is preempted
                    return interrupted;
                }
                // Remove nodes CANCELLED and block threads threads are blocked here
                // Notice that the thread is woken up and continues to execute the for loop to try to preempt the lock
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true; }}finally {
            if (failed)
                // Change the node status to CANCELLED if the node failscancelAcquire(node); }}Copy the code

In acquireQueued () method has two methods more important shouldParkAfterFailedAcquire () method and parkAndCheckInterrupt () method.

shouldParkAfterFailedAcquire()

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
	    // If the node is in SIGNAL state, no processing is required
            return true;
        if (ws > 0) {
            /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
	   // If the state of the node is >0, the node is canceled. The node in this state needs to be cleared. Use the do while loop to clear the previous continuous node in the canceled state
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
	    // In normal cases, cas is used to replace the state of the previous node with SIGNAL state -1
	    // Note that all the nodes in the queue are -1 except for the last one, including the head node
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
Copy the code

parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt(a) {
  Locksupport-park (thread) calls the unsafe.park () method to block the thread (a native method).
        LockSupport.park(this);
        return Thread.interrupted();
    }
Copy the code

So that’s the end of the lock() method and now let’s look at the unlock() method

Unlock () method

Flow chart:

The unlock() method of reentrantLock calls sync’s release() method.

public void unlock(a) {
	    // Each call to unlock decreases state by one
        sync.release(1);
}
Copy the code

The release() method has two important methods, the tryRelease() method and the unparkprecursor (). The tryRelease() method computs the value of state to see if the thread has succeeded in releasing the lock completely (this is because ReentrantLock is reentrant), Call the unparksucceeded () method to wake up the threads if the locks have been completely released, otherwise there is no need to wake up the threads.

public final boolean release(int arg) {
	// Only tryRelease returns true to indicate that the lock has been released and the blocking thread needs to be awakened otherwise no other thread needs to be awakened
        if (tryRelease(arg)) {
            Node h = head;
	// If the header is not empty and the state is not 0, the synchronization queue has been initialized and there is a node to wake up
	// Note that the head of the synchronization queue is a virtual node, which is clear from the node-building code
	/ / and in shouldParkAfterFailedAcquire approach to turn head node status change to 1
	// Return true if head has a state of 0, which means there are no elements in the queue that need to be awakened
            if(h ! =null&& h.waitStatus ! =0)
		// Wake up the next node of the header
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
Copy the code

tryRelease()

protected final boolean tryRelease(int releases) {
	     // Reduce reentrant times
            int c = getState() - releases;
	    Throw an exception if the thread that acquired the lock is not the current thread
            if(Thread.currentThread() ! = getExclusiveOwnerThread())throw new IllegalMonitorStateException();
            boolean free = false;
	    // If state is 0, the lock has been completely released
            if (c == 0) {
                free = true;
		// Set the thread that acquires the lock to null
                setExclusiveOwnerThread(null);
            }
	    // Reset the value of state
            setState(c);
	    // Return true if the lock is released and false otherwise
            return free;
        }
Copy the code

unparkSuccessor()

private void unparkSuccessor(Node node) {
        Set the head state to 0 to indicate that threads are being awakened. If the head state is 0, it will not enter this method
        int ws = node.waitStatus;
        if (ws < 0)
            // Set the state of the header to 0
            compareAndSetWaitStatus(node, ws, 0);

        /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
	// The next state of the cancelled header is not a cancelled node (since headers do not store blocked threads)
        Node s = node.next;
	// The current node is null or cancelled
        if (s == null || s.waitStatus > 0) {
            s = null;
	 // Start aQS at the end of the list to find the nearest non-empty state cancelled node and assign s to the node. Traversal from the tail does not result in prve being unassigned
            for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
                    s = t;
        }
        if(s ! =null)
	    // Call locksupport.unpark () to wake up the specified thread
            LockSupport.unpark(s.thread);
    }	
Copy the code

Five:

In conclusion, the following points need to be paid attention to:

1.ReentrantLock has two implementations: fair lock and unfair lock. In fact, there are only two differences between the two implementations: the first is that at the beginning of the lock() method, the non-fair lock will try cas to preemption the lock and insert the queue. The unfair lock will preempt the lock once, while the fair lock will judge whether there are waiting threads in the AQS linked list, and the threads without waiting will preempt the lock.

2. The data structure of AQS storing blocked threads is a bidirectional linked list structure, and it follows first-in-first-out, because it wakes up from the next node of the beginning node, and new nodes are added to the tail of the linked list when added, so AQS is also a queue data structure.

3. The thread will continue to execute the acquireQueued() method because it is blocking in the for loop of the acquireQueued() method and will attempt to acquire the lock if it succeeds in removing the node from the AQS and breaking out of the for loop, otherwise it will continue to block. The lock failed because someone cut in line. Unfair lock).