LockSupport is the basic thread-blocking primitive used to build locks and other synchronizers. AQS (AbstractQueuedSynchronizer), for example, suspends threads with park and wakes them up with unpark. The JDK uses it as follows

park

Each thread has at most one permit; permits do not accumulate
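
A minimal Java sketch of the single-permit rule, using only java.util.concurrent.locks.LockSupport. The class name PermitDemo, the method secondParkBlocked, and the 50 ms threshold are assumptions made for this illustration and are not part of the JDK.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

class PermitDemo {
    /**
     * Calls unpark twice on the current thread, then parks twice.
     * Returns true if the second (timed) park actually had to wait,
     * which means the two unpark calls left only one permit behind.
     */
    static boolean secondParkBlocked() {
        Thread self = Thread.currentThread();
        LockSupport.unpark(self);   // grants the permit
        LockSupport.unpark(self);   // permit already available: nothing is added

        LockSupport.park();         // consumes the single permit and returns at once

        long start = System.nanoTime();
        // No permit is left, so this waits for the timeout (barring a spurious return).
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        return elapsedMs >= 50;     // assumed threshold: half of the timeout
    }

    public static void main(String[] args) {
        System.out.println("second park blocked: " + secondParkBlocked());
    }
}
```

If permits were cumulative, the second park would also return immediately and the method would report false.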

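Tracing the park source

park is declared in two forms, with and without a blocker Object

The forms that take a blocker Object are recommended, because the blocker is recorded on the thread and lets monitoring and diagnostic tools see what the thread is blocked on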
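
A small sketch of why the blocker is useful: while a thread is parked with a blocker, LockSupport.getBlocker returns that object for the thread. The class BlockerDemo, the method observeBlocker, and the 5 second polling bound are hypothetical names and values chosen for this example.

```java
import java.util.concurrent.locks.LockSupport;

class BlockerDemo {
    private static volatile boolean released;

    /** Parks a helper thread on the given blocker and returns what getBlocker reports for it. */
    static Object observeBlocker(Object blocker) {
        released = false;
        Thread waiter = new Thread(() -> {
            // park may return spuriously, so re-check the flag in a loop
            while (!released) {
                LockSupport.park(blocker);
            }
        }, "waiter");
        waiter.setDaemon(true);
        waiter.start();

        // Poll until the waiter has recorded its blocker (bounded to about 5 s)
        Object seen = LockSupport.getBlocker(waiter);
        long deadline = System.nanoTime() + 5_000_000_000L;
        while (seen == null && System.nanoTime() - deadline < 0) {
            LockSupport.parkNanos(1_000_000L);
            seen = LockSupport.getBlocker(waiter);
        }

        // Let the waiter finish
        released = true;
        LockSupport.unpark(waiter);
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        Object lockLikeObject = new Object();
        System.out.println(observeBlocker(lockLikeObject) == lockLikeObject);
    }
}
```

Thread dump tools rely on the same recorded blocker to show which object a parked thread is waiting on.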

What park does

park suspends the current thread. If a permit is available, park consumes it and returns immediately.

  • park(Object blocker): returns when (1) another thread calls unpark with this thread as the target, (2) another thread interrupts this thread, or (3) the call returns spuriously, for no apparent reason.
  • parkNanos(Object blocker, long nanos): returns under the same three conditions, or when (4) the specified waiting time has elapsed.
  • parkUntil(Object blocker, long deadline): returns under the same three conditions, or when (4) the specified deadline has passed.

Take the source code of park as an example
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        // Record the blocker on the thread (stored with unsafe.putObject)
        setBlocker(t, blocker);
        // Block the current thread
        unsafe.park(false, 0L);
        // Clear the blocker once the thread resumes
        setBlocker(t, null);
    }

Looking at the source code, you can see that the real implementation is in Unsafe

unsafe.park

The core implementation is as follows

JavaThread* thread = JavaThread::thread_from_jni_environment(env);
...
thread->parker()->park(isAbsolute != 0, time);

This gets the Java thread's Parker object and calls its park method. Parker is defined as follows

class Parker : public os::PlatformParker {
private:
  // The permit: 0 means none, 1 means available
  volatile int _counter ;
  Parker * FreeNext ;
  JavaThread * AssociatedWith ; // Current association

public:
  Parker() : PlatformParker() {
    // Initialize _counter
    _counter       = 0 ;
    FreeNext       = NULL ;
    AssociatedWith = NULL ;
  }
protected:
  ~Parker() { ShouldNotReachHere(); }
public:
  void park(bool isAbsolute, jlong time);
  void unpark();

  // Lifecycle operators
  static Parker * Allocate (JavaThread * t) ;
  static void Release (Parker * e) ;
private:
  static Parker * volatile FreeList ;
  static volatile int ListLock ;

};

Parker inherits from os::PlatformParker and holds a volatile _counter field. PlatformParker is implemented differently on each operating system. On Linux, for example

class PlatformParker : public CHeapObj {
protected:
  // Mutex
  pthread_mutex_t _mutex [1] ;
  // Condition variable
  pthread_cond_t  _cond  [1] ;

public:
  ~PlatformParker() { guarantee (0, "invariant") ; }

public:
  PlatformParker() {
    int status;
    // Initialize the condition variable
    status = pthread_cond_init (_cond, NULL);
    assert_status(status == 0, status, "cond_init");
    // Initialize the mutex
    status = pthread_mutex_init (_mutex, NULL);
    assert_status(status == 0, status, "mutex_init");
  }
};

The code above uses the POSIX threads interface; the pthread prefix stands for POSIX thread

Parker::park is implemented as follows

void Parker::park(bool isAbsolute, jlong time) {
  if (_counter > 0) {
    // A permit is already available: consume it
    _counter = 0 ;
    // Memory barrier
    OrderAccess::fence();
    // Return immediately
    return ;
  }

  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;

  if (Thread::is_interrupted(thread, false)) {
    // The thread has already been interrupted: return
    return;
  }

  timespec absTime;
  if (time < 0 || (isAbsolute && time == 0)) {
    // Negative time, or an absolute time of 0: return without waiting.
    // In Java only parkUntil passes an absolute time; the other methods pass a relative one.
    return;
  }
  if (time > 0) {
    // Convert time into absTime: absTime->tv_sec (seconds) and absTime->tv_nsec (nanoseconds)
    unpackTime(&absTime, isAbsolute, time);
  }

  // Enter safepoint region and change the thread state to blocked
  ThreadBlockInVM tbivm(jt);

  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
    // Return if the thread is interrupted, or if the mutex cannot be
    // locked, for example because another thread holds it
    return;
  }

  // The mutex is now held
  int status ;
  if (_counter > 0) {
    // A permit is available: consume it, unlock and return
    _counter = 0;
    status = pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
    OrderAccess::fence();
    return;
  }

#ifdef ASSERT
  // Don't catch signals while blocked; let the running threads have the signals.
  // (This allows a debugger to break into the running thread.)
  sigset_t oldsigs;
  sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
  pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif

  // Set the state of the OS thread owned by the Java thread to CONDVAR_WAIT
  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
  // Set the Java thread's _suspend_equivalent flag to true
  jt->set_suspend_equivalent();
  // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

  if (time == 0) {
    // Atomically puts the thread on the condition variable's wait queue and
    // unlocks the mutex. The thread then waits, and the mutex is locked again
    // before the call returns.
    status = pthread_cond_wait (_cond, _mutex) ;
  } else {
    // Same as pthread_cond_wait, but with a timeout; returns ETIMEDOUT if the
    // timeout elapses before the condition is signaled
    status = os::Linux::safe_cond_timedwait (_cond, _mutex, &absTime) ;
    if (status != 0 && WorkAroundNPTLTimedWaitHang) {
      // WorkAroundNPTLTimedWaitHang is a JVM flag; its default is 1
      // Destroy the condition variable
      pthread_cond_destroy (_cond) ;
      // Reinitialize it
      pthread_cond_init    (_cond, NULL);
    }
  }
  assert_status(status == 0 || status == EINTR ||
                status == ETIME || status == ETIMEDOUT,
                status, "cond_timedwait");

#ifdef ASSERT
  pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif

  // Consume the permit
  _counter = 0 ;
  // Release the mutex
  status = pthread_mutex_unlock(_mutex) ;
  assert_status(status == 0, status, "invariant") ;
  // If externally suspended while waiting, re-suspend
  if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
  }

  // Memory barrier
  OrderAccess::fence();
}

The implementation of park shows the following

  1. park never tells the caller why it returned, so callers usually re-check their own condition after it returns and act on whichever case applies.
  2. Thread waiting, suspension, and wake-up all use the POSIX threads API.
  3. The permit is implemented by the volatile variable _counter. Consuming the permit resets it to 0, and park returns immediately whenever a permit is held.
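
Because park does not report why it returned, callers typically wrap it in a loop that re-checks the condition, the interrupt status, and the deadline. The following is a minimal sketch of that pattern; the Gate class, its Result enum, and the single-waiter restriction are assumptions made for illustration and are not taken from the JDK.

```java
import java.util.concurrent.locks.LockSupport;

class Gate {
    enum Result { OPENED, INTERRUPTED, TIMED_OUT }

    private volatile boolean opened = false;
    private volatile Thread waiter;   // a single waiting thread, for simplicity

    /** Opens the gate and wakes the waiter, if there is one. */
    void open() {
        opened = true;
        LockSupport.unpark(waiter);   // unpark(null) has no effect
    }

    /** Waits until the gate opens, the thread is interrupted, or the timeout elapses. */
    Result await(long timeoutNanos) {
        waiter = Thread.currentThread();
        final long deadline = System.nanoTime() + timeoutNanos;
        while (!opened) {
            if (Thread.interrupted()) {        // interrupt case (clears the flag)
                return Result.INTERRUPTED;
            }
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {              // timeout case
                return Result.TIMED_OUT;
            }
            // Returns on unpark, interrupt, timeout, or spuriously;
            // the loop works out which one it was.
            LockSupport.parkNanos(this, remaining);
        }
        return Result.OPENED;                  // condition satisfied
    }

    public static void main(String[] args) {
        Gate gate = new Gate();
        new Thread(gate::open).start();
        System.out.println(gate.await(1_000_000_000L));
    }
}
```

The waiter publishes itself before checking the flag, and open sets the flag before calling unpark, so a wake-up cannot be lost whichever thread runs first.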

OrderAccess::fence();

On Linux on x86, it is implemented as follows


inline void OrderAccess::fence() {
  if (os::is_MP()) {
    // mfence is not used, because it is sometimes slower than a locked addl
#ifdef AMD64
    __asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory");
#else
    __asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory");
#endif
  }
}

Demonstrations that verify memory reordering can be found online

ThreadBlockInVM tbivm(jt)

This is C++ syntax for declaring a local object: it calls the constructor with the argument jt to create a variable named tbivm. The class is implemented as follows

class ThreadBlockInVM : public ThreadStateTransition {
 public:
  ThreadBlockInVM(JavaThread *thread) : ThreadStateTransition(thread) {
    // Once we are blocked vm expects stack to be walkable
    thread->frame_anchor()->make_walkable(thread);
    trans_and_fence(_thread_in_vm, _thread_blocked);
  }
  ...
};

_thread_in_vm indicates that the thread is executing in the VM, and _thread_blocked indicates that it is blocked. Both are values of an enumeration defined in globalDefinitions.hpp

// This enumeration tracks which part of the code a thread is executing in. It is
// used by the safepoint code and has four important states: _thread_new,
// _thread_in_native, _thread_in_vm and _thread_in_Java. The xxx_trans states are
// intermediate states that indicate a thread is changing from one state to another.
// This lets the safepoint code handle thread states without suspending the threads,
// which makes it run faster.
// Given a state, the xxx_trans state can always be found by adding 1.
enum JavaThreadState {
  _thread_uninitialized     =  0, // should never happen (missing initialization)
  _thread_new               =  2, // just starting up, i.e., in process of being initialized
  _thread_new_trans         =  3, // corresponding transition state (not used, included for completeness)
  _thread_in_native         =  4, // running in native code. This is a safepoint region, since all oops will be in jobject handles
  _thread_in_native_trans   =  5, // corresponding transition state
  _thread_in_vm             =  6, // running in VM
  _thread_in_vm_trans       =  7, // corresponding transition state
  _thread_in_Java           =  8, // running interpreted or compiled Java code, or stub code
  _thread_in_Java_trans     =  9, // corresponding transition state (not used, included for completeness)
  _thread_blocked           = 10, // blocked in vm
  _thread_blocked_trans     = 11, // corresponding transition state
  _thread_max_state         = 12  // maximum thread state+1 - used for statistics allocation
};

The parent class ThreadStateTransition defines trans_and_fence as follows

void trans_and_fence(JavaThreadState from, JavaThreadState to) { transition_and_fence(_thread, from, to); }

// transition_and_fence must be used on any thread state transition
// where there might not be a Java call stub on the stack, in
// particular on Windows where the Structured Exception Handler is
// set up in the call stub. os::write_memory_serialize_page() can
// fault and we can't recover from it on Windows without a SEH in
// place.
static inline void transition_and_fence(JavaThread *thread, JavaThreadState from, JavaThreadState to) {
  assert(thread->thread_state() == from, "coming from wrong thread state");
  assert((from & 1) == 0 && (to & 1) == 0, "odd numbers are transitions states");
  // Switch to the intermediate transition state (from + 1)
  thread->set_thread_state((JavaThreadState)(from + 1));

  // Memory barrier: make sure the new state is visible to the VM thread
  if (os::is_MP()) {
    if (UseMembar) {
      // Force a fence between the write above and read below
      OrderAccess::fence();
    } else {
      // Must use this rather than serialization page in particular on Windows
      InterfaceSupport::serialize_memory(thread);
    }
  }

  if (SafepointSynchronize::do_call_back()) {
    SafepointSynchronize::block(thread);
  }
  thread->set_thread_state(to);

  CHECK_UNHANDLED_OOPS_ONLY(thread->clear_unhandled_oops();)
}

Approximate operating system thread states

OSThread defines approximate values for the operating system thread state; the actual state is platform-dependent

enum ThreadState {
  ALLOCATED,                    // Memory has been allocated but not initialized
  INITIALIZED,                  // The thread has been initialized but not yet started
  RUNNABLE,                     // Has been started and is runnable, but not necessarily running
  MONITOR_WAIT,                 // Waiting on a contended monitor lock
  CONDVAR_WAIT,                 // Waiting on a condition variable
  OBJECT_WAIT,                  // Waiting on an Object.wait() call
  BREAKPOINTED,                 // Suspended at breakpoint
  SLEEPING,                     // Thread.sleep()
  ZOMBIE                        // All done, but not reclaimed yet
};

Tracing the unpark source

The implementation is as follows

void Parker::unpark() {
  int s, status ;
  // Lock the mutex; if it is already locked, block until it is released
  status = pthread_mutex_lock(_mutex);
  assert (status == 0, "invariant") ;
  // Save the old _counter
  s = _counter;
  // Set the permit to 1; every call grants the permit
  _counter = 1;
  if (s < 1) {
     // There was no permit before
     if (WorkAroundNPTLTimedWaitHang) {
        // Default path: signal the condition variable to wake the waiting
        // thread, then release the mutex
        status = pthread_cond_signal (_cond) ;
        assert (status == 0, "invariant") ;
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant") ;
     } else {
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant") ;
        status = pthread_cond_signal (_cond) ;
        assert (status == 0, "invariant") ;
     }
  } else {
    // A permit was already available; just release the mutex
    pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
  }
}

unpark grants the permit and signals the waiting thread so that it can stop waiting
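
The interplay between the counter, the mutex, and the condition variable can be modeled in a few lines of Java. This is only a simplified sketch of the idea, not HotSpot's implementation: the class ParkerModel is hypothetical, a Java monitor stands in for the pthread mutex and condition variable, and the timed and safepoint paths are omitted.

```java
class ParkerModel {
    private int counter = 0;            // plays the role of _counter: 0 or 1

    /** Mirrors Parker::unpark: set the permit to 1 and signal a waiter. */
    synchronized void unpark() {
        int old = counter;
        counter = 1;                    // set, not incremented: permits never accumulate
        if (old < 1) {
            notify();                   // stands in for pthread_cond_signal
        }
    }

    /** Mirrors the untimed path of Parker::park. */
    synchronized void park() {
        if (counter > 0) {              // permit available: consume it and return
            counter = 0;
            return;
        }
        try {
            wait();                     // stands in for pthread_cond_wait
        } catch (InterruptedException e) {
            // Like park, return on interrupt and leave the interrupt flag set
            Thread.currentThread().interrupt();
        }
        counter = 0;                    // consume the permit on the way out
    }

    /** Current number of permits (0 or 1), for inspection. */
    synchronized int permits() {
        return counter;
    }

    public static void main(String[] args) {
        ParkerModel parker = new ParkerModel();
        parker.unpark();
        parker.unpark();
        System.out.println("permits after two unparks: " + parker.permits());
        parker.park();                  // returns immediately, consuming the permit
        System.out.println("permits after park: " + parker.permits());
    }
}
```

As in the native code, park in this model does not loop: it may return without a permit having been granted, so callers are expected to re-check their own condition.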

Conclusion

  • park/unpark can block and wake up a specific thread precisely.
  • On Linux, they are implemented with the POSIX threads API: mutexes and condition variables provide the waiting and wake-up.
  • park first checks whether a permit is available and returns immediately if so, while unpark always sets the permit to available. A thread can therefore be unparked first, and its next park then returns immediately. This suits scenarios where the producer is fast and the consumer has not yet finished its previous work
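
The last point can be sketched with two threads, where the producer calls unpark before the consumer reaches park. The class UnparkFirstDemo, its fields, and the 5 second join bound are assumptions for this example; the consumer spins with Thread.yield rather than a blocking utility so that nothing else consumes the permit.

```java
import java.util.concurrent.locks.LockSupport;

class UnparkFirstDemo {
    private static volatile boolean permitGranted;
    private static volatile boolean parkReturned;

    /** Returns true if a park issued after the matching unpark returned without another wake-up. */
    static boolean parkAfterUnparkReturns() {
        permitGranted = false;
        parkReturned = false;
        Thread consumer = new Thread(() -> {
            // Spin (without parking) until the producer has already called unpark
            while (!permitGranted) {
                Thread.yield();
            }
            LockSupport.park();       // the permit is already there: no waiting
            parkReturned = true;
        }, "consumer");
        consumer.setDaemon(true);     // do not keep the JVM alive if park were to block
        consumer.start();

        LockSupport.unpark(consumer); // the producer runs ahead of the consumer
        permitGranted = true;

        try {
            consumer.join(5_000);     // would time out if park had blocked
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return parkReturned;
    }

    public static void main(String[] args) {
        System.out.println("park returned: " + parkAfterUnparkReturns());
    }
}
```

Note that unpark is only guaranteed to have an effect on a thread that has been started, which is why the sketch calls start before unpark.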