Synchronized locks and JUC locks are concurrent locks in JAVA. Synchronized locks and JUC locks are concurrent locks. Synchronized locks are capabilities provided by the JAVA language level and will not be expanded here. This article focuses on ReentrantLock in JUC.

The author | | ali Jiang Chong source technology public number

Synchronized locks and JUC locks are concurrent locks in JAVA. Synchronized locks and JUC locks are concurrent locks. Synchronized locks are capabilities provided by the JAVA language level and will not be expanded here. This article focuses on ReentrantLock in JUC.

A JDK layer

1 AbstractQueuedSynchronizer

The LOCK (), unlock () apis of ReentrantLock rely on the internal Synchronizer (note, not synchronized). Synchronizer is divided into FairSync and NonfairSync, which, as the name suggests, mean fair and unfair.

When ReentrantLock’s lock method is called, it is simply passed to Synchronizer’s lock () method:

Code excerpt from: java.util.concurrent.locks.ReentrantLock.java /** Synchronizer providing all implementation mechanics */ private final Sync sync; /** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */ abstract static class Sync extends AbstractQueuedSynchronizer { . } public void lock() { sync.lock(); }Copy the code

So what is sync? We see the Sync inherited from AbstractQueueSynchronizer (AQS), AQS is a cornerstone of concurrent bag, AQS itself does not implement any synchronous interface (such as lock, unlock, countDown, etc.), However, it defines a framework for concurrent resource control logic (using the Template Method design pattern), which defines acquire and release methods for acquiring and releasing resources exclusively. And the acquireShared and releaseShared methods to get and release resources in a shared way. Such as acquire/release for implementing already, and acquireShared/releaseShared used to implement CountDownLacth, Semaphore. For example, acquire’s framework is as follows:

/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }Copy the code

The overall logic is to do a tryAcquire, if it succeeds, that’s all, and the caller continues to execute his own code, or if it fails, addWaiter and acquireQueued. Where tryAcquire() requires subclasses to implement according to their own synchronization requirements, acquireQueued() and addWaiter() are already implemented by AQS. AddWaiter adds the current thread to the end of the AQS internal synchronization queue, and acquireQueued blocks the current thread if tryAcquire() fails.

The code for addWaiter is as follows:

/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.shared for SHARED * @return the new Node */ private Node addWaiter(Node mode)  new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // If the last node is not empty, the synchronization queue has been initialized. = null) {node.prev = pred; If (compareAndSetTail(pred, node)) {// The successor of the old tail node is set to the new tail node. So the synchronous queue is a two-way list. pred.next = node; return node; }} // If the tail node is empty, the queue has not been initialized. The head node needs to be initialized and a new node enq(node) needs to be added. return node; }Copy the code

The enq(node) code is as follows:

/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; If (t == null) {// Must initialize // If tail is empty, create a new head node, and both tail and head point to the head node. If (compareAndSetHead(new Node())) tail = head; } else {// Loop through the branch a second time, node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; }}}}Copy the code

After addWaiter completes, the structure of the synchronization queue looks like this:

The code for acquireQueued is as follows:

/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) Final node p = node.predecessor(); If (p == head && tryAcquire(arg)) {if (p == head && tryAcquire(arg)) {if (p == head && tryAcquire(ARg)) {if (p == head && tryAcquire(ARg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}Copy the code

AcquireQueued logic is:

To determine whether it is the first queued node in the synchronization queue, try to lock it, and if successful, turn itself into a head node, as shown below:

If you are not the first line of nodes or tryAcquire fails, call the shouldParkAfterFailedAcquire, its main logic is using the CAS will node status by the INITIAL set to SIGNAL, said that the current thread block waiting for the SIGNAL. If the setup fails, retry in an endless loop in the acquireQueued method until the setup succeeds, and then call the parkAndCheckInterrupt method. ParkAndCheckInterrupt blocks and suspends the current thread until it is woken up. The implementation of parkAndCheckInterrupt relies on the capabilities of the underlying layer, which is the focus of this article and is explained layer by layer below.

2 ReentrantLock

Let us together to see how to implement its semantic based on AbstractQueueSynchronizer was already.

ReentrantLock uses FairSync and NonfairSync internally, both of which are subclasses of AQS. For example, the main code for FairSync is as follows:

/**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
Copy the code

One of the most important fields in AQS is state, and lock and synchronizer implementations are built around changes to this field. One of the reasons AQS can implement a variety of locks and synchronizers is that different locks or synchronizers can have different definitions of what a synchronization state means according to their needs. Override the corresponding tryAcquire, tryRelease or tryAcquireshared, tryReleaseShared methods to manipulate the synchronization state.

Let’s look at the tryAcquire logic for ReentrantLock’s FairSync:

  1. If state (private volatile int state) is 0, no one is holding the lock. But because it is a fair lock, we need to determine whether we are the first node, and then try to set the state to 1. If it succeeds, we have successfully acquired the lock. CompareAndSetState is also implemented through CAS. CAS is atomic, and state is of type volatile, so the value of state is thread-safe.
  2. If the current thread is not the owner of the lock, then the state is incremented. When the state overflows, an error is thrown. If there is no overflow, return true, indicating that the lock was successfully acquired.
  3. If none of the above conditions are met, false is returned, and the lock failed to be acquired.

At this point, the Implementation of the JAVA level is basically clear, to summarize, the entire framework is as follows:

Unlock.park (), above, blocks and suspends the current thread. Unlock.park (), above, blocks and suspends the current thread.

The second layer JVM

Unsafe. Park and unpark are native methods of the sun.misc.Unsafe class,

public native void unpark(Object var1);

public native void park(boolean var1, long var2);
Copy the code

The realization of these two methods is the hotspot in the JVM/SRC/share/vm/prims/unsafe. The CPP file,

UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) UnsafeWrapper("Unsafe_Park"); EventThreadPark event; #ifndef USDT2 HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time); #else /* USDT2 */ HOTSPOT_THREAD_PARK_BEGIN( (uintptr_t) thread->parker(), (int) isAbsolute, time); #endif /* USDT2 */ JavaThreadParkedState jtps(thread, time ! = 0); thread->parker()->park(isAbsolute ! = 0, time); #ifndef USDT2 HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker()); #else /* USDT2 */ HOTSPOT_THREAD_PARK_END( (uintptr_t) thread->parker()); #endif /* USDT2 */ if (event.should_commit()) { const oop obj = thread->current_park_blocker(); if (time == 0) { post_thread_park_event(&event, obj, min_jlong, min_jlong); } else { if (isAbsolute ! = 0) { post_thread_park_event(&event, obj, min_jlong, time); } else { post_thread_park_event(&event, obj, time, min_jlong); } } } UNSAFE_ENDCopy the code

The core logic is thread-> Parker ()->park(isAbsolute! = 0, time); Get the Java thread’s Parker object and execute its Park method. Each Java thread has a Parker instance, and the Parker class is defined like this:

class Parker : public os::PlatformParker { private: volatile int _counter ; . public: void park(bool isAbsolute, jlong time); void unpark(); . } class PlatformParker : public CHeapObj<mtInternal> { protected: enum { REL_INDEX = 0, ABS_INDEX = 1 }; int _cur_index; // which cond is in use: -1, 0, 1 pthread_mutex_t _mutex [1] ; pthread_cond_t _cond [2] ; // one for relative times and one for abs. public: // TODO-FIXME: make dtor private ~PlatformParker() { guarantee (0, "invariant") ; } public: PlatformParker() { int status; status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr()); assert_status(status == 0, status, "cond_init rel"); status = pthread_cond_init (&_cond[ABS_INDEX], NULL); assert_status(status == 0, status, "cond_init abs"); status = pthread_mutex_init (_mutex, NULL); assert_status(status == 0, status, "mutex_init"); _cur_index = -1; // mark as unused } };Copy the code

Park methods:

void Parker::park(bool isAbsolute, jlong time) { // Return immediately if a permit is available. // We depend on Atomic::xchg() having full barrier semantics // since we are doing a lock-free update to _counter. if (Atomic::xchg(0, &_counter) > 0) return; Thread* thread = Thread::current(); assert(thread->is_Java_thread(), "Must be JavaThread"); JavaThread *jt = (JavaThread *)thread; if (Thread::is_interrupted(thread, false)) { return; } // Next, demultiplex/decode time arguments timespec absTime; if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all return; } if (time > 0) { unpackTime(&absTime, isAbsolute, time); } //// enter safepoint region and change the thread to the blocking state ThreadBlockInVM tbivm(jt); // Don't wait if cannot get lock since interference arises from // unblocking. Also. check interrupt before trying wait if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) ! // If the thread is interrupted, or if an attempt to lock a mutex fails, such as when it is locked by another thread, return; } // pthread_mutex_trylock(_mutex) is successful int status; if (_counter > 0) { // no wait needed _counter = 0; status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; OrderAccess::fence(); return; } #ifdef ASSERT // Don't catch signals while blocked; let the running threads have the signals. // (This allows a debugger to break into the running thread.) sigset_t oldsigs; sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals(); pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs); #endif OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */); jt->set_suspend_equivalent(); // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self() assert(_cur_index == -1, "invariant"); if (time == 0) { _cur_index = REL_INDEX; // arbitrary choice when not timed status = pthread_cond_wait (&_cond[_cur_index], _mutex) ; } else { _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX; status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ; if (status ! = 0 && WorkAroundNPTLTimedWaitHang) { pthread_cond_destroy (&_cond[_cur_index]) ; pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr()); } } _cur_index = -1; assert_status(status == 0 || status == EINTR || status == ETIME || status == ETIMEDOUT, status, "cond_timedwait"); #ifdef ASSERT pthread_sigmask(SIG_SETMASK, &oldsigs, NULL); #endif _counter = 0 ; status = pthread_mutex_unlock(_mutex) ; assert_status(status == 0, status, "invariant") ; // Paranoia to ensure our locked and lock-free paths interact // correctly with each other and Java-level accesses. OrderAccess::fence(); // If externally suspended while waiting, re-suspend if (jt->handle_special_suspend_equivalent_condition()) { jt->java_suspend_self(); }}Copy the code

Park’s idea: Parker has an internal key field _counter, which is used to record the so-called “permit”. When _counter is greater than 0, it means that there is a permit, and then you can set _counter to 0, even if you get the permit, you can continue to run the following code. If _counter is not greater than 0 at this point, wait for this condition to be satisfied.

Let me take a look at the concrete implementation of Park:

  1. When calling park, first try to get permission directly, when _counter>0, then set _counter to 0 and return.
  2. If this fails, the thread’s state is set to _thread_IN_VM and _thread_blocked. _thread_IN_VM indicates that the thread is currently executing in the JVM, and _thread_blocked indicates that the thread is currently blocked.
  3. Once you’ve got the mutex, check again to see if _counter is >0, if so, set _counter to 0, unlock mutex and return
  4. If _counter is still not greater than 0, it checks whether the wait time is equal to 0 and then calls the corresponding pthread_cond_wait series of functions to wait. If the wait returns (i.e., someone unpark, pthread_cond_signal), Set _counter to 0, unlock mutex and return.

So locksupport. park is essentially implemented through the pthread library conditional variable pthread_cond_t. Let’s look at how pthread_cond_t is implemented.

Three GLIBC layer

Typical uses of pthread_cond_t are as follows:

#include < pthread.h> #include < stdio.h> #include < stdlib.h> pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* Initialize the mutex */ pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // Initialize the condition variable void *thread1(void *); void *thread2(void *); int i=1; int main(void) { pthread_t t_a; pthread_t t_b; pthread_create(&t_a,NULL,thread1,(void *)NULL); T_a */ pthread_create(&t_b,NULL,thread2,(void *)NULL); /* Create process t_b*/ pthread_join(t_b, NULL); /* Wait for t_b to finish */ pthread_mutex_destroy(&mutex); pthread_cond_destroy(&cond); exit(0); } void *thread1(void *junk) { for(i=1; i<=9; i++) { pthread_mutex_lock(&mutex); // if(i%3==0) pthread_cond_signal(&cond); T_b */ else printf("thead1:%d/n", I); pthread_mutex_unlock(&mutex); Printf ("Up Unlock Mutex/n"); sleep(1); } } void *thread2(void *junk) { while(i<9) { pthread_mutex_lock(&mutex); if(i%3! =0) pthread_cond_wait(&cond,&mutex); Printf ("thread2:%d/n", I); pthread_mutex_unlock(&mutex); printf("Down Ulock Mutex/n"); sleep(1); }}Copy the code

The point is that both pthread_cond_wait and pthread_cond_signal must be preceded by pthread_mutex_lock. Without this protection, race conditions may occur and signals may be missed. The pthread_cond_wait() function automatically releases mutex as soon as it enters the wait state. When another thread wakes it up with a pthread_cond_signal or a pthread_cond_broadcast so that pthread_cond_wait() returns, the thread automatically obtains the MUTEX.

The whole process is shown below:

1 pthread_mutex_lock

For example, in Linux, a system called Futex(short for Fast User-space mutex) is used.

In this system, atomic increments and tests are performed on mutually exclusive variables in user space.

If the result of the operation shows that there is no contention on the lock, the call to pthread_mutex_lock will return without a context switch to the kernel, so getting the mutex can be very fast.

The system call (called FUtex) occurs only when contention is detected and the context is switched to the kernel, which puts the calling process to sleep until the mutex is released.

There are many more details, especially for reliable and/or priority inheritance mutually exclusive, but that’s the nature of it.

nptl/pthread_mutex_lock.c

int PTHREAD_MUTEX_LOCK (pthread_mutex_t *mutex) { /* See concurrency notes regarding mutex type which is loaded from __kind in struct __pthread_mutex_s in sysdeps/nptl/bits/thread-shared-types.h. */ unsigned int type = PTHREAD_MUTEX_TYPE_ELISION (mutex); LIBC_PROBE (mutex_entry, 1, mutex); if (__builtin_expect (type & ~(PTHREAD_MUTEX_KIND_MASK_NP | PTHREAD_MUTEX_ELISION_FLAGS_NP), 0)) return __pthread_mutex_lock_full (mutex); if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_NP)) { FORCE_ELISION (mutex, goto elision); simple: /* Normal mutex. */ LLL_MUTEX_LOCK_OPTIMIZED (mutex); assert (mutex->__data.__owner == 0); } #if ENABLE_ELISION_SUPPORT else if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_ELISION_NP)) { elision: __attribute__((unused)) /* This case can never happen on a system without elision, as the mutex type initialization functions will not allow to set the elision flags. */ /* Don't record owner or users for elision case. This is a tail call. */ return LLL_MUTEX_LOCK_ELISION (mutex); } #endif else if (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex) == PTHREAD_MUTEX_RECURSIVE_NP, 1)) { /* Recursive mutex. */ pid_t id = THREAD_GETMEM (THREAD_SELF, tid); /* Check whether we already hold the mutex. */ if (mutex->__data.__owner == id) { /* Just bump the counter. */ if (__glibc_unlikely (mutex->__data.__count + 1 == 0)) /* Overflow of the counter. */ return EAGAIN; ++mutex->__data.__count; return 0; } /* We have to get the mutex. */ LLL_MUTEX_LOCK_OPTIMIZED (mutex); assert (mutex->__data.__owner == 0); mutex->__data.__count = 1; } else if (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex) == PTHREAD_MUTEX_ADAPTIVE_NP, 1)) { if (LLL_MUTEX_TRYLOCK (mutex) ! = 0) { int cnt = 0; int max_cnt = MIN (max_adaptive_count (), mutex->__data.__spins * 2 + 10); do { if (cnt++ >= max_cnt) { LLL_MUTEX_LOCK (mutex); break; } atomic_spin_nop (); } while (LLL_MUTEX_TRYLOCK (mutex) ! = 0); mutex->__data.__spins += (cnt - mutex->__data.__spins) / 8; } assert (mutex->__data.__owner == 0); } else { pid_t id = THREAD_GETMEM (THREAD_SELF, tid); assert (PTHREAD_MUTEX_TYPE (mutex) == PTHREAD_MUTEX_ERRORCHECK_NP); /* Check whether we already hold the mutex. */ if (__glibc_unlikely (mutex->__data.__owner == id)) return EDEADLK; goto simple; } pid_t id = THREAD_GETMEM (THREAD_SELF, tid); /* Record the ownership. */ mutex->__data.__owner = id; #ifndef NO_INCR ++mutex->__data.__nusers; #endif LIBC_PROBE (mutex_acquired, 1, mutex); return 0; }Copy the code

Pthread_mutex_t is defined as follows:

typedef union { struct __pthread_mutex_s { int __lock; unsigned int __count; int __owner; unsigned int __nusers; int __kind; int __spins; __pthread_list_t __list; } __data; . } pthread_mutex_t;Copy the code

The __kind field indicates the lock type, and its value is as follows:

/* Mutex types.  */
enum
{ 
  PTHREAD_MUTEX_TIMED_NP,
  PTHREAD_MUTEX_RECURSIVE_NP,
  PTHREAD_MUTEX_ERRORCHECK_NP,
  PTHREAD_MUTEX_ADAPTIVE_NP
#if defined __USE_UNIX98 || defined __USE_XOPEN2K8
  ,
  PTHREAD_MUTEX_NORMAL = PTHREAD_MUTEX_TIMED_NP,
  PTHREAD_MUTEX_RECURSIVE = PTHREAD_MUTEX_RECURSIVE_NP,
  PTHREAD_MUTEX_ERRORCHECK = PTHREAD_MUTEX_ERRORCHECK_NP,
  PTHREAD_MUTEX_DEFAULT = PTHREAD_MUTEX_NORMAL
#endif
#ifdef __USE_GNU
  /* For compatibility.  */
  , PTHREAD_MUTEX_FAST_NP = PTHREAD_MUTEX_TIMED_NP
#endif
};
Copy the code

Among them:

  • PTHREAD_MUTEX_TIMED_NP, which is the default, is a normal lock.
  • PTHREAD_MUTEX_RECURSIVE_NP, a reentrant lock that allows the same thread to successfully acquire the same lock multiple times and unlock it multiple times.
  • PTHREAD_MUTEX_ERRORCHECK_NP, an error-checking lock, returns EDEADLK if the same thread repeatedly requests the same lock, otherwise the same type as PTHREAD_MUTEX_TIMED_NP.
  • PTHREAD_MUTEX_ADAPTIVE_NP, adaptive lock, a mixture of spin lock and normal lock.

Mutex uses PTHREAD_MUTEX_TIMED_NP by default, so it goes LLL_MUTEX_LOCK_OPTIMIZED, which is a macro:

# define LLL_MUTEX_LOCK_OPTIMIZED(mutex) lll_mutex_lock_optimized (mutex)

lll_mutex_lock_optimized (pthread_mutex_t *mutex)
{
  /* The single-threaded optimization is only valid for private
     mutexes.  For process-shared mutexes, the mutex could be in a
     shared mapping, so synchronization with another process is needed
     even without any threads.  If the lock is already marked as
     acquired, POSIX requires that pthread_mutex_lock deadlocks for
     normal mutexes, so skip the optimization in that case as
     well.  */
  int private = PTHREAD_MUTEX_PSHARED (mutex);
  if (private == LLL_PRIVATE && SINGLE_THREAD_P && mutex->__data.__lock == 0)
    mutex->__data.__lock = 1;
  else
    lll_lock (mutex->__data.__lock, private);
}
Copy the code

Not LLL_PRIVATE, so lll_lock, lll_lock is also a macro:

#define lll_lock(futex, private)        \
  __lll_lock (&(futex), private)
Copy the code

Note the presence of FUtex, which is the focus of much of the rest of this article.

#define __lll_lock(futex, private) \ ((void) \ ({ \ int *__futex = (futex); \ if (__glibc_unlikely \ (atomic_compare_and_exchange_bool_acq (__futex, 1, 0))) \ { \ if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \ __lll_lock_wait_private (__futex); \ else \ __lll_lock_wait (__futex, private); \} \}))Copy the code

Atomic_compare_and_exchange_bool_acq attempts to change __futex (that is, mutex->__data.__lock) from 0 to 1 by atomic manipulation. Then call __lll_lock_wait with the following code:

void __lll_lock_wait (int *futex, int private) { if (atomic_load_relaxed (futex) == 2) goto futex; while (atomic_exchange_acquire (futex, 2) ! = 0) { futex: LIBC_PROBE (lll_lock_wait, 1, futex); futex_wait ((unsigned int *) futex, 2, private); /* Wait if *futex == 2. */ } }Copy the code

Pthread defines three lock states of FUtex:

  • 0: indicates that the current lock is idle. The lock can be quickly locked without entering the kernel.
  • 1, indicates that a thread is holding the current lock. If another thread needs to lock at this time, it must mark FUtex as “lock contention” and then suspend the current thread through the futex system call into the kernel.
  • 2, represents lock contention, other threads will or are queuing in the kernel futex system for the lock.

If futex = 2, futex_wait = 2, futex_wait = 2, futex_wait = 2, futex_wait = 2 If it’s not 2, you’re the first person to compete. Set futex to 2, tell the next person to queue up, and then lead by example. Futex_wait essentially calls a FUtex system call. In Section 4, we’ll take a closer look at this system call.

2 pthread_cond_wait

The essence is to go to futex system calls, which I won’t expand in space.

Four kernel layer

Why futex, and what problems does it solve? When was the kernel added?

To put it simply, futex’s solution is to operate entirely in user space without a race, requiring no system calls and only going into the kernel to wait or wake up when a race occurs. Therefore, Futex is a synchronization mechanism combining user mode and kernel mode, which requires the cooperation of the two modes. Futex variables are located in user space instead of kernel objects. Futex code is also divided into user mode and kernel mode. In the case of no competition in user mode, when the competition occurs, the sys_FUtex system call will enter kernel mode for processing.

The user mode part has been explained in the previous section, this section focuses on the implementation of FUtex in the kernel part.

Futex designs three basic data structures: FUTEX_hash_bucket, fuTEX_key, and fuTEX_Q.

struct futex_hash_bucket { atomic_t waiters; spinlock_t lock; struct plist_head chain; } ____cacheline_aligned_in_smp; struct futex_q { struct plist_node list; struct task_struct *task; spinlock_t *lock_ptr; union futex_key key; Struct futex_pi_state *pi_state; struct rt_mutex_waiter *rt_waiter; union futex_key *requeue_pi_key; u32 bitset; }; union futex_key { struct { unsigned long pgoff; struct inode *inode; int offset; } shared; struct { unsigned long address; struct mm_struct *mm; int offset; } private; struct { unsigned long word; void *ptr; int offset; } both; };Copy the code

There’s actually a struct __futex_data, as shown below, this one

static struct {
        struct futex_hash_bucket *queues;
        unsigned long            hashsize;
} __futex_data __read_mostly __aligned(2*sizeof(long));

#define futex_queues   (__futex_data.queues)
#define futex_hashsize (__futex_data.hashsize)
Copy the code

When futex is initialized (futex_init), the hashsize is determined, such as 8192 for a 24-core CPU. We then call alloc_large_system_hash to allocate the array space based on the hashsize and initialize the related fields in the array elements, such as plist_head, lock.

static int __init futex_init(void)
{
        unsigned int futex_shift;
        unsigned long i;

#if CONFIG_BASE_SMALL
        futex_hashsize = 16;
#else
        futex_hashsize = roundup_pow_of_two(256 * num_possible_cpus());
#endif

        futex_queues = alloc_large_system_hash("futex", sizeof(*futex_queues),
                                               futex_hashsize, 0,
                                               futex_hashsize < 256 ? HASH_SMALL : 0,
                                               &futex_shift, NULL,
                                               futex_hashsize, futex_hashsize);
        futex_hashsize = 1UL << futex_shift;

        futex_detect_cmpxchg();

        for (i = 0; i < futex_hashsize; i++) {
                atomic_set(&futex_queues[i].waiters, 0);
                plist_head_init(&futex_queues[i].chain);
                spin_lock_init(&futex_queues[i].lock);
        }

        return 0;
}
Copy the code

The relationship between these data structures is shown below:

With the data structure in mind, the flow is easy to understand. The overall flow of FUTEX_wait is as follows:

static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val, ktime_t *abs_time, u32 bitset) { struct hrtimer_sleeper timeout, *to = NULL; struct restart_block *restart; struct futex_hash_bucket *hb; struct futex_q q = futex_q_init; int ret; if (! bitset) return -EINVAL; q.bitset = bitset; if (abs_time) { to = &timeout; hrtimer_init_on_stack(&to->timer, (flags & FLAGS_CLOCKRT) ? CLOCK_REALTIME : CLOCK_MONOTONIC, HRTIMER_MODE_ABS); hrtimer_init_sleeper(to, current); hrtimer_set_expires_range_ns(&to->timer, *abs_time, current->timer_slack_ns); } retry: /* * Prepare to wait on uaddr. On success, holds hb lock and increments * q.key refs. */ ret = futex_wait_setup(uaddr, val, flags, &q, &hb); if (ret) goto out; /* queue_me and wait for wakeup, timeout, or a signal. */ futex_wait_queue_me(hb, &q, to); /* If we were woken (and unqueued), we succeeded, whatever. */ ret = 0; /* unqueue_me() drops q.key ref */ if (! unqueue_me(&q)) goto out; ret = -ETIMEDOUT; if (to && ! to->task) goto out; /* * We expect signal_pending(current), but we might be the * victim of a spurious wakeup as well. */ if (! signal_pending(current)) goto retry; ret = -ERESTARTSYS; if (! abs_time) goto out; restart = &current->restart_block; restart->fn = futex_wait_restart; restart->futex.uaddr = uaddr; restart->futex.val = val; restart->futex.time = *abs_time; restart->futex.bitset = bitset; restart->futex.flags = flags | FLAGS_HAS_TIMEOUT; ret = -ERESTART_RESTARTBLOCK; out: if (to) { hrtimer_cancel(&to->timer); destroy_hrtimer_on_stack(&to->timer); } return ret; }Copy the code

The futex_wait_setup function does two main things: hash the Uaddr, find the futex_hash_bucket and get the spin lock on it, and determine if *uaddr is the expected value. If not, return immediately and trylock continues in user mode.

* * futex_wait_setup() - Prepare to wait on a futex * @uaddr: the futex userspace address * @val: the expected value * @flags: futex flags (FLAGS_SHARED, etc.) * @q: the associated futex_q * @hb: storage for hash_bucket pointer to be returned to caller * * Setup the futex_q and locate the hash_bucket. Get the futex  value and * compare it with the expected value. Handle atomic faults internally. * Return with the hb lock held and a q.key reference on success, and unlocked * with no q.key reference on failure. * * Return: * - 0 - uaddr contains val and hb has been locked; * - <1 - -EFAULT or -EWOULDBLOCK (uaddr does not contain val) and hb is unlocked */ static int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags, struct futex_q *q, struct futex_hash_bucket **hb) { u32 uval; int ret; Retry: // Initialize fuTEX_q and set uADDR to the fuTEX_key field. Futex_wake will use this key to look up futex. ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &q->key, VERIFY_READ); if (unlikely(ret ! = 0)) return ret; Retry_private: // Computes the hash based on the key and finds the corresponding futex_hash_bucket *hb = queue_lock(q) in the array; Ret = get_fuTEX_value_locked (&uval, uaddr); if (ret) { queue_unlock(*hb); ret = get_user(uval, uaddr); if (ret) goto out; if (! (flags & FLAGS_SHARED)) goto retry_private; put_futex_key(&q->key); goto retry; } // If the value pointed to by uaddr is not equal to val, it indicates that another process has changed the value pointed to by uaddr. if (uval ! = val) { queue_unlock(*hb); ret = -EWOULDBLOCK; } out: if (ret) put_futex_key(&q->key); return ret; }Copy the code

Then call futex_WAIT_queue_me to suspend the current process:

/** * futex_wait_queue_me() - queue_me() and wait for wakeup, timeout, or signal * @hb: the futex hash bucket, must be locked by the caller * @q: the futex_q to queue up on * @timeout: the prepared hrtimer_sleeper, or null for no timeout */ static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q, struct hrtimer_sleeper *timeout) { /* * The task state is guaranteed to be set before another task can * wake it. set_current_state() is implemented using smp_store_mb() and * queue_me() calls spin_unlock() upon completion, both serializing * access to the hash list and forcing another memory barrier. */ set_current_state(TASK_INTERRUPTIBLE);  queue_me(q, hb); /* Arm the timer */ if (timeout) hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS); /* * If we have been removed from the hash list, then another task * has tried to wake us, and we can skip the call to schedule(). */ if (likely(! plist_node_empty(&q->list))) { /* * If the timer has already expired, current will already be * flagged for rescheduling. Only call schedule if there * is no timeout, or if it has yet to expire. */ if (! timeout || timeout->task) freezable_schedule(); } __set_current_state(TASK_RUNNING); }Copy the code

Futex_wait_queue_me does several things:

  1. By inserting the current process into the wait queue, fuTEX_Q is attached to fuTEX_hash_bucket
  2. Starting a Scheduled Task
  3. Actively triggers kernel process scheduling

Five summarizes

This article mainly reviews the ReentrantLock.lock process in JAVA from top to bottom.

The original link

This article is the original content of Aliyun and shall not be reproduced without permission.