LockSupport

LockSupport is a JUC utility class introduced by JSR 166. It provides the basic thread-blocking primitives used to build locks and other synchronization classes. The class associates a permit with each thread that uses it: a call to park returns immediately if the permit is available, consuming it in the process, and otherwise blocks; a call to unpark makes the permit available and wakes the thread if it is parked.

Park method analysis

Disables the current thread for thread scheduling purposes unless the permit is available. If the permit is available, it is consumed and the call returns immediately; otherwise the current thread is disabled for thread scheduling purposes and lies dormant until one of the following three things happens:

  • Some other thread calls unpark with the current thread as the target;
  • Or some other thread interrupts the current thread;
  • Or the call returns spuriously (that is, for no reason).

This method does not report which of these caused it to return. The caller should re-check the condition that caused the thread to park in the first place. The caller can also check the interrupt status of the thread upon return.
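Because park can return for any of these reasons, callers usually wrap it in a loop that re-checks the condition. The following is a minimal sketch of that pattern; the class name ParkLoopDemo and the ready flag are hypothetical names chosen for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: a waiter parks until a flag is set.
class ParkLoopDemo {
    private static final AtomicBoolean ready = new AtomicBoolean(false);

    // Returns true once the waiter has observed the flag and exited.
    static boolean run() {
        ready.set(false);
        Thread waiter = new Thread(() -> {
            // park() may return because of unpark, an interrupt, or for no
            // reason at all, so the condition is re-checked on every return.
            while (!ready.get()) {
                LockSupport.park();
                if (Thread.currentThread().isInterrupted()) {
                    return; // interrupted: stop waiting, status stays set
                }
            }
        });
        waiter.start();
        ready.set(true);            // publish the condition first ...
        LockSupport.unpark(waiter); // ... then hand the waiter a permit
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return ready.get() && !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("waiter finished: " + run());
    }
}
```

Setting the flag before calling unpark matters: if the waiter wakes up first, it must already see the condition as true, otherwise it would park again with no further wake-up coming.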

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}
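The blocker recorded by setBlocker is only bookkeeping for monitoring and diagnostic tools, and it can be read back with LockSupport.getBlocker. The sketch below, with the hypothetical class name BlockerDemo, polls until the parked thread's blocker becomes visible and then checks that it is cleared again after park returns.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class showing the blocker set by park(Object).
class BlockerDemo {
    private static volatile boolean released;

    // Returns true if the blocker was visible while the thread was parked
    // and was cleared again after the thread left park.
    static boolean run() {
        released = false;
        final Object blocker = new Object();
        Thread waiter = new Thread(() -> {
            while (!released) {
                LockSupport.park(blocker); // records blocker, then blocks
            }
        });
        waiter.start();

        boolean seen = false;
        long deadline = System.nanoTime() + 5_000_000_000L; // give up after 5 s
        while (System.nanoTime() - deadline < 0) {
            if (LockSupport.getBlocker(waiter) == blocker) {
                seen = true;
                break;
            }
            Thread.yield();
        }

        released = true;
        LockSupport.unpark(waiter);
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // park() resets the blocker to null before it returns.
        return seen && LockSupport.getBlocker(waiter) == null;
    }

    public static void main(String[] args) {
        System.out.println("blocker observed and cleared: " + run());
    }
}
```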

Unpark method analysis

Makes the permit for the given thread available if it is not already available. If the thread is blocked in park, it is unblocked. Otherwise, its next call to park is guaranteed not to block. If the given thread has not been started, there is no guarantee that this operation will have any effect.

public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}
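The permit behaves like a flag rather than a count: calling unpark several times still leaves at most one permit. A small sketch of this, using the hypothetical class name PermitDemo and a timed park so that the second wait cannot hang:

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: unpark before park, and permit saturation.
class PermitDemo {
    // Returns the elapsed nanoseconds of a park() that follows unpark().
    static long parkAfterUnpark() {
        Thread self = Thread.currentThread();
        LockSupport.unpark(self);   // permit becomes available
        LockSupport.unpark(self);   // no further effect: at most one permit
        long start = System.nanoTime();
        LockSupport.park();         // consumes the permit, does not block
        return System.nanoTime() - start;
    }

    // With the permit consumed, the next park would block. A timed park
    // shows this without hanging; it returns on timeout (or spuriously).
    static long timedParkWithoutPermit(long nanos) {
        long start = System.nanoTime();
        LockSupport.parkNanos(nanos);
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        System.out.println("park after unpark took ns: " + parkAfterUnpark());
        System.out.println("park without permit took ns: "
                + timedParkWithoutPermit(50_000_000L));
    }
}
```

On a typical run the first figure is tiny and the second is close to the requested timeout, although a spurious return can shorten the second wait.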

JVM source code Analysis

Park method source code analysis

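LockSupport's park and unpark both delegate to the native park and unpark methods of the Unsafe class. Their HotSpot entry points, Unsafe_Park and Unsafe_Unpark, are as follows: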

UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
  UnsafeWrapper("Unsafe_Park");
  EventThreadPark event;
#ifndef USDT2
  HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
#else /* USDT2 */
   HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time);
#endif /* USDT2 */
  JavaThreadParkedState jtps(thread, time != 0);
  thread->parker()->park(isAbsolute != 0, time);
#ifndef USDT2
  HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
#else /* USDT2 */
  HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());
#endif /* USDT2 */
  if (event.should_commit()) {
    oop obj = thread->current_park_blocker();
    event.set_klass((obj != NULL) ? obj->klass() : NULL);
    event.set_timeout(time);
    event.set_address((obj != NULL) ? (TYPE_ADDRESS) cast_from_oop<uintptr_t>(obj) : 0);
    event.commit();
  }
UNSAFE_END

UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
  UnsafeWrapper("Unsafe_Unpark");
  Parker* p = NULL;
  if (jthread != NULL) {
    oop java_thread = JNIHandles::resolve_non_null(jthread);
    if (java_thread != NULL) {
      jlong lp = java_lang_Thread::park_event(java_thread);
      if (lp != 0) {
        // This cast is OK even though the jlong might have been read
        // non-atomically on 32bit systems, since there, one word will
        // always be zero anyway and the value set is always the same
        p = (Parker*)addr_from_java(lp);
      } else {
        // Grab lock if apparently null or using older version of library
        MutexLocker mu(Threads_lock);
        java_thread = JNIHandles::resolve_non_null(jthread);
        if (java_thread != NULL) {
          JavaThread* thr = java_lang_Thread::thread(java_thread);
          if (thr != NULL) {
            p = thr->parker();
            if (p != NULL) { // Bind to Java thread for next time.
              java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
            }
          }
        }
      }
    }
  }
  if (p != NULL) {
#ifndef USDT2
    HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
#else /* USDT2 */
    HOTSPOT_THREAD_UNPARK((uintptr_t) p);
#endif /* USDT2 */
    p->unpark();
  }
UNSAFE_END

Each thread object has a Parker instance (source: thread.hpp)

// JSR166 per-thread parker
private:
  Parker*    _parker;
public:
  Parker*     parker() { return _parker; }

The Parker class is defined as follows (source file park.hpp).

  1. The Parker class inherits from os::PlatformParker, which adapts it to the different operating systems.
  2. It has a _counter field, which can be understood as the permit: a call to park can return without blocking only when _counter > 0.
  3. It provides public park and unpark methods.
class Parker : public os::PlatformParker {
private:
  volatile int _counter ;
  Parker * FreeNext ;
  JavaThread * AssociatedWith ; // Current association

public:
  Parker() : PlatformParker() {
    _counter       = 0 ;
    FreeNext       = NULL ;
    AssociatedWith = NULL ;
  }
protected:
  ~Parker() { ShouldNotReachHere(); }
public:
  // For simplicity of interface with Java, all forms of park (indefinite,
  // relative, and absolute) are multiplexed into one call.
  void park(bool isAbsolute, jlong time);
  void unpark();

  // Lifecycle operators
  static Parker * Allocate (JavaThread * t) ;
  static void Release (Parker * e) ;
private:
  static Parker * volatile FreeList ;
  static volatile int ListLock ;

};
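The comment in the class above notes that all forms of park (indefinite, relative, and absolute) are multiplexed into one call. At the Java level these forms appear as LockSupport.park, parkNanos and parkUntil: in the JDK sources, parkNanos passes isAbsolute = false with a duration in nanoseconds, and parkUntil passes isAbsolute = true with a deadline in milliseconds since the epoch. A short sketch of the two timed forms, with the hypothetical class name TimedParkDemo:

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class for the relative and absolute timed park forms.
class TimedParkDemo {
    // Relative form: the argument is a duration, converted to nanoseconds.
    static long relative(long millis) {
        long start = System.nanoTime();
        LockSupport.parkNanos(millis * 1_000_000L);
        return (System.nanoTime() - start) / 1_000_000L;
    }

    // Absolute form: the argument is a deadline in epoch milliseconds.
    static long absolute(long millis) {
        long start = System.nanoTime();
        LockSupport.parkUntil(System.currentTimeMillis() + millis);
        return (System.nanoTime() - start) / 1_000_000L;
    }

    public static void main(String[] args) {
        // Both calls return after roughly the timeout, or earlier if the
        // thread is unparked, interrupted, or wakes up spuriously.
        System.out.println("parkNanos waited ms: " + relative(20));
        System.out.println("parkUntil waited ms: " + absolute(20));
    }
}
```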

Parker's parent class is PlatformParker. Here is the Linux implementation of Parker's park method (os_linux.cpp):

  1. If _counter > 0, there is no need to wait: _counter is reset to 0 with an atomic exchange and the call returns.
  2. Otherwise, a ThreadBlockInVM is constructed for the current thread and the mutex is acquired. If _counter > 0 at this point, _counter is set to 0, the mutex is unlocked, and the call returns.
  3. Otherwise, the thread waits on the condition variable, using a different wait function depending on the time argument. When the wait returns, _counter is set to 0, the mutex is unlocked, and the call to park completes.
void Parker::park(bool isAbsolute, jlong time) {
  // Ideally we'd do something useful while spinning, such
  // as calling unpackTime().

  // Optional fast-path check:
  // Return immediately if a permit is available.
  // We depend on Atomic::xchg() having full barrier semantics
  // since we are doing a lock-free update to _counter.
  if (Atomic::xchg(0, &_counter) > 0) return; // _counter > 0, use XCHG to change to 0 and return

  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;

  // Optional optimization -- avoid state transitions if there's an interrupt pending.
  // Check interrupt before trying to wait
  if (Thread::is_interrupted(thread, false)) { // If the thread is interrupted, return directly
    return;
  }

  // Next, demultiplex/decode time arguments
  timespec absTime;
  if (time < 0 || (isAbsolute && time == 0)) {// don't wait at all
    return;
  }
  if (time > 0) {
    unpackTime(&absTime, isAbsolute, time);
  }


  // Enter safepoint region
  // Beware of deadlocks such as 6317397.
  // The per-thread Parker:: mutex is a classic leaf-lock.
  // In particular a thread must never block on the Threads_lock while
  // holding the Parker:: mutex. If safepoints are pending both the
  // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
  ThreadBlockInVM tbivm(jt); // Construct ThreadBlockInVM for the current thread to prevent special scenarios such as deadlocks

  // Don't wait if cannot get lock since interference arises from
  // unblocking. Also. check interrupt before trying wait
  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
    return;
  }

  int status ;
  if (_counter > 0)  { // no wait needed
    _counter = 0; // reset
    status = pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant");
    // Paranoia to ensure our locked and lock-free paths interact
    // correctly with each other and Java-level accesses.
    OrderAccess::fence();
    return;
  }

#ifdef ASSERT
  // Don't catch signals while blocked; let the running threads have the signals.
  // (This allows a debugger to break into the running thread.)
  sigset_t oldsigs;
  sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
  pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif

  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
  jt->set_suspend_equivalent();
  // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

  assert(_cur_index == -1, "invariant");
  if (time == 0) {
    _cur_index = REL_INDEX; // arbitrary choice when not timed
    status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
  } else {
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    // call safe_cond_timedWait to block the thread
    status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
    if (status != 0 && WorkAroundNPTLTimedWaitHang) {
      pthread_cond_destroy (&_cond[_cur_index]) ;
      pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
    }
  }
  _cur_index = -1;
  assert_status(status == 0 || status == EINTR ||
                status == ETIME || status == ETIMEDOUT,
                status, "cond_timedwait");

#ifdef ASSERT
  pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif

  _counter = 0 ; // The _counter status bit is reset after return
  status = pthread_mutex_unlock(_mutex) ;
  assert_status(status == 0, status, "invariant");
  // Paranoia to ensure our locked and lock-free paths interact
  // correctly with each other and Java-level accesses.
  OrderAccess::fence();

  // If externally suspended while waiting, re-suspend
  if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
  }
}

Unpark method source code analysis

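Now take a look at the Linux implementation of unpark (os_linux.cpp). Its main flow is as follows:

  1. Acquire the lock with pthread_mutex_lock.
  2. Save the old value of _counter and set _counter to 1.
  3. Check the old value of _counter:
  • If it is less than 1, a thread may be blocked in park: if one is actually parked (_cur_index != -1), call pthread_cond_signal to wake it up, and release the mutex.
  • If it is equal to 1, release the mutex and return directly.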
void Parker::unpark() {
  int s, status ;
  status = pthread_mutex_lock(_mutex);
  assert (status == 0, "invariant");
  s = _counter;
  _counter = 1;
  if (s < 1) {
    // thread might be parked
    if (_cur_index != -1) {
      // thread is definitely parked
      if (WorkAroundNPTLTimedWaitHang) {
        status = pthread_cond_signal (&_cond[_cur_index]);
        assert (status == 0, "invariant");
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
      } else {
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
        status = pthread_cond_signal (&_cond[_cur_index]);
        assert (status == 0, "invariant");
      }
    } else {
      pthread_mutex_unlock(_mutex);
      assert (status == 0, "invariant");
    }
  } else {
    pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant");
  }
}
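The _counter protocol shared by park and unpark can be modelled in a few lines of Java. This is only an illustrative model under simplifying assumptions, not the HotSpot code: the class name PermitModel is hypothetical, a Java monitor stands in for the pthread mutex and condition variable, and the lock-free fast path, timeouts and safepoint handling are left out.

```java
// Hypothetical Java model of the _counter protocol; not the HotSpot code.
class PermitModel {
    private int counter = 0; // plays the role of Parker::_counter (0 or 1)

    // Models an untimed park: consume the permit or wait for one.
    synchronized void park() {
        if (counter > 0) {      // permit available: no wait needed
            counter = 0;
            return;
        }
        try {
            wait();             // like pthread_cond_wait; may wake spuriously
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // park also returns on interrupt
        }
        counter = 0;            // reset after the wait returns
    }

    // Models unpark: set the permit and signal if it was absent before.
    synchronized void unpark() {
        int old = counter;
        counter = 1;
        if (old < 1) {
            notify();           // like pthread_cond_signal
        }
    }

    synchronized int permits() {
        return counter;
    }

    public static void main(String[] args) {
        PermitModel m = new PermitModel();
        m.unpark();
        m.unpark();             // the permit does not accumulate
        System.out.println("permits after two unparks: " + m.permits());
        m.park();               // returns at once and consumes the permit
        System.out.println("permits after park: " + m.permits());
    }
}
```

As in the native code, park resets the counter to 0 whatever the reason for waking up, which is why callers must re-check their own condition after it returns.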

The difference between Parker and ParkEvent

ParkEvent is similar to Parker in that it can also block and wake up threads. The difference lies in who uses them: ParkEvent backs Java-level monitor synchronization (the synchronized keyword), while Parker backs the JSR 166 concurrency utilities. ParkEvent inherits from PlatformEvent; the base class PlatformEvent is platform-specific, while ParkEvent is platform-independent. Parker, in turn, inherits from PlatformParker.

  1. The park and unpark methods of ParkEvent are used to implement the Java-level Object.wait() and Object.notify() methods.
  2. The park and unpark methods of Parker are used to implement LockSupport.park() and LockSupport.unpark().

See the source code comment in park.hpp:

// The base-class, PlatformEvent, is platform-specific while the ParkEvent is
// platform-independent. PlatformEvent provides park(), unpark(), etc., and
// is abstract -- that is, a PlatformEvent should never be instantiated except
// as part of a ParkEvent.
// Equivalently we could have defined a platform-independent base-class that
// exported Allocate(), Release(), etc. The platform-specific class would extend
// that base-class, adding park(), unpark(), etc.
//
// A word of caution: The JVM uses 2 very similar constructs:
// 1. ParkEvent are used for Java-level "monitor" synchronization.
// 2. Parkers are used by JSR166-JUC park-unpark.
//
// We'll want to eventually merge these redundant facilities and use ParkEvent.

Use case

Here is a sketch of a first-in-first-out non-reentrant lock class.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class FIFOMutex {
    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final Queue<Thread> waiters
        = new ConcurrentLinkedQueue<Thread>();

    public void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current);

        // Block while not first in queue or cannot acquire lock
        while (waiters.peek() != current ||
               !locked.compareAndSet(false, true)) {
            LockSupport.park(this);
            if (Thread.interrupted()) // ignore interrupts while waiting
                wasInterrupted = true;
        }

        waiters.remove();
        if (wasInterrupted)          // reassert interrupt status on exit
            current.interrupt();
    }

    public void unlock() {
        locked.set(false);
        LockSupport.unpark(waiters.peek());
    }
}
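A possible way to exercise the lock is to let several threads increment a plain int that is protected only by the mutex. The sketch below is an assumption about how one might test it, not part of the original example: the names FIFOMutexDemo and run are hypothetical, and the lock class is repeated in condensed form only so that the example compiles on its own.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class that guards a plain counter with the FIFO lock.
class FIFOMutexDemo {
    // Condensed copy of the lock class so this example is self-contained.
    static class FIFOMutex {
        private final AtomicBoolean locked = new AtomicBoolean(false);
        private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();

        void lock() {
            boolean wasInterrupted = false;
            Thread current = Thread.currentThread();
            waiters.add(current);
            while (waiters.peek() != current
                    || !locked.compareAndSet(false, true)) {
                LockSupport.park(this);
                if (Thread.interrupted()) wasInterrupted = true;
            }
            waiters.remove();
            if (wasInterrupted) current.interrupt();
        }

        void unlock() {
            locked.set(false);
            LockSupport.unpark(waiters.peek());
        }
    }

    private static int counter; // deliberately not atomic; guarded by the mutex

    // Starts the given number of threads, each adding to the counter.
    static int run(int threads, int increments) {
        counter = 0;
        FIFOMutex mutex = new FIFOMutex();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < increments; j++) {
                    mutex.lock();
                    try {
                        counter++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter;
    }

    public static void main(String[] args) {
        System.out.println("final count: " + run(4, 1000));
    }
}
```

If the lock provides mutual exclusion, no increment is lost and the final count equals threads × increments.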
