preface

Thread concurrency series:

Thread State Java Thread State Java Thread State Java Thread State Java Thread State Java Thread State Java Thread State Java Thread State Java Thread State Java Thread State Java Thread State Java Thread State Unsafe/CAS/LockSupport Application and principle Java concurrency “lock” nature (step by step to implement the lock) Java Synchronized implementation of mutual exclusion application and source exploration Java object header analysis and use (Synchronized related) Java The evolution process of Synchronized partial lock/lightweight lock/heavyweight lock Java Synchronized principle of heavyweight lock in depth (mutual exclusion) Java Synchronized principle of heavyweight lock in depth (synchronization) Java concurrency AQS In-depth analysis (on) the Java concurrency of AQS deep parsing (under) Java Thread. Sleep/Thread. Join/Thread. The yield/Object. Wait/Condition. Await explanation of Java concurrency Already the thorough analysis of concurrent Java (with Synchronized difference) Java Semaphore/CountDownLatch/CyclicBarrier ReentrantReadWriteLock in-depth analysis Deep parsing (principle), Java Semaphore CountDownLatch/CyclicBarrier in-depth analytical (application), the most detailed graphic analytic Java various locks (ultimate) thread pool will understand series

The previous part analyzed the process of locking and releasing the heavyweight lock in the scenario of mutually exclusive threads. This part will analyze the waiting and notification mechanism of the heavyweight lock in the synchronous process. Through this article, you will learn:

Wait /notify/notifyAll (xx) wait/notify/notifyAll (xx 5, Wait /notify/notifyAll () flow chart 6, wait/notify/notifyAll synchronization lock flow chart 7, Wait /notify/notifyAll troubleshooting

1. Wait /notify/notifyAll locks

A Demo

Thread synchronization requires synchronization with wait/notify under certain conditions. Let’s start with a simple example:

public class TestThread { static Object object = new Object(); static int count = 1; Public static void main(String args[]) {public static void main(String args[]) {public static void main(String args[]) {public static void main(String args[]) {public static void main(String args[]) {public static void main(String args[]) { count--; If (count == 0) {//count == 0 to wait object.wait(); } } catch (Exception e) { } } }).start(); Public void run() {public void run() {public void run() {public void run(); object.notify(); } catch (Exception e) { } } }).start(); }}Copy the code

The functionality is simple: thread B produces something (increasing count), thread A consumes something (decreasing count), and thread A finds that nothing is available, calls wait, and then runs again.

The condition is the value of: count.

Normally, thread A waits for count and thread B notifies thread A that count is ready. This is synchronization between threads.

Why do I need to acquire locks before WAIT

Now from the perspective of multi-threaded concurrency, the possible running order of the Demo is as follows:

The initial value of count is 1. Thread A is ready to call object.wait() until count==0. 3. Thread B has changed count and called Object.notify(). 4. After calling Object.wait(), thread A will block forever because it missed Object.notify().

The reason for the above problem is that count is shared between threads and changes to it are concurrent, so locks are required to allow mutually exclusive access to count.

Why did notify need to acquire the lock before

You might say: if a lock is used to protect count, only the corresponding shared variable can be protected, and notify can be left unlocked. The following code:

// synchronized (object) {count--; If (count == 0) {//count == 0 to wait object.wait(); }} // synchronized (object) {// synchronized (object); } object.notify();Copy the code

If notify is not in the synchronization block, thread B changes the count value and releases the lock. Thread A is not in the synchronization queue yet, so thread A cannot wake up, and thread A will block. NotifyAll works the same way. The specific principle of Notify will be analyzed in detail next.

How does the JVM avoid abnormal calls

Wait /notify/notifyAll needs to be called in a synchronized block, which is not necessarily the case. Therefore, the JVM detects when wait/notify/notifyAll is called whether the current thread has acquired a lock, and raises an exception if no lock has been obtained.

#ObjectMonitor.cpp void ObjectMonitor::notify(TRAPS) { CHECK_OWNER(); . } void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) { ... CHECK_OWNER(); . } void ObjectMonitor::notifyAll(TRAPS) { CHECK_OWNER(); . } #define CHECK_OWNER() \ do { \ if (THREAD ! If (THREAD->is_lock_owned((address) _owner)) {if (THREAD->is_lock_owned((address) _owner)) {if (THREAD->is_lock_owned((address) _owner)) { /* Convert from basiclock addr to Thread addr */ \ _recursions = 0; \ OwnerIsThread = 1 ; \} else {// No Throw exception \ TEVENT (Throw IMSX); \ THROW(vmSymbols::java_lang_IllegalMonitorStateException()); \ } \ } \ } while (false)Copy the code

As you can see, calling wait/notify/notifyAll invokes the macro CHECK_OWNER () to determine whether the current thread for the lock, no throwing an IllegalMonitorStateException anomalies.

Summary:

Wait /notify/notifyAll packets need to be in a synchronized block because the conditions to protect synchronization can be accessed correctly in concurrent scenarios.

Wait /notify/notifyAll source code entry

Wait /notify/notifyAll methods are declared in the Java top-level class Object. Java and are implemented in the native layer.

#Object.c static JNINativeMethod methods[] = { {"hashCode", "()I", (void *)&JVM_IHashCode}, {"wait", "(J)V", (void *)&JVM_MonitorWait}, {"notify", "()V", (void *)&JVM_MonitorNotify}, {"notifyAll", "()V", (void *)&JVM_MonitorNotifyAll}, {"clone", "()Ljava/lang/Object;" , (void *)&JVM_Clone}, };Copy the code

You can see that it is a dynamically registered JNI function. Take wait as an example to continue looking for:

#jvm.cpp
JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms))
  ...
  ObjectSynchronizer::wait(obj, ms, CHECK);
JVM_END
Copy the code

Here we are again in the familiar ObjectSynchronizer class.

3, Object.wait(xx) process parsing

Once you’ve found the entry to the underlying source code, the rest is relatively simple.

#synchronizer. CPP void ObjectSynchronizer:: Wait (Handle obj, jlong millis, TRAPS) {if (UseBiasedLocking) { Revoke BiasedLocking::revoke_and_rebias(obj, false, THREAD); assert(! obj->mark()->has_bias_pattern(), "biases should be revoked by now"); }... / / expansion to heavyweight ObjectMonitor * monitor = ObjectSynchronizer: lock: inflate (THREAD, obj ()); DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis); // Call ObjectMonitor's wait function monitor->wait(millis, true, THREAD); . }Copy the code

Synchronized expanded to a heavyweight lock after the wait function was called. At this point, the call is already flowing into ObjectMonitor.

#ObjectMonitor.cpp void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) { ... if (interruptible && Thread::is_interrupted(Self, true) && ! HAS_PENDING_EXCEPTION) { ... THROW(vmSymbols::java_lang_InterruptedException()); return ; } // Construct a node that encapsulates the current thread ObjectWaiter node(Self); // The node status is TS_WAIT node.TState = ObjectWaiter::TS_WAIT; Self->_ParkEvent->reset() ; OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag Thread::SpinAcquire (&_waitsetLock, "waitset-add "); ------->(1) AddWaiter (&node); Thread::SpinRelease (&_waitsetLock); . // release lock + wake up thread ------->(2) exit (true, Self); // exit the monitor ... { // State transition wrappers OSThread* osthread = Self->osthread(); OSThreadWaitState osts(osthread, true); {... if (interruptible && (Thread::is_interrupted(THREAD, False) | | HAS_PENDING_EXCEPTION)) {/ / Intentionally empty} else if (node. _notified = = 0) {/ / hang myself -- -- -- -- -- -- -- > (3) the if (millis <= 0) { Self->_ParkEvent->park () ; } else { ret = Self->_ParkEvent->park (millis) ; }}... } // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm ObjectWaiter::TStates v = node.TState ; If (v == ObjectWaiter::TS_RUN) {// Enter (Self); } else {/ / v state is in the synchronous queue at this time -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- > (4) ReenterI (Self, & node); node.wait_reenter_end(this); }... } // OSThreadWaitState() }Copy the code

(1) In the process of locking and releasing locks, we can see that synchronization queues (_CXq and _EntryList) are introduced. When the lock contention fails, it is eventually added to the synchronization queue, and when the thread releases the lock, it takes out the node from the synchronization queue to wake up. During thread synchronization, when the wait function is called, the node is added to the wait queue _WaitSet. Let’s see how the source code is implemented:

#ObjectMonitor. CPP inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {if (_WaitSet == NULL) { The current node is treated as the head node _WaitSet = node; node->_prev = node; node->_next = node; ObjectWaiter* head = _WaitSet; ObjectWaiter* tail = head->_prev; assert(tail->_next == head, "invariant check"); tail->_next = node; head->_prev = node; node->_next = head; node->_prev = tail; }}Copy the code

_pre refers to the front drive node and _next refers to the rear drive node. _WaitSet is a two-way circular linked list.

As shown in the figure above, thread A invokes wait(xx) first, followed by thread B, and finally thread C. Thread A is at the head of the queue, thread C is at the end of the queue. (2) Exit (XX) has been analyzed in the previous article and its main functions are as follows:

Release the lock and wake up the node in the synchronization queue.

(3) Still call ParkEvent to suspend yourself. (4) Call ReenterI(xx) to compete for the lock when the thread is awakened (in the synchronization queue).

void ATTR ObjectMonitor::ReenterI (Thread * Self, ObjectWaiter * SelfNode) { ... for (;;) If (TryLock (Self) > 0) break; If (TrySpin (Self) > 0) break; {// lock failed, suspend jt->set_suspend_equivalent(); if (SyncFlags & 1) { Self->_ParkEvent->park ((jlong)1000) ; } else { Self->_ParkEvent->park () ; If (TryLock(Self) > 0) break; . } // UnlinkAfterAcquire (Self, SelfNode) is removed from the synchronization queue. }Copy the code

If the lock preemption fails, it will not be added to the synchronization queue because it is already in the synchronization queue.

In summary, calling object.wait (xx) basically does four things:

1. Encapsulate the node and add it to the wait queue. Release the lock and wake up the thread in the synchronization queue. 3. Hang yourself. 4. Continue competing for locks after being awakened.

4, Object.notify()/ object.notifyall () process parsing

Since a thread is suspended after calling wait(xx), when is it removed from the wait queue and awakened? Let’s look at object.notify ().

Notify parsing

Similar to the object.wait (xx) method entry:

# synchronizer. CPP void ObjectSynchronizer: : notify (Handle obj, TRAPS) {if (UseBiasedLocking) {/ / biased locking, Undo BiasedLocking::revoke_and_rebias(obj, false, THREAD); } markOop mark = obj->mark(); If (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {// If (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) { return; } / / expansion for heavy lock, and call the ObjectMonitor. Notify () function ObjectSynchronizer: : inflate (THREAD, obj () - > notify (THREAD); }Copy the code

Objectmonitor.notify () is finally called:

void ObjectMonitor::notify(TRAPS) { ... If (_WaitSet == NULL) {return if (_WaitSet == NULL) { } //notify Policy, default is 2 int Policy = Knob_MoveNotifyee; Thread::SpinAcquire (&_waitsetLock, "waitset-notify "); -------->(1) ObjectWaiter * iterator = DequeueWaiter(); if (iterator ! = NULL) { ... ObjectWaiter * List = _EntryList ; if (List ! = NULL) {// Change the node status to TS_ENTER... } if (Policy == 0) {// prepend to EntryList // Insert _EntryList header... } else if (Policy == 1) {// Append to EntryList... } else if (Policy = = 2) {/ / the prepend the to CXQ / / the default Policy -- -- -- -- -- -- -- -- -- -- -- -- -- - > (2) if (List = = NULL) {/ / _EntryList is empty, Iterator ->_next = iterator->_prev = NULL; _EntryList = iterator ; } else {// Change the state to TS_CXQ iterator->TState = ObjectWaiter::TS_CXQ; for (;;) { ObjectWaiter * Front = _cxq ; iterator->_next = Front ; If (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {break; }}}} else if (Policy == 3) {// append to CXQ... } else {ParkEvent * ev = iterator->_event; iterator->TState = ObjectWaiter::TS_RUN ; OrderAccess::fence() ; ev->unpark() ; }... } Thread::SpinRelease (&_waitsetLock); . }Copy the code

Two priorities are still listed: (1) Remove the node from the wait queue:

# ObjectMonitor. CPP inline ObjectWaiter * ObjectMonitor: : DequeueWaiter () {/ / take team head node ObjectWaiter * waite = _WaitSet; if (waiter) { DequeueSpecificWaiter(waiter); } return waiter; } inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) { ... ObjectWaiter* next = node->_next; If (next == node) {if (next == node) {if (next == node) {if (next == node) {if (next == node) {if (next == node) {if (next == node) { ObjectWaiter* prev = node->_prev; next->_prev = prev; prev->_next = next; If (_WaitSet == node) {// Move _WaitSet back to the next node. } } node->_next = NULL; node->_prev = NULL; }Copy the code

(2) How to move the queue head node from the wait queue to the synchronization queue (_CXq, _EntryList) Different policies have different operations. The default policy is used as an example:

1. If _EntryList is empty, add the node to the head of the _EntryList queue. 2. Otherwise, add the node to the head of _CXQ queue.

If thread A, thread B, and thread C call wait(xx) and block, the wait queue is A- >B- >C (from beginning to end). Now thread D calls notify() and the wait queue is as follows:

In summary, calling object.notify () basically does one thing:

Remove the node from the wait queue and add it to the synchronization queue.

At the same time through the source code also explained two problems:

1. Notify does not release the lock. 2. The notify operation does not wake up the thread.

NotifyAll parsing

As the name implies, it notifies all waiting nodes. NotifyAll Moves a node from the wait queue to a synchronization queue. NotifyAll moves a node from the wait queue to a synchronization queue. NotifyAll moves a node from the wait queue to a synchronization queue.

#ObjectMonitor.cpp if (Policy == 2) { // prepend to cxq // prepend to cxq iterator->TState = ObjectWaiter::TS_CXQ ; for (;;) { ObjectWaiter * Front = _cxq ; iterator->_next = Front ; if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) { break ; }}}Copy the code

Unlike notify, the _CXQ header is inserted directly, which means that the node that was at the end of the wait queue is at the front of the synchronization queue.

Wait /notify/notifyAll

At this point, the principle of the three has been analyzed, and the relationship of the three is connected in series with the graph:

6. Flow chart of thread lock under mutex synchronization

Combined with the previous, this article, has been the implementation of mutex and synchronous lock process analysis, the next will be combined with the two, I hope we can control the whole process from the global. Let’s start with some pseudocode:

Synchronized (object) {object.wait(); Synchronized (object) {object.notify(); //doSomething }Copy the code

Before, you might have wondered:

1. If thread A successfully acquires the lock, thread B fails to acquire the lock again. What should thread B do? Wait (); wait(); wait(); 3. What does thread A do when it exits the critical section and releases the lock? 4. What happens when thread B calls notify()?

.

1. If thread A successfully acquires the lock, thread B fails to acquire the lock again. What should thread B do? A:

Thread B fails to acquire the lock and inserts itself into the synchronization queue, which consists of two queues: _CXQ and _EntryList. Insert to the head of the _CXq queue. Thread B suspends itself.

Wait (); wait(); wait(); A:

Thread A adds itself to the wait queue (_WaitSet) by inserting itself into the head of the wait queue. Release occupied locks. Thread A suspends itself.

3. What does thread A do when it exits the critical section and releases the lock? A:

Thread A releases the lock after exiting the critical section. The thread waiting for the lock is then woken up from the synchronization queue. The default mode is as follows: If the _EntryList queue is not empty, fetch the head node of the _EntryList queue and wake up. If _EntryList is empty, point _EntryList to _CXQ and wake up by fetching the queue head node.

4. What happens when thread B calls notify()? A:

Thread B takes the head node from the wait queue and inserts it into the synchronization queue. The processing method varies according to different policies. The default method is as follows: If _EntryList is empty, the node is added to the head of the _EntryList queue. Otherwise, add the node to the head of the _CXQ queue.

Finally, it is shown as follows:

To summarize the following, the core of understanding heavyweight locks:

1, Lock the _OWNER field in ObjectMonitor. 2. Work on the synchronization queue (_CXQ /_EntryList) and wait queue (_WaitSet).

Wait /notify/notifyAll

There are many articles on the web that explain the knowledge of heavyweight locks, some of which may be far-fetched. After analyzing the source code for this article, you will be able to identify some common puzzles: q: Is the notify thread random? A:

Notify does not wake up the thread in normal flow, but simply moves the node in the waiting queue to the synchronization queue according to certain policies. The move strategy is to select the first thread in the wait queue and move it to the synchronization queue. The synchronous queue is FIFO compliant and notify calls are not random. When a thread releases the lock, it will wake up the thread in the synchronization queue according to the pattern (see 6th analysis for specific patterns/policies). In other words, the thread may not be the first one in the synchronization queue, but it may not be the next thread to acquire the lock. The official note says notify is a random wake up call

Q: Notify /notifyAll

public class TestThread { static Object object = new Object(); static Thread a, b, c; public static void main(String args[]) { a = new Thread(new Runnable() { @Override public void run() { try { synchronized (object) { System.err.println("A before wait " + System.nanoTime()); b.start(); Thread.sleep(1000); object.wait(); System.err.println("A after wait " + System.nanoTime()); } } catch (Exception e) { } } }); a.start(); b = new Thread(new Runnable() { @Override public void run() { try { synchronized (object) { System.err.println("B before  wait " + System.nanoTime()); c.start(); Thread.sleep(1000); object.wait(); System.err.println("B after wait " + System.nanoTime()); } } catch (Exception e) { } } }); c = new Thread(new Runnable() { @Override public void run() { try { synchronized (object) { System.err.println("C before  wait " + System.nanoTime()); object.notify(); object.notify(); System.err.println("C after wait " + + System.nanoTime()); } } catch (Exception e) { } } }); }}Copy the code

As above, there are threads A, B, and C, where A starts B and B starts C. Print the following:






Q: What is false awakening? A:

public class TestThread { static Object object = new Object(); static Thread a, b, c; static int count = 0; public static void main(String args[]) { a = new Thread(new Runnable() { @Override public void run() { try { synchronized (object) { System.err.println("A before wait " + System.nanoTime()); if (count == 0) object.wait(); count--; System.err.println("A count:" + count + " " + System.nanoTime()); } } catch (Exception e) { } } }); a.start(); b = new Thread(new Runnable() { @Override public void run() { try { synchronized (object) { System.err.println("B before  wait " + System.nanoTime()); if (count == 0) object.wait(); count--; System.err.println("B count:" + count + " " + System.nanoTime()); } } catch (Exception e) { } } }); b.start(); Thread.sleep(1000); // Make sure threads A and B are running thread.sleep (1000); } catch (Exception e) { } c = new Thread(new Runnable() { @Override public void run() { try { synchronized (object) { System.err.println("C before wait " + System.nanoTime()); count++; object.notifyAll(); System.err.println("C after wait " + +System.nanoTime()); } } catch (Exception e) { } } }); c.start(); }}Copy the code

As shown, there are threads A, B, and C. If count == 0, wait() is called to suspend the thread, C changes (production)count, and informs all waiting threads. A and B wake up and change (consumption)count.





        System.err.println("A before wait " + System.nanoTime());
        while (count == 0)
            object.wait();
        count--;
        System.err.println("A count:" + count + " " + System.nanoTime());
Copy the code

When the thread is woken up, it continues to look at the condition variable and suspends again if it is not met.

For example, if only one thread (A) calls wait and the other thread (B) calls notify, there is no need to add while to A. If you are not sure whether to add or not, or do not want to distinguish between scenes, then it is best to add, after all, it is only one more judgment, more safe.

At this point, the analysis of the knowledge related to Synchronized has been completed. The next step will focus on the analysis of AQS and horizontal comparison with Synchronized.

This article source code is based on JDK1.8, running environment is JDK1.8. Therefore, the above demo may have different effects in your environment, please be careful. Different versions may have different strategies, but the core idea is the same.

If you like, please like, pay attention to your encouragement is my motivation to move forward

Continue to update, with me step by step system, in-depth study of Android/Java