Have you ever wondered what happens to the threads that apply for locks? Some may acquire a lock and execute the business code immediately. But if a lock is required by many threads, how are those threads handled?
Today we go into synchronized heavyweight locks and see what happens to threads that don’t acquire locks.
Ps: If you don’t want to see the results of the analysis, you can drag it to the end, there is a summary graph at the end, a picture is worth a thousand words
Previous articles have looked at lock optimizations in Synchroinzed, but if there is a lot of competition, they all end up being heavyweight locks. So let’s start by looking directly at the heavyweight lock code.
To apply for the lock
In the ObjectMonitor:: Enter function, there is a lot of logic to judge and optimize execution, but the core is actually entering the queue via the EnterI function, which blocks the current thread
void ObjectMonitor::EnterI(TRAPS) {
Thread * const Self = THREAD;
// CAS attempts to set the current thread to the thread holding the lock
if (TryLock (Self) > 0) {
assert(_succ ! = Self,"invariant");
assert(_owner == Self, "invariant");
assert(_Responsible ! = Self,"invariant");
return;
}
// Call tryLock with spin to try again, and the operating system expects some subtle effects
if (TrySpin(Self) > 0) {
assert(_owner == Self, "invariant");
assert(_succ ! = Self,"invariant");
assert(_Responsible ! = Self,"invariant");
return; }...// Build the current thread into ObjectWaiter
ObjectWaiter node(Self);
Self->_ParkEvent->reset(a); node._prev = (ObjectWaiter *)0xBAD;
node.TState = ObjectWaiter::TS_CXQ;
ObjectWaiter * nxt;
for (;;) {
// Insert the ObjectWaiter object into the CXQ queue header using CAS
node._next = nxt = _cxq;
if (Atomic::cmpxchg(&node, &_cxq, nxt) == nxt) break;
// CAS failed due to CXQ change. TryLock again
if (TryLock (Self) > 0) {
assert(_succ ! = Self,"invariant");
assert(_owner == Self, "invariant");
assert(_Responsible ! = Self,"invariant");
return; }}// Block the current thread
for (;;) {
if (TryLock(Self) > 0) break;
assert(_owner ! = Self,"invariant");
// park self
if (_Responsible == Self) {
Self->_ParkEvent->park((jlong) recheckInterval);
recheckInterval *= 8;
if(recheckInterval > MAX_RECHECK_INTERVAL) { recheckInterval = MAX_RECHECK_INTERVAL; }}else {
Self->_ParkEvent->park();
}
...
if (TryLock(Self) > 0) break;
++nWakeups;
if (TrySpin(Self) > 0) break; . }...Self has acquired the lock and needs to be removed from CXQ or EntryList
UnlinkAfterAcquire(Self, &node); . }Copy the code
- Before enqueueing, tryLock is called to try to set the _OWNER (thread pointer held by the current ObjectMonitor object lock) field to Self(pointing to the currently executing thread) via the CAS operation. If this is set successfully, the current thread has acquired the lock, otherwise it has not.
int ObjectMonitor::TryLock(Thread * Self) {
void * own = _owner;
if(own ! =NULL) return 0;
if (Atomic::replace_if_null(Self, &_owner)) {
return 1;
}
return - 1;
}
Copy the code
-
If tryLock does not succeed, tryLock will be called again (tryLock is called in trySpin) to try to get the lock, because this will tell the operating system that I desperately need the resource and want it allocated to me as much as possible. But this affinity is not a guaranteed agreement, just a positive one.
-
The current thread is wrapped in an ObjectWaiter object into the head of the CXQ queue
-
Block the current thread (via pthread_cond_wait)
-
When the thread is awakened and has acquired the lock, the UnlinkAfterAcquire method is called to remove the ObjectWaiter from either CXQ or EntryList
Core data structure
The ObjectMonitor object holds a queue of sychronized blocked threads and implements different queue scheduling strategies, so we must first recognize some important properties of this object
class ObjectMonitor {
// mark word
volatile markOop _header;
// pointer to owning thread or BasicLock
void * volatile _owner;
// The thread ID of the previous owner of monitor
volatile jlong _previous_owner_tid;
// The reentrant count is 0 for the first time
volatile intptr_t _recursions;
// The next thread to wake up
Thread * volatile _succ;
// A list of threads blocked on entry or re-entry, consisting of ObjectWaiter, which acts as a wrapper object for the thread
ObjectWaiter * volatile _EntryList;
// The CXQ queue stores threads that cannot enter because the lock has been blocked by another thread
ObjectWaiter * volatile _cxq;
// Threads in wait state (wait()) are added to waitSet
ObjectWaiter * volatile _WaitSet;
// Omit other attributes and methods
}
class ObjectWaiter : public StackObj {
public:
enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ };
// The last node
ObjectWaiter * volatile _next;
// The previous node
ObjectWaiter * volatile _prev;
/ / thread
Thread* _thread;
// Thread status
volatile TStates TState;
public:
ObjectWaiter(Thread* thread);
};
Copy the code
If you look at _next and _prev in ObjectWaiter, you can see that this is a two-way queue implementation, but actually our enqueue operation above does not form a two-way list, it forms a two-way list during the exit lock.
wait
Java Object class provides a communication method between wait and notify threads based on native implementation. In JDK, wait/notify/notifyAll is implemented through Native implementation. Of course, in JVM, Its implementation is still in the SRC/hotspot/share/runtime/objectMonitor CPP.
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
Thread * constSelf = THREAD; JavaThread *jt = (JavaThread *)THREAD; .If the thread is interrupted, an exception needs to be thrown
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
THROW(vmSymbols::java_lang_InterruptedException());
return;
}
jt->set_current_waiting_monitor(this);
// Construct the ObjectWaiter node
ObjectWaiter node(Self); node.TState = ObjectWaiter::TS_WAIT; .// Add ObjectWaiter to the end of WaitSet
AddWaiter(&node);
/ / release the lock
exit(true, Self); .// Investigate park() and block the current thread
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
// Intentionally empty
} else if (node._notified == 0) {
if (millis <= 0) {
Self->_ParkEvent->park(a); }else {
ret = Self->_ParkEvent->park(millis); }}... }// Insert node at the end of the two-way list _WaitSet
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
if (_WaitSet == NULL) {
_WaitSet = node;
node->_prev = node;
node->_next = node;
} else {
ObjectWaiter* head = _WaitSet;
ObjectWaiter* tail = head->_prev;
tail->_next = node;
head->_prev = node;
node->_next = head;
node->_prev = tail;
}
Copy the code
Above I have listed the main method logic of wait, which mainly performs the following steps
- Determine whether the current thread is interrupted and throw InterruptedException if it is interrupted
- If it is not interrupted, the ObjectWaiter node is constructed using the current thread and inserted at the end of WaitSet
- Call exit to relinquish the lock (the logic of relinquishing the lock is discussed below)
- The call to park(actually pthread_cond_wait) blocks the current thread
notify
The same logic for notify is in ObjectMonitory. CPP
void ObjectMonitor::notify(TRAPS) {
CHECK_OWNER(a);// If waitSet is empty, return directly
if (_WaitSet == NULL) {
TEVENT(Empty-Notify);
return;
}
DTRACE_MONITOR_PROBE(notify, this.object(), THREAD);
// Wake up a thread
INotify(THREAD);
OM_PERFDATA_OP(Notifications, inc(1));
}
Copy the code
In Notify, we first check whether waitSet is empty. If it is empty, no thread is waiting, and return directly. Otherwise, the INotify method is called.
NotifyAll actually calls INotify in a loop
void ObjectMonitor::INotify(Thread * Self) {
// Notify requires a lock to ensure concurrency security
Thread::SpinAcquire(&_WaitSetLock, "WaitSet - notify");
// Remove and return the first element in WaitSet, e.g. 1 <--> 2<-->3 in WaitSet, now return 1, and then WaitSet becomes 2<-->3
ObjectWaiter * iterator = DequeueWaiter(a);if(iterator ! =NULL) {
// Disposition - what might we do with iterator ?
// a. add it directly to the EntryList - either tail (policy == 1)
// or head (policy == 0).
// b. push it onto the front of the _cxq (policy == 2).
// For now we use (b).
// Set the thread state
iterator->TState = ObjectWaiter::TS_ENTER;
iterator->_notified = 1;
iterator->_notifier_tid = JFR_THREAD_ID(Self);
ObjectWaiter * list = _EntryList;
if(list ! =NULL) {
assert(list->_prev == NULL."invariant");
assert(list->TState == ObjectWaiter::TS_ENTER, "invariant");
assert(list ! = iterator,"invariant");
}
// prepend to cxq
if (list == NULL) {
iterator->_next = iterator->_prev = NULL;
_EntryList = iterator;
} else {
iterator->TState = ObjectWaiter::TS_CXQ;
for (;;) {
// Place the node to be woken up in the header of the CXQ
ObjectWaiter * front = _cxq;
iterator->_next = front;
if (Atomic::cmpxchg(iterator, &_cxq, front) == front) {
break;
}
}
}
iterator->wait_reenter_begin(this);
}
// notify releases the waitSet lock after completion. Note that the lock is not released by the thread
Thread::SpinRelease(&_WaitSetLock);
}
Copy the code
The notify logic is simple: remove the head of WaitSet from the queue, place the outgoing node into the EntryList if EntryList is empty, and insert the node into the head node of the CXQ list if EntryList is not empty.
Note that notify does not release the lock; the logic for releasing the lock is in exit
exit
After a thread has successfully acquired the object lock, it can execute the custom synchronized code block. The exit function of ObjectMonitor is executed to release the current object lock so that the next thread can acquire the lock.
void ObjectMonitor::exit(bool not_suspended, TRAPS) {
Thread * const Self = THREAD;
if(THREAD ! = _owner) {// The lock is held by the current thread
if (THREAD->is_lock_owned((address) _owner)) {
assert(_recursions == 0."invariant");
_owner = THREAD;
_recursions = 0;
} else {
assert(false."Non-balanced monitor enter/exit! Likely JNI locking");
return; }}// Reentrant times minus 1
if(_recursions ! =0) {
_recursions--; // this is simple recursive enter
return;
}
for(;;) {... w = _EntryList;// If entryList is not empty, it will
if(w ! =NULL) {
assert(w->TState == ObjectWaiter::TS_ENTER, "invariant");
// execute unpark to release the lock
ExitEpilog(Self, w);
return; } w = _cxq; . _EntryList = w; ObjectWaiter * q =NULL;
ObjectWaiter * p;
// This changes _cxq or _EntryList from a one-way list to a two-way list
for(p = w; p ! =NULL; p = p->_next) {
guarantee(p->TState == ObjectWaiter::TS_CXQ, "Invariant");
p->TState = ObjectWaiter::TS_ENTER;
p->_prev = q;
q = p;
}
w = _EntryList;
if(w ! =NULL) {
guarantee(w->TState == ObjectWaiter::TS_ENTER, "invariant");
// execute unpark to release the lock
ExitEpilog(Self, w);
return; }... }... }void ObjectMonitor::ExitEpilog(Thread * Self, ObjectWaiter * Wakee) {
// Exit protocol:
// 1. ST _succ = wakee
// 2. membar #loadstore|#storestore;
// 2. ST _owner = NULL
// 3. unpark(wakee)
_succ = Wakee->_thread;
ParkEvent * Trigger = Wakee->_event;
Wakee = NULL;
// Drop the lock
OrderAccess::release_store(&_owner, (void*)NULL);
OrderAccess::fence(a); ./ / releases the lock
Trigger->unpark(a); }Copy the code
The logic of exit is relatively simple
-
If the current thread relinquishes the lock, check whether the reentrant count is 0, subtract 1 from the reentrant count if it is not 0, and exit.
-
If EntryList is not empty, the thread in the EntryList header element is awakened
-
Assign the CXQ pointer to EntryList, loop the CXQ list into a bidirectional list, and then call ExitEpilog to wake up the CXQ list’s head (actually via pthread_cond_signal).
From here,EntryList and CXQ are the same, because CXQ is assigned to EntryList.
Note that the awakened thread will continue to execute the EnterI method at the beginning of this article, which will remove the ObjectWaiter from EntryList or CXQ.
Practical demonstration
The source code above is all based on JDK12. The code in JDK8 has other policies for exit and notify (which thread to choose), and only the default policy has been retained since JDK9.
So the following Java code runs with the same result in JDK8 or JDK12.
Object lock = new Object();
Thread t1 = new Thread(() -> {
System.out.println("Thread 1 start!!!!!!");
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
}
System.out.println("Thread 1 end!!!!!!"); }}); Thread t2 =new Thread(() -> {
System.out.println("Thread 2 start!!!!!!");
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
}
System.out.println("Thread 2 end!!!!!!"); }}); Thread t3 =new Thread(() -> {
System.out.println("Thread 3 start!!!!!!");
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
}
System.out.println("Thread 3 end!!!!!!"); }}); Thread t4 =new Thread(() -> {
System.out.println("Thread 4 start!!!!!!");
synchronized (lock) {
try {
System.in.read();
} catch (Exception e) {
}
lock.notify();
lock.notify();
lock.notify();
System.out.println("Thread 4 end!!!!!!"); }}); Thread t5 =new Thread(() -> {
System.out.println("Thread 5 start!!!!!!");
synchronized (lock) {
System.out.println("Thread 5 end!!!!!!"); }}); Thread t6 =new Thread(() -> {
System.out.println("Thread 6 start!!!!!!");
synchronized (lock) {
System.out.println("Thread 6 end!!!!!!"); }}); Thread t7 =new Thread(() -> {
System.out.println("Thread 7 start!!!!!!");
synchronized (lock) {
System.out.println("Thread 7 end!!!!!!"); }}); t1.start(); sleep_1_second(); t2.start(); sleep_1_second(); t3.start(); sleep_1_second(); t4.start(); sleep_1_second(); t5.start(); sleep_1_second(); t6.start(); sleep_1_second(); t7.start();Copy the code
The above code is very simple, let’s analyze it.
Threads 1,2, and 3 both call WAIT, so they block, and the structure of WaitSet is as follows:
Thread 4 has acquired the lock and is waiting for an input
Threads 5,6, and 7 are also waiting for locks, so they block too, so the CXQ list structure is as follows:
When thread 4 enters anything and press Enter (three notify methods are called, but the lock is not released)
After thread 4 relinquish the lock, thread 1 in EntryList is woken up because EntryList is not empty, and thread 1 in THE CXQ queue is woken up next.
Therefore, the final thread execution order is 4, 1, 3, 2, 7, 6, 5. Our output results can also verify our conclusion
Thread 1 start!!!!!!
Thread 2 start!!!!!!
Thread 3 start!!!!!!
Thread 4 start!!!!!!
Thread 5 start!!!!!!
Thread 6 start!!!!!!
Thread 7 start!!!!!!
think123
Thread 4 end!!!!!!
Thread 1 end!!!!!!
Thread 3 end!!!!!!
Thread 2 end!!!!!!
Thread 7 end!!!!!!
Thread 6 end!!!!!!
Thread 5 end!!!!!!
Copy the code
A picture is worth a thousand words
Your attention is the biggest support for me
Just give it a thumbs up and follow it