ReentrantLock

ReentrantLock lock and unlock

ReentrantLock is a reentrant lock that java.util.concurrent (JUC) provides as an API. It is typically used like this:

reentrantLock.lock();
try {
    doSomeThing();
} finally {
    reentrantLock.unlock(); // always release, even if doSomeThing() throws
}
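To make the "reentrant" part of the name concrete, here is a minimal sketch in which one thread acquires the same lock twice. The class and method names (ReentrancyDemo, nestedHoldCount) are made up for illustration; lock, unlock, getHoldCount and isLocked are the public ReentrantLock methods.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    static final ReentrantLock lock = new ReentrantLock();

    // Acquires the lock twice on the same thread and returns the hold count seen inside.
    static int nestedHoldCount() {
        lock.lock();
        try {
            lock.lock(); // the owner re-enters without blocking
            try {
                return lock.getHoldCount();
            } finally {
                lock.unlock();
            }
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("hold count while nested: " + nestedHoldCount());
        System.out.println("still locked afterwards: " + lock.isLocked());
    }
}
```

Each lock call must be matched by one unlock call; the lock only becomes free once the hold count drops back to zero.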

lock acquires the lock and unlock releases it. How are they implemented?

lock

public void lock() {
    sync.lock();
}

ReentrantLock has two implementations of lock: FairSync and NonfairSync.

The following is the NonfairSync implementation. It first uses CAS to change state from 0 to 1. If the CAS succeeds, the lock is acquired and the current thread is recorded as the owner. If it fails, the thread falls back to acquire(1), which acquires in exclusive mode.

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

The acquire call enters the acquire method of AQS.

The code here tries to acquire the lock. If that fails, it adds the current thread to the queue (addWaiter) and suspends it (acquireQueued).

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

Let’s take a look at the tryAcquire implementation.

tryAcquire in NonfairSync delegates to nonfairTryAcquire. This method checks whether state is 0. If it is, the lock is free, and the method uses CAS to change state; on success it records the current thread as the owner. If state is not 0, the lock is held, and the method checks whether the current thread is the owner. If so, it increases state by acquires (the reentrant case) and returns true. It returns false if the CAS fails or the lock is held by a thread other than the current one.

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
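The same state machine can be tried out on the public AQS extension points. The sketch below is not the JDK's Sync class: TinyReentrantSync and holds are hypothetical names, and the overflow check is left out for brevity. It only relies on getState, setState, compareAndSetState, setExclusiveOwnerThread, acquire and release, which AbstractQueuedSynchronizer exposes to subclasses.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class TinyReentrantSync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // Lock is free: race for it with CAS.
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            // Reentrant case: only the owner writes here, so a plain set is enough.
            setState(c + acquires);
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int releases) {
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        int c = getState() - releases;
        boolean free = (c == 0);
        if (free)
            setExclusiveOwnerThread(null);
        setState(c);
        return free;
    }

    // Hypothetical helper: exposes the protected state for inspection.
    int holds() {
        return getState();
    }

    public static void main(String[] args) {
        TinyReentrantSync sync = new TinyReentrantSync();
        sync.acquire(1);
        sync.acquire(1);
        System.out.println("holds after two acquires: " + sync.holds());
        sync.release(1);
        sync.release(1);
        System.out.println("holds after two releases: " + sync.holds());
    }
}
```

Because state doubles as the hold count, release(1) only reports the lock as free once every acquire has been matched.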

As mentioned above, Sync has two implementations in ReentrantLock, a fair one and an unfair one, which differ in how they acquire the resource. The fair tryAcquire looks like this:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

The difference between nonfairTryAcquire and the fair tryAcquire shows up when a newly arriving thread finds the resource available: the unfair implementation tries to acquire it directly, while the fair implementation first checks whether threads are already waiting in the queue (hasQueuedPredecessors) and acquires only if there are none.
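From the caller's side, the choice between the two Sync implementations is made through the ReentrantLock constructor. A small sketch (FairnessDemo and fairnessFlags are illustrative names; the no-arg constructor, the boolean constructor and isFair are part of the public API):

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessDemo {
    // Returns { isFair() of a default lock, isFair() of a lock built with fair = true }.
    static boolean[] fairnessFlags() {
        ReentrantLock nonfair = new ReentrantLock();   // default: unfair
        ReentrantLock fair = new ReentrantLock(true);  // request the fair policy
        return new boolean[] { nonfair.isFair(), fair.isFair() };
    }

    public static void main(String[] args) {
        boolean[] flags = fairnessFlags();
        System.out.println("default lock fair? " + flags[0]);
        System.out.println("new ReentrantLock(true) fair? " + flags[1]);
    }
}
```

The unfair default usually gives higher throughput because a running thread can take a free lock without a park/unpark round trip; the fair policy trades that for arrival-order acquisition.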

addWaiter adds the current thread to the queue.

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
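The enqueue step boils down to "link to the old tail, then CAS the tail pointer". The following is a simplified sketch of that idea using AtomicReference, not the AQS code itself: CasQueueSketch, its Node class and enqueue are hypothetical, and the fast path and the enq fallback are folded into one retry loop.

```java
import java.util.concurrent.atomic.AtomicReference;

class CasQueueSketch {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends a node, retrying until the CAS on tail succeeds.
    Node enqueue(String name) {
        Node node = new Node(name);
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Empty queue: lazily install a dummy head first.
                Node dummy = new Node("dummy");
                if (head.compareAndSet(null, dummy))
                    tail.set(dummy);
            } else {
                node.prev = t;                    // set prev before publishing the node
                if (tail.compareAndSet(t, node)) {
                    t.next = node;                // next is filled in after the CAS
                    return node;
                }
            }
        }
    }

    public static void main(String[] args) {
        CasQueueSketch q = new CasQueueSketch();
        q.enqueue("a");
        q.enqueue("b");
        System.out.println("tail = " + q.tail.get().name
                + ", tail.prev = " + q.tail.get().prev.name);
    }
}
```

Because next is written only after the tail CAS, a reader may briefly see a null next on a node that already has a successor. That ordering is why walking backwards through prev is the reliable direction in this kind of queue.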

Then there is the acquireQueued method.

This method loops, fetching the predecessor of the current node each time. If the predecessor is the head node, the current thread tries to acquire the resource again and, on success, makes its own node the head. If the predecessor is not the head, or the attempt fails, the thread is suspended once shouldParkAfterFailedAcquire allows it.

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
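The queueing can be observed from outside through ReentrantLock's monitoring methods. In this sketch (QueuedThreadDemo and queueLengthWhileContended are made-up names) the main thread holds the lock, a second thread blocks in lock(), and the main thread polls hasQueuedThread until the waiter shows up in the queue. Note that the documentation describes getQueueLength as an estimate.

```java
import java.util.concurrent.locks.ReentrantLock;

class QueuedThreadDemo {
    // Returns the queue length observed while a second thread is blocked in lock().
    static int queueLengthWhileContended() {
        final ReentrantLock lock = new ReentrantLock();
        Thread waiter = new Thread(() -> {
            lock.lock();     // blocks until the main thread unlocks
            lock.unlock();
        });
        int observed;
        lock.lock();
        try {
            waiter.start();
            // Poll until the waiter has been enqueued by addWaiter.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock();   // wakes the queued waiter
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println("queued threads while contended: " + queueLengthWhileContended());
    }
}
```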

shouldParkAfterFailedAcquire checks the status of the node in front of the current node:

  • SIGNAL: the current node can park, because the preceding node will unpark the current node’s thread when it releases.
  • Greater than 0: the preceding nodes have been cancelled and are useless, so they are unlinked. The current node must not park yet, because there may be no node left that would unpark it.
  • Otherwise: the preceding node’s status is set to SIGNAL with CAS. The current node must not park yet either; it retries the acquire first and parks on a later pass once it sees SIGNAL.

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

unlock

The first is the unlock method, which calls sync’s release method to release a resource

public void unlock() {
    sync.release(1);
}

release calls tryRelease, which checks whether the current thread is the one holding the resource. If it is not, an IllegalMonitorStateException is thrown. If it is, releases is subtracted from state; when the result is 0, the current thread no longer holds the lock and the owner is cleared. release then calls unparkSuccessor to wake the successor of the head node.


public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
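Two consequences of tryRelease are easy to check from user code: unlocking a lock you do not hold throws, and a reentrantly held lock stays locked until the last matching unlock. A small sketch (UnlockDemo and its method names are illustrative):

```java
import java.util.concurrent.locks.ReentrantLock;

class UnlockDemo {
    // True if unlock() by a non-owner throws IllegalMonitorStateException.
    static boolean unlockWithoutOwnershipThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Returns { isLocked() after the first unlock, isLocked() after the second unlock }.
    static boolean[] lockedAfterEachUnlock() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();
        lock.unlock();                      // state goes 2 -> 1, owner is kept
        boolean afterFirst = lock.isLocked();
        lock.unlock();                      // state goes 1 -> 0, owner is cleared
        boolean afterSecond = lock.isLocked();
        return new boolean[] { afterFirst, afterSecond };
    }

    public static void main(String[] args) {
        System.out.println("non-owner unlock throws: " + unlockWithoutOwnershipThrows());
        boolean[] flags = lockedAfterEachUnlock();
        System.out.println("locked after first unlock: " + flags[0]);
        System.out.println("locked after second unlock: " + flags[1]);
    }
}
```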

What is AQS

AQS stands for AbstractQueuedSynchronizer; many tools in JUC are built on it. As seen above, AQS provides the ability to acquire and release resources. If the acquisition succeeds, the current thread continues to execute; if it fails, the current thread is suspended (park).

AQS maintains a queue for threads that are suspended when they fail to acquire resources. Each node in the queue is an instance of the AQS inner class Node.

Node

Node is a node of the synchronization queue. It encapsulates the waiting thread and the state of that node in the queue.

Each node in the queue has a state:

  • CANCELLED: the node has been cancelled, and its thread no longer competes for the lock.
  • SIGNAL: the node’s successor needs to be unparked later. As we saw in shouldParkAfterFailedAcquire, if the preceding node is in the SIGNAL state, the current node is sure to be unparked, so the method returns true and the current thread can park.
  • CONDITION: the node’s thread is waiting on a condition.
  • PROPAGATE: the next acquireShared should propagate unconditionally.
  • 0: the default state.
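The numeric values behind these states explain the ws > 0 and ws < 0 tests in the listings above. The constants below mirror the values used by the Node class in the JDK 8-era AQS source that the listings match; WaitStatusSketch and its two helpers are hypothetical names added only for illustration.

```java
class WaitStatusSketch {
    static final int CANCELLED = 1;   // the only positive status
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    // Mirrors the "ws > 0" check: only CANCELLED is positive.
    static boolean isCancelled(int waitStatus) {
        return waitStatus > 0;
    }

    // Mirrors the "t.waitStatus <= 0" check in unparkSuccessor: any non-cancelled node qualifies.
    static boolean isLive(int waitStatus) {
        return waitStatus <= 0;
    }

    public static void main(String[] args) {
        System.out.println("CANCELLED cancelled? " + isCancelled(CANCELLED));
        System.out.println("SIGNAL cancelled? " + isCancelled(SIGNAL));
        System.out.println("default (0) live? " + isLive(0));
    }
}
```

Making CANCELLED the single positive value lets the code test for cancellation with one sign comparison instead of comparing against a specific constant.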

Condition

Condition has await, signal and signalAll methods, corresponding to wait, notify and notifyAll of Object.

Condition also maintains a queue representing the threads waiting on the current Condition object.

The await method first appends a node to the condition queue and releases all the resources the current thread holds, recording how many were released so the lock can be re-acquired later. It then checks whether the node is in the synchronization queue, that is, the queue maintained by AQS rather than the one maintained by the Condition. While the node is not in the sync queue, the current thread is suspended. When the thread resumes, the node is already in the synchronization queue (or the wait was interrupted), and the thread tries to re-acquire the resource. After acquiring it, cancelled waiters are cleaned up.

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();        // append the node to the condition queue
    int savedState = fullyRelease(node);     // release all held resources, remembering how many
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {           // check whether the node is on the sync queue
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // try to re-acquire the resource
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();            // unlink cancelled waiters
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

The signal method takes the first waiter in the condition queue and wakes it up.

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

transferForSignal changes the node’s status and moves the node into the sync queue. The await loop shown earlier checks whether the node is in the sync queue; this is the point at which it gets there.

final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
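To see await and signal working together from the user's side, here is a sketch of a bounded buffer in the style of the classic two-condition example. BoundedBuffer, its fields and the transfer helper are illustrative names, not part of the JDK; newCondition, await and signal are the real API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) { this.capacity = capacity; }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity)
                notFull.await();        // releases the lock while waiting, re-acquires before returning
            items.addLast(item);
            notEmpty.signal();          // moves one taker (if any) to the sync queue
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty())
                notEmpty.await();
            T item = items.removeFirst();
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }

    // Pushes 0..n-1 through a capacity-1 buffer and returns what the consumer received.
    static List<Integer> transfer(int n) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(1);
        List<Integer> received = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++)
                    received.add(buffer.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 0; i < n; i++)
                buffer.put(i);
            consumer.join();            // join makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(3));
    }
}
```

The while loops around await matter: a woken thread only re-checks its predicate after it has re-acquired the lock, and by then another thread may have changed the buffer again.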

Conclusion

AQS is hard to understand on its own, without a concrete usage scenario. It is easier to start with ReentrantLock and see how AQS controls thread blocking and waking. This article then introduced the AQS Node and Condition implementations.

Last but not least, we often see park and unpark. What do they do? park suspends the current thread. unpark wakes the specified thread; if that thread is not suspended yet, its next call to park returns immediately without suspending.
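The "unpark before park" behaviour can be checked on a single thread. In this sketch (ParkPermitDemo and permitMakesParkReturnEarly are made-up names) the thread first grants itself the permit with unpark, then calls parkNanos with a generous timeout; the 5-second timeout is an arbitrary choice that only acts as a safety net.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

class ParkPermitDemo {
    // Returns true if parkNanos came back before its timeout because a permit was already available.
    static boolean permitMakesParkReturnEarly() {
        long timeoutNanos = TimeUnit.SECONDS.toNanos(5);
        LockSupport.unpark(Thread.currentThread());   // grant the permit in advance
        long start = System.nanoTime();
        LockSupport.parkNanos(timeoutNanos);          // consumes the permit instead of blocking
        long elapsed = System.nanoTime() - start;
        return elapsed < timeoutNanos;
    }

    public static void main(String[] args) {
        System.out.println("park returned early: " + permitMakesParkReturnEarly());
    }
}
```

The permit does not accumulate: calling unpark several times before park still allows only one park call to return immediately. park may also return for no reason at all, which is why AQS always calls it inside a loop that re-checks the condition.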