Preface

The previous article analyzed how AQS implements shared and exclusive locks, along with some of the tricky details. This article covers the remaining parts of AQS. After reading it, you will understand:

1. Interruptible and uninterruptible exclusive locks
2. Interruptible and uninterruptible shared locks
3. Locks with and without a timed wait
4. How wait/notify is implemented
5. Similarities and differences between the synchronization queue and the wait queue
6. How Condition.await/Condition.signal differ from Object.wait/Object.notify

1. Interruptible and uninterruptible exclusive locks

Definition of an interruptible lock

Here's a quick analogy:

  • First scenario: Xiao Ming is a teenager addicted to the Internet who plays games in an Internet cafe all day. His mother calls him to come home for dinner (she sends him an interrupt). Xiao Ming agrees on the phone, but goes straight back to his game as soon as he hangs up. However many times his mother calls, she cannot stop him, so we say that Xiao Ming's gaming cannot be interrupted.
  • Second scenario: Xiao Ming is still addicted to the Internet, but he listens to his mother. When she calls him to come home for dinner (sends him an interrupt), he agrees and shuts down the computer, so we say that Xiao Ming's gaming can be interrupted.

To see this from the code's point of view, consider the following lock/unlock pseudocode:

    private void testLock() {
        myLock.lock();
        try {
            doSometing1();
            doSometing2();
        } finally {
            // always release the lock, even if the work above throws
            myLock.unlock();
        }
    }

    private void doSometing1() { /* ... */ }
    private void doSometing2() { /* ... */ }

Suppose thread A has acquired the lock and is executing doSometing1() and doSometing2(). Thread B calls myLock.lock(), cannot get the lock, and blocks. Meanwhile, thread C is waiting for data from thread B. Because B cannot acquire the lock, it cannot produce the data C needs, so C decides to interrupt B. When thread B is woken up, it has two options:

1. Stick to its original goal: keep trying to acquire the lock, and block again if that fails. However many times C interrupts it, nothing changes.
2. Check whether an interrupt has occurred, and if so throw an exception and stop trying to acquire the lock.

If myLock is designed around option 1, the lock is uninterruptible; if it is designed around option 2, the lock is interruptible.

Uninterruptible exclusive lock

The call flow for acquiring an exclusive lock, as analyzed in the previous article:

acquire(xx) --> acquireQueued(xx)

acquireQueued(xx) only detects the interrupt: when the thread is woken up by an interrupt, the method records that fact, goes back to competing for the lock, and returns the recorded value to its caller once the lock has been acquired.

In acquire(xx), when that return value shows that an interrupt occurred, the method simply restores the thread's interrupt flag.

So when a thread calls acquire(xx) to acquire the lock and is interrupted, it carries on trying to acquire the lock regardless; an interrupt from outside cannot terminate the acquisition.

Such a lock is an uninterruptible exclusive lock.
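The same behaviour can be observed from the outside. The following is a minimal sketch, assuming ReentrantLock.lock() stands in for the uninterruptible exclusive lock; the class name UninterruptibleLockDemo and its run() helper are made up for this illustration. The main thread plays thread A and holds the lock, a second thread plays thread B, and the call to interrupt() plays thread C.

```java
import java.util.concurrent.locks.ReentrantLock;

class UninterruptibleLockDemo {
    /** Returns true if B still obtained the lock and found its interrupt flag restored. */
    static boolean run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        boolean[] result = new boolean[2]; // [0] B got the lock, [1] B's interrupt flag was set

        lock.lock(); // the main thread acts as thread A and holds the lock
        Thread b = new Thread(() -> {
            lock.lock(); // an interrupt does not abort this call
            try {
                result[0] = lock.isHeldByCurrentThread();
                result[1] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });
        try {
            b.start();
            while (!lock.hasQueuedThread(b)) { // wait until B is queued behind A
                Thread.sleep(1);
            }
            b.interrupt(); // B keeps waiting; the interrupt is only recorded
        } finally {
            lock.unlock(); // only now can B acquire the lock
        }
        b.join();
        return result[0] && result[1];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("B acquired the lock with its interrupt flag restored: " + run());
    }
}
```

The interrupt is not lost: B sees it through isInterrupted() once lock() has returned, and it is up to B to decide what to do about it.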

Interruptible exclusive lock

To make lock acquisition interruptible, the code has to check the interrupt flag and then throw an exception or exit. Here is how AQS implements it:

    // AbstractQueuedSynchronizer.java
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        // if an interrupt has already occurred, throw an exception
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // no need to return the interrupt state: an interrupt
                    // is detected below and reported with an exception
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // woken up and found to be interrupted: throw an exception
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

As you can see, an exception is thrown in two places:

1. Before trying to acquire the lock, the method checks whether an interrupt has occurred and throws an exception if so.
2. After being woken up, the thread checks whether it was interrupted; if so, it throws an exception, which aborts the acquisition.
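A minimal sketch of the interruptible case, assuming ReentrantLock.lockInterruptibly() stands in for the interruptible exclusive lock; InterruptibleLockDemo and run() are names made up for this illustration. The waiting thread gives up as soon as it is interrupted, while the main thread still holds the lock.

```java
import java.util.concurrent.locks.ReentrantLock;

class InterruptibleLockDemo {
    /** Returns true if the waiting thread left lockInterruptibly() with an InterruptedException. */
    static boolean run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        boolean[] interrupted = new boolean[1];

        lock.lock(); // the main thread holds the lock for the whole test
        try {
            Thread b = new Thread(() -> {
                try {
                    lock.lockInterruptibly();
                    lock.unlock(); // not reached here: the lock never becomes free
                } catch (InterruptedException e) {
                    interrupted[0] = true; // the acquisition was aborted
                }
            });
            b.start();
            while (!lock.hasQueuedThread(b)) { // wait until B is queued
                Thread.sleep(1);
            }
            b.interrupt();
            b.join(); // B terminates even though the lock was never released
            return interrupted[0];
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("lockInterruptibly() was aborted by the interrupt: " + run());
    }
}
```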

2. Interruptible and uninterruptible shared locks

Uninterruptible shared lock

The call flow for acquiring a shared lock, as analyzed in the previous article:

acquireShared(xx) --> doAcquireShared(xx)

doAcquireShared(xx) differs slightly from the uninterruptible exclusive path: when it detects an interrupt, it restores the interrupt flag itself instead of returning the value to its caller. As with the uninterruptible exclusive lock, though, the interrupt is not handled in any other way.

Such a lock is an uninterruptible shared lock.

Interruptible shared lock

    // AbstractQueuedSynchronizer.java
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // if an interrupt has already occurred, throw an exception
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        // no need to restore the interrupt flag: an interrupt
                        // is detected below and reported with an exception
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // woken up and found to be interrupted: throw an exception
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

As you can see, interrupts are handled in the same way as for the interruptible exclusive lock.
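Both shared variants can be compared side by side. This is a minimal sketch, assuming Semaphore is used as the shared lock: Semaphore.acquire() goes through acquireSharedInterruptibly(1) and Semaphore.acquireUninterruptibly() goes through acquireShared(1). The class SharedInterruptDemo and its helper method are hypothetical names.

```java
import java.util.concurrent.Semaphore;

class SharedInterruptDemo {
    /** Blocks a thread on an empty semaphore, interrupts it, and reports whether it threw. */
    static boolean acquireThrowsOnInterrupt(boolean interruptible) throws InterruptedException {
        Semaphore permits = new Semaphore(0); // no permits: every acquire has to wait
        boolean[] threw = new boolean[1];
        Thread t = new Thread(() -> {
            if (interruptible) {
                try {
                    permits.acquire(); // interruptible shared acquire
                } catch (InterruptedException e) {
                    threw[0] = true;
                }
            } else {
                permits.acquireUninterruptibly(); // uninterruptible shared acquire
            }
        });
        t.start();
        while (!permits.hasQueuedThreads()) { // wait until the thread is queued
            Thread.sleep(1);
        }
        t.interrupt();
        if (!interruptible) {
            permits.release(); // the uninterruptible waiter only leaves once it gets a permit
        }
        t.join();
        return threw[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("acquire() threw: " + acquireThrowsOnInterrupt(true));
        System.out.println("acquireUninterruptibly() threw: " + acquireThrowsOnInterrupt(false));
    }
}
```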

3. Locks with and without a timed wait

Exclusive lock with a timed wait

Being able to respond to interrupts still leaves one scenario uncovered:

A thread does not want to wait for the lock forever. It wants to wait for a limited time and give up if it has not acquired the lock by then.

Here is how AQS handles it:

    // AbstractQueuedSynchronizer.java
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        // the requested wait time must be greater than 0
        if (nanosTimeout <= 0L)
            return false;
        // compute the deadline
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                // check whether the time is used up
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    // timed out: give up
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    // suspend the thread; it wakes up by itself after nanosTimeout
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

As you can see, besides the usual wake-up when the lock is released, a suspended thread can be woken up in two more ways:

1. Another thread interrupts it.
2. The timeout passed to parkNanos expires.

After the thread wakes up, it checks the interrupt flag. If an interrupt occurred, it throws an exception immediately. Otherwise it tries to acquire the lock again; if that fails, it checks whether the timeout has expired and, if so, returns false.
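From the caller's side, the timed wait looks like this. A minimal sketch, assuming ReentrantLock.tryLock(timeout, unit) is the public entry point to tryAcquireNanos; TimedLockDemo and tryLockWhileHeld are hypothetical names.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class TimedLockDemo {
    /** Returns what tryLock reported in a second thread while the main thread held the lock. */
    static boolean tryLockWhileHeld(long timeoutMillis) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        boolean[] acquired = new boolean[1];

        lock.lock(); // held until the second thread has finished
        try {
            Thread b = new Thread(() -> {
                try {
                    // waits at most timeoutMillis, then gives up and returns false
                    acquired[0] = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
                    if (acquired[0]) {
                        lock.unlock();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // the timed wait is interruptible too
                }
            });
            b.start();
            b.join();
        } finally {
            lock.unlock();
        }
        return acquired[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("acquired while the lock was held: " + tryLockWhileHeld(50));
    }
}
```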

Shared lock with a timed wait

This works in the same way as the exclusive lock with a timed wait, so I won't go into the details.
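For completeness, here is a short usage sketch of the shared variant, assuming Semaphore.tryAcquire(timeout, unit) as the shared lock with a timed wait; TimedSharedDemo and tryTake are names made up for this illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class TimedSharedDemo {
    /** Tries to take one permit from a fresh semaphore that starts with the given number of permits. */
    static boolean tryTake(int permits, long timeoutMillis) throws InterruptedException {
        Semaphore semaphore = new Semaphore(permits);
        // waits at most timeoutMillis for a permit and returns false if none becomes available
        return semaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("one permit available: " + tryTake(1, 50));
        System.out.println("no permit available: " + tryTake(0, 50));
    }
}
```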

4. Wait/notify implementation

Basic data structure

ConditionObject, an inner class of AbstractQueuedSynchronizer, is the basis of the wait/notify implementation. Let's look at its structure:

    // ConditionObject
    // head of the wait queue
    private transient Node firstWaiter;
    // tail of the wait queue
    private transient Node lastWaiter;

firstWaiter and lastWaiter, together with the nextWaiter pointer in each Node, maintain the wait queue as a singly linked list.
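To make the shape of this queue concrete, here is a simplified, single-threaded model of it. It is only a sketch: WaitQueueSketch, its Node class, and the name field (which stands in for the waiting thread) are made up for illustration and are not the JDK classes, although the pointer handling follows the same pattern.

```java
class WaitQueueSketch {
    /** A simplified node: only the successor pointer of the wait queue is modelled. */
    static final class Node {
        final String name; // stands in for the waiting thread
        Node nextWaiter;

        Node(String name) {
            this.name = name;
        }
    }

    private Node firstWaiter; // head of the wait queue
    private Node lastWaiter;  // tail of the wait queue

    /** Appends a node at the tail, as a waiting thread would. */
    void addLast(String name) {
        Node node = new Node(name);
        if (lastWaiter == null) {
            firstWaiter = node;           // empty queue: the new node is also the head
        } else {
            lastWaiter.nextWaiter = node; // link behind the current tail
        }
        lastWaiter = node;
    }

    /** Detaches and returns the head, as a notification would; null if the queue is empty. */
    String removeFirst() {
        Node first = firstWaiter;
        if (first == null) {
            return null;
        }
        if ((firstWaiter = first.nextWaiter) == null) {
            lastWaiter = null;            // the queue became empty
        }
        first.nextWaiter = null;
        return first.name;
    }

    public static void main(String[] args) {
        WaitQueueSketch queue = new WaitQueueSketch();
        queue.addLast("A");
        queue.addLast("B");
        System.out.println(queue.removeFirst() + ", " + queue.removeFirst());
    }
}
```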

With the data structure in place, we need methods that operate on it.

These methods come in two families, awaitXX(xx) and signalXX():

  • await(): waits with no time limit.
  • await(long time, TimeUnit unit): waits for a limited time; the time unit can be specified.
  • awaitNanos(long nanosTimeout): waits for a limited time, given in nanoseconds.
  • awaitUninterruptibly(): waits with no time limit and cannot be interrupted.
  • awaitUntil(Date deadline): waits until a given point in time in the future.
  • signal(): notifies one node in the wait queue.
  • signalAll(): notifies all nodes in the wait queue.

Thread A calls awaitXX(xx) and blocks until a condition is met; thread B calls signalXX() to tell thread A that the condition is now met. This is clearly a thread synchronization process. The sections below analyze the wait/notify mechanism through await(), signal(), and signalAll().
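The three time-limited variants report a timeout in different ways. The sketch below, which assumes a Condition created from a ReentrantLock, calls each of them while no other thread ever signals; TimedAwaitDemo and allTimedOut are hypothetical names.

```java
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TimedAwaitDemo {
    /** Returns true if all three timed waits gave up because nobody called signal(). */
    static boolean allTimedOut() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            // false means the waiting time elapsed before a notification arrived
            boolean signalled = condition.await(20, TimeUnit.MILLISECONDS);
            // a value <= 0 means the timeout elapsed
            long remainingNanos = condition.awaitNanos(TimeUnit.MILLISECONDS.toNanos(20));
            // false means the deadline passed
            boolean beforeDeadline = condition.awaitUntil(new Date(System.currentTimeMillis() + 20));
            return !signalled && remainingNanos <= 0 && !beforeDeadline;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("all timed waits gave up: " + allTimedOut());
    }
}
```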

ConditionObject.await() implementation

    public final void await() throws InterruptedException {
        // if an interrupt has already occurred, throw an exception
        if (Thread.interrupted())
            throw new InterruptedException();
        // join the wait queue
        Node node = addConditionWaiter();            // --------->(1)
        // release the lock completely
        int savedState = fullyRelease(node);         // --------->(2)
        int interruptMode = 0;
        // loop while the node is not in the synchronization queue
        while (!isOnSyncQueue(node)) {               // --------->(3)
            // suspend the thread
            LockSupport.park(this);
            // after waking up, check whether an interrupt occurred --------->(4)
            // the thread is woken up for one of two reasons:
            // 1. it was interrupted, 2. another thread called signal
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // compete for the lock again
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            // remove cancelled nodes from the wait queue --------->(5)
            unlinkCancelledWaiters();
        // decide how to handle the interrupt
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode); // --------->(6)
    }

The await() method as a whole does three things:

1. It wraps the thread in a Node and adds the node to the wait queue.
2. It suspends the thread until the condition is met.
3. After the thread wakes up, it competes for the lock again.

The comments mark six key methods, which are analyzed in turn below.

(1)

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        if (t != null && t.waitStatus != Node.CONDITION) {
            // the tail is no longer in CONDITION state:
            // walk the wait queue and remove the cancelled nodes
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        // wrap the current thread in a node whose status is CONDITION
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            // the queue is empty: the head pointer points to the current node
            firstWaiter = node;
        else
            // append the current node after the last node
            t.nextWaiter = node;
        // the tail pointer points to the new last node
        lastWaiter = node;
        return node;
    }

This method wraps the thread in a node and adds it to the wait queue.

(2) Because the thread is about to wait, it must first release the lock so that other threads can acquire it and do their work.

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            // get the current synchronization state
            int savedState = getState();
            if (release(savedState)) {
                // release succeeded
                failed = false;
                return savedState;
            } else {
                // release failed: the current thread does not hold the exclusive lock
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

fullyRelease(xx) shows that the AQS must be held as an exclusive lock when await() is called.

(3)

    final boolean isOnSyncQueue(Node node) {
        // status is CONDITION or there is no predecessor:
        // the node is not in the synchronization queue
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // the node has a successor, so it must be in the synchronization queue
        if (node.next != null)
            return true;
        // otherwise search the synchronization queue from the tail
        return findNodeFromTail(node);
    }

isOnSyncQueue(xx) determines whether the node is in the synchronization queue; the thread stays suspended as long as it is not.

(4)

    private int checkInterruptWhileWaiting(Node node) {
        // if interrupted, call transferAfterCancelledWait to decide further;
        // otherwise return 0 directly
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }

    final boolean transferAfterCancelledWait(Node node) {
        // reaching this point means the thread has been interrupted
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            // CAS succeeded: signal() has not happened yet,
            // so add the node to the synchronization queue here
            enq(node);
            return true;
        }
        // CAS failed: signal() has already changed the node status,
        // so just wait for signal() to finish adding the node
        // to the synchronization queue
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

If transferAfterCancelledWait(xx) returns true, an InterruptedException has to be thrown later; if it returns false, only the interrupt flag needs to be restored. When interruptMode != 0, an interrupt has occurred and the loop exits immediately.
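The two outcomes can be reproduced by controlling whether the interrupt arrives before or after signal(). This is a minimal sketch under the assumption that a ReentrantLock condition is used; InterruptModeDemo, run(), and the strings it returns are made up for this illustration.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class InterruptModeDemo {
    /**
     * Lets a thread wait in await(), then interrupts it either after signal() or without any signal.
     * Returns "exception" if await() threw InterruptedException, "flag" if it returned
     * normally with the interrupt flag set, and "none" otherwise.
     */
    static String run(boolean signalFirst) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        String[] outcome = new String[1];
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                condition.await();
                outcome[0] = Thread.currentThread().isInterrupted() ? "flag" : "none";
            } catch (InterruptedException e) {
                outcome[0] = "exception";
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        boolean done = false;
        while (!done) {
            lock.lock();
            try {
                if (lock.hasWaiters(condition)) { // the waiter is in the wait queue
                    if (signalFirst) {
                        condition.signal();       // moves the node to the synchronization queue
                    }
                    waiter.interrupt();
                    done = true;
                }
            } finally {
                lock.unlock();
            }
            if (!done) {
                Thread.sleep(1);
            }
        }
        waiter.join();
        return outcome[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("interrupt without signal: " + run(false));
        System.out.println("interrupt after signal:   " + run(true));
    }
}
```

When the interrupt comes first, the waiter itself moves its node to the synchronization queue and await() ends with an exception; when signal() comes first, the interrupt is only recorded and handed back through the interrupt flag.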

(5) If the thread was woken up by an interrupt rather than by signal(), the node has been added to the synchronization queue but has not yet been removed from the wait queue, so it is cleaned up here.

(6)

    private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
        // interrupted before being signalled: throw the exception
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            // restore the interrupt flag
            selfInterrupt();
    }

As you can see, even if an interrupt occurs, await() does not act on it until the lock has been reacquired. This guarantees that the thread holds the lock whenever it returns from await().
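This guarantee can be checked with a small experiment. The sketch below assumes a ReentrantLock condition; AwaitReacquireDemo and run() are hypothetical names. The main thread interrupts the waiter while holding the lock, so the waiter cannot leave await() until the main thread unlocks.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitReacquireDemo {
    /**
     * Returns two observations:
     * [0] the interrupted waiter was still blocked while the interrupter held the lock,
     * [1] the waiter held the lock when it caught the InterruptedException.
     */
    static boolean[] run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        boolean[] result = new boolean[2];
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                condition.await();
            } catch (InterruptedException e) {
                result[1] = lock.isHeldByCurrentThread();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        boolean done = false;
        while (!done) {
            lock.lock();
            try {
                if (lock.hasWaiters(condition)) {
                    waiter.interrupt();
                    waiter.join(100);             // the waiter cannot finish: we still hold the lock
                    result[0] = waiter.isAlive();
                    done = true;
                }
            } finally {
                lock.unlock();                    // now the waiter can reacquire and report
            }
            if (!done) {
                Thread.sleep(1);
            }
        }
        waiter.join();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = run();
        System.out.println("still blocked while the lock was held: " + r[0]);
        System.out.println("lock held when the exception was caught: " + r[1]);
    }
}
```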

ConditionObject.signal() implementation

    public final void signal() {
        if (!isHeldExclusively())
            // the current thread does not hold the exclusive lock: throw an exception
            throw new IllegalMonitorStateException();
        // take the head of the wait queue
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

    private void doSignal(Node first) {
        do {
            // move the head pointer to the next node;
            // if the queue is now empty, clear the tail pointer as well
            if ((firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            // detach the node from the wait queue
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&  // move the node to the synchronization queue
                 (first = firstWaiter) != null); // on failure, try the next node
    }

    final boolean transferForSignal(Node node) {
        // change the status
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // join the synchronization queue
        Node p = enq(node);
        int ws = p.waitStatus;
        // p is the predecessor of node; if the predecessor has been cancelled
        // or its status cannot be changed, wake up the thread of the current node directly
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

As you can see, signal() requires that the AQS is used as an exclusive lock and that the current thread holds it. The signal() method does three things:

1. It removes the node from the wait queue.
2. It adds the node to the synchronization queue.
3. If the node's predecessor in the synchronization queue has been cancelled, or the predecessor's status cannot be changed to SIGNAL, it wakes up the thread of the node directly.
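The ownership requirement is easy to verify. In this minimal sketch, which assumes a Condition created from a ReentrantLock, both signal() and a wait method are called by a thread that does not hold the lock; ConditionOwnershipDemo and its two methods are names made up for this illustration.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionOwnershipDemo {
    /** Calls signal() without holding the lock and reports whether that was rejected. */
    static boolean signalWithoutLockThrows() {
        Condition condition = new ReentrantLock().newCondition();
        try {
            condition.signal();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    /** Calls awaitUninterruptibly() without holding the lock and reports whether that was rejected. */
    static boolean awaitWithoutLockThrows() {
        Condition condition = new ReentrantLock().newCondition();
        try {
            condition.awaitUninterruptibly(); // fails before the thread is ever suspended
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("signal() without the lock rejected: " + signalWithoutLockThrows());
        System.out.println("await without the lock rejected:    " + awaitWithoutLockThrows());
    }
}
```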

ConditionObject.signalAll() implementation

    public final void signalAll() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }

    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        do {
            Node next = first.nextWaiter;
            // remove the node from the wait queue
            first.nextWaiter = null;
            // move the node to the synchronization queue
            transferForSignal(first);
            first = next;
        } while (first != null);
    }

As you can see, signal() simply moves the head of the wait queue to the synchronization queue, while signalAll() moves all the nodes in the wait queue to the synchronization queue.

5. Similarities and differences between the synchronization queue and the wait queue

1. The synchronization queue is a doubly linked list (FIFO); the Node.prev and Node.next pointers link each node to its predecessor and successor.
2. The wait queue is a singly linked list (FIFO); the Node.nextWaiter pointer links each node to its successor.
3. The nodes in both queues are of type Node.
4. A thread that fails to acquire the lock is added to the tail of the synchronization queue; once it acquires the lock, it is removed from that queue.
5. Calling await() adds the thread to the tail of the wait queue; calling signal() removes a node from the head of the wait queue and adds it to the tail of the synchronization queue.
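The movement between the two queues can be watched through the monitoring methods of ReentrantLock. The sketch below assumes getWaitQueueLength(condition) reflects the wait queue and getQueueLength() reflects the synchronization queue (the JDK documents the latter as an estimate, which is stable here because the main thread holds the lock while it samples). QueueMigrationDemo and run() are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class QueueMigrationDemo {
    /**
     * Returns {wait queue before, sync queue after signal(), wait queue after signal(),
     *          sync queue after signalAll(), wait queue after signalAll()}.
     */
    static int[] run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        int waiters = 3;
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                lock.lock();
                try {
                    condition.awaitUninterruptibly(); // demo only: no predicate to re-check
                } finally {
                    lock.unlock();
                }
            });
            threads[i].start();
        }

        int[] snapshot = null;
        while (snapshot == null) {
            lock.lock();
            try {
                if (lock.getWaitQueueLength(condition) == waiters) {
                    int waitBefore = lock.getWaitQueueLength(condition);
                    condition.signal();      // moves one node to the synchronization queue
                    int syncAfterSignal = lock.getQueueLength();
                    int waitAfterSignal = lock.getWaitQueueLength(condition);
                    condition.signalAll();   // moves all remaining nodes
                    snapshot = new int[] {
                        waitBefore, syncAfterSignal, waitAfterSignal,
                        lock.getQueueLength(), lock.getWaitQueueLength(condition)
                    };
                }
            } finally {
                lock.unlock();
            }
            if (snapshot == null) {
                Thread.sleep(1);
            }
        }
        for (Thread t : threads) {
            t.join();
        }
        return snapshot;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(run()));
    }
}
```

Note that the signalled threads do not run immediately: they sit in the synchronization queue until the signalling thread releases the lock.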

6. Differences between Condition.await/Condition.signal and Object.wait/Object.notify

Let's start with some code. First, the wait/notify mechanism built into Object:

    Object object = new Object();

    private void testObjectWait() {
        synchronized (object) {
            try {
                object.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void testObjectNotify() {
        synchronized (object) {
            object.notify();
        }
    }

Now the wait/notify mechanism of Condition:

    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    Condition condition2 = lock.newCondition();

    private void testConditionAwait() {
        lock.lock();
        try {
            condition.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private void testConditionSignal() {
        lock.lock();
        try {
            condition.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

Similarities

1. The methods can only be called after the lock has been acquired, which keeps the condition variable correct under concurrency.
2. The wait and notify methods come in pairs.
3. The wait methods can respond to interrupts.
4. The wait methods support returning on timeout.

Differences

Condition's wait/notify relies on AQS, is used together with a Lock, and is implemented in the JDK.

Object's wait/notify relies on the synchronized keyword and is implemented in the JVM.

In addition, Condition's wait/notify mechanism is more flexible than Object's:

1. A Condition wait can be made not to respond to interrupts.
2. A Condition wait can take its timeout as a point in time in the future.
3. Several Conditions can be created from the same lock.
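The last point is what makes the classic bounded buffer convenient to write: producers and consumers wait on separate conditions of the same lock, so a notification only reaches the kind of thread that can make progress. The following is a sketch of that well-known pattern; the class BoundedBuffer and its field names are chosen for this illustration.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final Object[] items;
    private int putIndex, takeIndex, count;

    BoundedBuffer(int capacity) {
        items = new Object[capacity];
    }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();       // re-check the predicate after every wake-up
            }
            items[putIndex] = item;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();         // wake one consumer, never a producer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            T item = (T) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();          // wake one producer, never a consumer
            return item;
        } finally {
            lock.unlock();
        }
    }
}
```

With Object.wait/Object.notify there is only one wait set per monitor, so the same design would typically need notifyAll() and would wake threads that cannot proceed.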

The next article will focus on the implementation and application of the classes built on AQS, such as ReentrantLock, ReentrantReadWriteLock, Semaphore, and CountDownLatch.

This article is based on JDK1.8.
