Have you ever wondered what happens to the threads that apply for locks? Some may acquire a lock and execute the business code immediately. But if a lock is required by many threads, how are those threads handled?

Today we go into synchronized heavyweight locks and see what happens to threads that don’t acquire locks.

Ps: If you don’t want to see the results of the analysis, you can drag it to the end, there is a summary graph at the end, a picture is worth a thousand words

Previous articles have looked at lock optimizations in Synchroinzed, but if there is a lot of competition, they all end up being heavyweight locks. So let’s start by looking directly at the heavyweight lock code.

To apply for the lock

In the ObjectMonitor:: Enter function, there is a lot of logic to judge and optimize execution, but the core is actually entering the queue via the EnterI function, which blocks the current thread

void ObjectMonitor::EnterI(TRAPS) {
  Thread * const Self = THREAD;

  // CAS attempts to set the current thread to the thread holding the lock
  if (TryLock (Self) > 0) {
    assert(_succ ! = Self,"invariant");
    assert(_owner == Self, "invariant");
    assert(_Responsible ! = Self,"invariant");
    return;
  }

  // Call tryLock with spin to try again, and the operating system expects some subtle effects
  if (TrySpin(Self) > 0) {
    assert(_owner == Self, "invariant");
    assert(_succ ! = Self,"invariant");
    assert(_Responsible ! = Self,"invariant");
    return; }...// Build the current thread into ObjectWaiter
  ObjectWaiter node(Self);
  Self->_ParkEvent->reset(a); node._prev = (ObjectWaiter *)0xBAD;
  node.TState  = ObjectWaiter::TS_CXQ;


  ObjectWaiter * nxt;
  for (;;) {
    // Insert the ObjectWaiter object into the CXQ queue header using CAS
    node._next = nxt = _cxq;
    if (Atomic::cmpxchg(&node, &_cxq, nxt) == nxt) break;

    // CAS failed due to CXQ change. TryLock again
    if (TryLock (Self) > 0) {
      assert(_succ ! = Self,"invariant");
      assert(_owner == Self, "invariant");
      assert(_Responsible ! = Self,"invariant");
      return; }}// Block the current thread
  for (;;) {
    if (TryLock(Self) > 0) break;
    assert(_owner ! = Self,"invariant");

    // park self
    if (_Responsible == Self) {
      Self->_ParkEvent->park((jlong) recheckInterval);
      recheckInterval *= 8;
      if(recheckInterval > MAX_RECHECK_INTERVAL) { recheckInterval = MAX_RECHECK_INTERVAL; }}else {
      Self->_ParkEvent->park();
    }
    ...
    
    if (TryLock(Self) > 0) break;

    ++nWakeups;

    if (TrySpin(Self) > 0) break; . }...Self has acquired the lock and needs to be removed from CXQ or EntryList
  UnlinkAfterAcquire(Self, &node); . }Copy the code
  1. Before enqueueing, tryLock is called to try to set the _OWNER (thread pointer held by the current ObjectMonitor object lock) field to Self(pointing to the currently executing thread) via the CAS operation. If this is set successfully, the current thread has acquired the lock, otherwise it has not.
int ObjectMonitor::TryLock(Thread * Self) {
  void * own = _owner;
  if(own ! =NULL) return 0;
  if (Atomic::replace_if_null(Self, &_owner)) {
    return 1;
  }
  return - 1;
}
Copy the code
  1. If tryLock does not succeed, tryLock will be called again (tryLock is called in trySpin) to try to get the lock, because this will tell the operating system that I desperately need the resource and want it allocated to me as much as possible. But this affinity is not a guaranteed agreement, just a positive one.

  2. The current thread is wrapped in an ObjectWaiter object into the head of the CXQ queue

  3. Block the current thread (via pthread_cond_wait)

  4. When the thread is awakened and has acquired the lock, the UnlinkAfterAcquire method is called to remove the ObjectWaiter from either CXQ or EntryList

Core data structure

The ObjectMonitor object holds a queue of sychronized blocked threads and implements different queue scheduling strategies, so we must first recognize some important properties of this object

class ObjectMonitor {

  // mark word
  volatile markOop _header;

  // pointer to owning thread or BasicLock
  void * volatile _owner; 

  // The thread ID of the previous owner of monitor
  volatile jlong _previous_owner_tid;

  // The reentrant count is 0 for the first time
  volatile intptr_t _recursions;

  // The next thread to wake up
  Thread * volatile _succ;

  // A list of threads blocked on entry or re-entry, consisting of ObjectWaiter, which acts as a wrapper object for the thread
  ObjectWaiter * volatile _EntryList;

  // The CXQ queue stores threads that cannot enter because the lock has been blocked by another thread
  ObjectWaiter * volatile _cxq;

  // Threads in wait state (wait()) are added to waitSet
  ObjectWaiter * volatile _WaitSet;

  // Omit other attributes and methods

}

class ObjectWaiter : public StackObj {
 public:
  enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ };

  // The last node
  ObjectWaiter * volatile _next;

  // The previous node
  ObjectWaiter * volatile _prev;

  / / thread
  Thread*       _thread;
  // Thread status
  volatile TStates TState;
 public:
  ObjectWaiter(Thread* thread);
};

Copy the code

If you look at _next and _prev in ObjectWaiter, you can see that this is a two-way queue implementation, but actually our enqueue operation above does not form a two-way list, it forms a two-way list during the exit lock.

wait

Java Object class provides a communication method between wait and notify threads based on native implementation. In JDK, wait/notify/notifyAll is implemented through Native implementation. Of course, in JVM, Its implementation is still in the SRC/hotspot/share/runtime/objectMonitor CPP.

void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
 
  Thread * constSelf = THREAD; JavaThread *jt = (JavaThread *)THREAD; .If the thread is interrupted, an exception needs to be thrown
  if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
    THROW(vmSymbols::java_lang_InterruptedException());
    return;
  }
  
  jt->set_current_waiting_monitor(this);

  // Construct the ObjectWaiter node
  ObjectWaiter node(Self); node.TState = ObjectWaiter::TS_WAIT; .// Add ObjectWaiter to the end of WaitSet
  AddWaiter(&node);

  / / release the lock
  exit(true, Self); .// Investigate park() and block the current thread
  if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
        // Intentionally empty
  } else if (node._notified == 0) {
    if (millis <= 0) {
      Self->_ParkEvent->park(a); }else {
      ret = Self->_ParkEvent->park(millis); }}... }// Insert node at the end of the two-way list _WaitSet
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
  if (_WaitSet == NULL) {
    _WaitSet = node;
    node->_prev = node;
    node->_next = node;
  } else {
    ObjectWaiter* head = _WaitSet;
    ObjectWaiter* tail = head->_prev;
    tail->_next = node;
    head->_prev = node;
    node->_next = head;
    node->_prev = tail;
  }
Copy the code

Above I have listed the main method logic of wait, which mainly performs the following steps

  1. Determine whether the current thread is interrupted and throw InterruptedException if it is interrupted
  2. If it is not interrupted, the ObjectWaiter node is constructed using the current thread and inserted at the end of WaitSet
  3. Call exit to relinquish the lock (the logic of relinquishing the lock is discussed below)
  4. The call to park(actually pthread_cond_wait) blocks the current thread

notify

The same logic for notify is in ObjectMonitory. CPP

void ObjectMonitor::notify(TRAPS) {
  CHECK_OWNER(a);// If waitSet is empty, return directly
  if (_WaitSet == NULL) {
    TEVENT(Empty-Notify);
    return;
  }
  DTRACE_MONITOR_PROBE(notify, this.object(), THREAD);
  
  // Wake up a thread
  INotify(THREAD);

  OM_PERFDATA_OP(Notifications, inc(1));
}

Copy the code

In Notify, we first check whether waitSet is empty. If it is empty, no thread is waiting, and return directly. Otherwise, the INotify method is called.

NotifyAll actually calls INotify in a loop

void ObjectMonitor::INotify(Thread * Self) {

  // Notify requires a lock to ensure concurrency security
  Thread::SpinAcquire(&_WaitSetLock, "WaitSet - notify");

  // Remove and return the first element in WaitSet, e.g. 1 <--> 2<-->3 in WaitSet, now return 1, and then WaitSet becomes 2<-->3
  ObjectWaiter * iterator = DequeueWaiter(a);if(iterator ! =NULL) {
   
    // Disposition - what might we do with iterator ?
    // a. add it directly to the EntryList - either tail (policy == 1)
    // or head (policy == 0).
    // b. push it onto the front of the _cxq (policy == 2).
    // For now we use (b).

    // Set the thread state
    iterator->TState = ObjectWaiter::TS_ENTER;

    iterator->_notified = 1;
    iterator->_notifier_tid = JFR_THREAD_ID(Self);

    ObjectWaiter * list = _EntryList;
    if(list ! =NULL) {
      assert(list->_prev == NULL."invariant");
      assert(list->TState == ObjectWaiter::TS_ENTER, "invariant");
      assert(list ! = iterator,"invariant");
    }

    // prepend to cxq
    if (list == NULL) {
      iterator->_next = iterator->_prev = NULL;
      _EntryList = iterator;
    } else {
      iterator->TState = ObjectWaiter::TS_CXQ;
      for (;;) {
        // Place the node to be woken up in the header of the CXQ
        ObjectWaiter * front = _cxq;
        iterator->_next = front;
        if (Atomic::cmpxchg(iterator, &_cxq, front) == front) {
          break;
        }
      }
    }

    iterator->wait_reenter_begin(this);
  }

  // notify releases the waitSet lock after completion. Note that the lock is not released by the thread
  Thread::SpinRelease(&_WaitSetLock);
}

Copy the code

The notify logic is simple: remove the head of WaitSet from the queue, place the outgoing node into the EntryList if EntryList is empty, and insert the node into the head node of the CXQ list if EntryList is not empty.

Note that notify does not release the lock; the logic for releasing the lock is in exit

exit

After a thread has successfully acquired the object lock, it can execute the custom synchronized code block. The exit function of ObjectMonitor is executed to release the current object lock so that the next thread can acquire the lock.

void ObjectMonitor::exit(bool not_suspended, TRAPS) {
  Thread * const Self = THREAD;
  if(THREAD ! = _owner) {// The lock is held by the current thread
    if (THREAD->is_lock_owned((address) _owner)) {
      assert(_recursions == 0."invariant");
      _owner = THREAD;
      _recursions = 0;
    } else {
      assert(false."Non-balanced monitor enter/exit! Likely JNI locking");
      return; }}// Reentrant times minus 1
  if(_recursions ! =0) {
    _recursions--;        // this is simple recursive enter
    return;
  }

  for(;;) {... w = _EntryList;// If entryList is not empty, it will
    if(w ! =NULL) {
      assert(w->TState == ObjectWaiter::TS_ENTER, "invariant");
      // execute unpark to release the lock
      ExitEpilog(Self, w);
      return; } w = _cxq; . _EntryList = w; ObjectWaiter * q =NULL;
    ObjectWaiter * p;

    // This changes _cxq or _EntryList from a one-way list to a two-way list
    for(p = w; p ! =NULL; p = p->_next) {
      guarantee(p->TState == ObjectWaiter::TS_CXQ, "Invariant");
      p->TState = ObjectWaiter::TS_ENTER;
      p->_prev = q;
      q = p;
    }
    w = _EntryList;
    if(w ! =NULL) {
      guarantee(w->TState == ObjectWaiter::TS_ENTER, "invariant");
      // execute unpark to release the lock
      ExitEpilog(Self, w);
      return; }... }... }void ObjectMonitor::ExitEpilog(Thread * Self, ObjectWaiter * Wakee) {
  // Exit protocol:
  // 1. ST _succ = wakee
  // 2. membar #loadstore|#storestore;
  // 2. ST _owner = NULL
  // 3. unpark(wakee)

  _succ = Wakee->_thread;

  ParkEvent * Trigger = Wakee->_event;

  Wakee  = NULL;

  // Drop the lock
  OrderAccess::release_store(&_owner, (void*)NULL);
  OrderAccess::fence(a); ./ / releases the lock
  Trigger->unpark(a); }Copy the code

The logic of exit is relatively simple

  1. If the current thread relinquishes the lock, check whether the reentrant count is 0, subtract 1 from the reentrant count if it is not 0, and exit.

  2. If EntryList is not empty, the thread in the EntryList header element is awakened

  3. Assign the CXQ pointer to EntryList, loop the CXQ list into a bidirectional list, and then call ExitEpilog to wake up the CXQ list’s head (actually via pthread_cond_signal).

From here,EntryList and CXQ are the same, because CXQ is assigned to EntryList.

Note that the awakened thread will continue to execute the EnterI method at the beginning of this article, which will remove the ObjectWaiter from EntryList or CXQ.

Practical demonstration

The source code above is all based on JDK12. The code in JDK8 has other policies for exit and notify (which thread to choose), and only the default policy has been retained since JDK9.

So the following Java code runs with the same result in JDK8 or JDK12.

Object lock = new Object();

Thread t1 = new Thread(() -> {
  System.out.println("Thread 1 start!!!!!!");
  synchronized (lock) {
    try {
      lock.wait();
    } catch (Exception e) {
    }
    System.out.println("Thread 1 end!!!!!!"); }}); Thread t2 =new Thread(() -> {
  System.out.println("Thread 2 start!!!!!!");
  synchronized (lock) {
      try {
        lock.wait();
      } catch (Exception e) {
      }
      System.out.println("Thread 2 end!!!!!!"); }}); Thread t3 =new Thread(() -> {
  System.out.println("Thread 3 start!!!!!!");
  synchronized (lock) {
      try {
        lock.wait();
      } catch (Exception e) {
      }
      System.out.println("Thread 3 end!!!!!!"); }}); Thread t4 =new Thread(() -> {
  System.out.println("Thread 4 start!!!!!!");
  synchronized (lock) {
    try {
      System.in.read();
    } catch (Exception e) {
    }
    lock.notify();
    lock.notify();
    lock.notify();
    System.out.println("Thread 4 end!!!!!!"); }}); Thread t5 =new Thread(() -> {
  System.out.println("Thread 5 start!!!!!!");
  synchronized (lock) {
      System.out.println("Thread 5 end!!!!!!"); }}); Thread t6 =new Thread(() -> {
  System.out.println("Thread 6 start!!!!!!");
  synchronized (lock) {
      System.out.println("Thread 6 end!!!!!!"); }}); Thread t7 =new Thread(() -> {
  System.out.println("Thread 7 start!!!!!!");
  synchronized (lock) {
      System.out.println("Thread 7 end!!!!!!"); }}); t1.start(); sleep_1_second(); t2.start(); sleep_1_second(); t3.start(); sleep_1_second(); t4.start(); sleep_1_second(); t5.start(); sleep_1_second(); t6.start(); sleep_1_second(); t7.start();Copy the code

The above code is very simple, let’s analyze it.

Threads 1,2, and 3 both call WAIT, so they block, and the structure of WaitSet is as follows:

Thread 4 has acquired the lock and is waiting for an input

Threads 5,6, and 7 are also waiting for locks, so they block too, so the CXQ list structure is as follows:

When thread 4 enters anything and press Enter (three notify methods are called, but the lock is not released)

After thread 4 relinquish the lock, thread 1 in EntryList is woken up because EntryList is not empty, and thread 1 in THE CXQ queue is woken up next.

Therefore, the final thread execution order is 4, 1, 3, 2, 7, 6, 5. Our output results can also verify our conclusion

Thread 1 start!!!!!!
Thread 2 start!!!!!!
Thread 3 start!!!!!!
Thread 4 start!!!!!!
Thread 5 start!!!!!!
Thread 6 start!!!!!!
Thread 7 start!!!!!!
think123
Thread 4 end!!!!!!
Thread 1 end!!!!!!
Thread 3 end!!!!!!
Thread 2 end!!!!!!
Thread 7 end!!!!!!
Thread 6 end!!!!!!
Thread 5 end!!!!!!
Copy the code

A picture is worth a thousand words

Your attention is the biggest support for me

Just give it a thumbs up and follow it