

This article is based on an analysis of the JDK 8u261 source code.

1 introduction

In the CLH queue we have no control over which thread acquires the lock, which thread is enqueued, or which thread releases the lock. Conditional queues give threads an active way to block and to be woken up only when a specified condition is met. A few concepts need explaining first. The conditional queue is another queue in AQS besides the CLH queue. Each Condition that is created gets its own conditional queue; every call to the await method enqueues a node in that conditional queue, and every call to the signal method dequeues a node from it. Unlike nodes in the CLH queue, which can be in several states, a node in a conditional queue has only one valid state: CONDITION. If a node in a conditional queue is no longer in CONDITION, that node should be removed from the queue. Note that conditional queues can only be used in exclusive mode.

When conditional queues are used to build a blocking queue, two of them are typically created: notFull and notEmpty. notFull means that when the blocking queue is full, the put method waits until the queue is no longer full. notEmpty means that when the blocking queue is empty, the take method waits until the queue has data.

The notFull.signal and notEmpty.signal methods move nodes from the conditional queue to the CLH queue (one at a time). In other words, a node can migrate from a conditional queue to the CLH queue. It also means that there is no lock contention in the conditional queue; all lock contention happens in the CLH queue.
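The notFull/notEmpty pattern described above can be sketched with a small hand-written buffer. This is only an illustration, not the ArrayBlockingQueue implementation: the class name TwoConditionBuffer, its demo method, and the use of an ArrayDeque for storage are assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration class; not part of the JDK.
class TwoConditionBuffer<E> {
    private final ArrayDeque<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    TwoConditionBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.size() == capacity)
                notFull.await();   // full: wait in the notFull conditional queue
            items.addLast(e);
            notEmpty.signal();     // let one waiting taker compete for the lock
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty())
                notEmpty.await();  // empty: wait in the notEmpty conditional queue
            E e = items.removeFirst();
            notFull.signal();      // let one waiting producer compete for the lock
            return e;
        } finally {
            lock.unlock();
        }
    }

    // One producer pushes 1..n through a buffer of capacity 2; returns the sum taken.
    static int demo(int n) {
        TwoConditionBuffer<Integer> buf = new TwoConditionBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++)
                    buf.put(i);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++)
                sum += buf.take();
            producer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }
}
```

Both wait calls sit inside a while loop, so a woken thread re-checks its condition after it has reacquired the lock.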

Some other differences between conditional queues and CLH queues are as follows:

  • The conditional queue links nodes with the nextWaiter pointer and is a singly linked list, unlike the CLH queue, which is a doubly linked list;
  • The conditional queue uses firstWaiter and lastWaiter as its head and tail pointers, whereas the CLH queue uses head and tail;
  • The first node of a conditional queue is an ordinary node, not a special empty node as in the CLH queue;
  • The CLH queue relies on many CAS operations to control concurrency. A thread enters a conditional queue only while it already holds the exclusive lock, so in many places concurrency is not a concern.

The source code is analyzed in detail below, using ArrayBlockingQueue as the example:


2 the constructor

    /**
     * ArrayBlockingQueue:
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        // The array that holds the actual data
        this.items = new Object[capacity];
        // The exclusive lock is a ReentrantLock (fair selects a fair or nonfair lock; the default is nonfair)
        lock = new ReentrantLock(fair);
        // notEmpty conditional queue
        notEmpty = lock.newCondition();
        // notFull conditional queue
        notFull = lock.newCondition();
    }

3 put method

    /**
     * ArrayBlockingQueue:
     */
    public void put(E e) throws InterruptedException {
        // Non-null check
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        /*
         * Acquire the exclusive lock in interruptible mode. The implementation is
         * similar to the lock method and to the acquire method of Semaphore. Because
         * this article analyzes conditional queues, the details are not covered here
         */
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                // The array is full: enqueue a new node in notFull and block the current thread
                notFull.await();
            // Add the element to the array and signal notEmpty
            enqueue(e);
        } finally {
            // Release the lock
            lock.unlock();
        }
    }

4 await method

If put finds the array full, or take finds it empty, the await method is called to put a node for the current thread into the conditional queue:

    /**
     * AbstractQueuedSynchronizer:
     */
    public final void await() throws InterruptedException {
        // Throw an exception if the current thread has already been interrupted
        if (Thread.interrupted())
            throw new InterruptedException();
        // Add a node for the current thread to the conditional queue
        Node node = addConditionWaiter();
        // Release the lock acquired earlier. The thread is about to block, and if it kept
        // the lock, other threads would be stuck until this thread is woken up
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // While the node is not in the CLH queue, block and wait to be unparked
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            /*
             * Reaching this point means the thread was woken up either by a normal
             * signal or by an interrupt. In both cases the node has been inserted at
             * the tail of the CLH queue and the loop exits. (park can also return for
             * other reasons, which is why the check sits in a while loop.)
             */
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // The node is now in the CLH queue (woken up by signal or by an interrupt),
        // so it competes for the lock there
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            /*
             * <<< See transferAfterCancelledWait for an explanation of THROW_IE and REINTERRUPT >>>
             *
             * As analyzed before, acquireQueued returns true when the thread was
             * interrupted inside acquireQueued. So there are two places where an
             * interrupt is detected: the checkInterruptWhileWaiting call in the loop
             * above, and acquireQueued here. If the thread was not interrupted
             * earlier, interruptMode is 0: the park above was ended by a normal signal
             * and the node is already in the CLH queue. An interrupt that happens
             * after the signal, while the thread is competing for the lock in
             * acquireQueued, does not require an InterruptedException, so
             * interruptMode is set to REINTERRUPT
             */
            interruptMode = REINTERRUPT;
        /*
         * If interruptMode is REINTERRUPT, signal was called before the interrupt and
         * has already removed the node from the conditional queue, so nextWaiter is
         * null and unlinkCancelledWaiters does not need to run.
         *
         * If interruptMode is THROW_IE, signal has not been called, so the node is
         * still in the conditional queue. unlinkCancelledWaiters must then be called
         * to remove it (transferAfterCancelledWait has already reset the node's state
         * to the initial value 0). All other nodes that are not in CONDITION are
         * removed as well. Note: if the current node is the last node in the
         * conditional queue, it is not cleaned up here. That is fine; it will be
         * cleaned up the next time a node is added or signal is called
         */
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        // Handle the interrupt according to its mode (normal mode needs no handling)
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

5 addConditionWaiter method

Logic for adding a node to a conditional queue:

    /**
     * AbstractQueuedSynchronizer:
     */
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        /*
         * If the last node in the queue is not in CONDITION, remove every node in
         * the queue that is not in CONDITION
         */
        if (t != null && t.waitStatus != Node.CONDITION) {
            // Remove all nodes that are not in CONDITION
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        // Create a new node in the CONDITION state
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            // t is null, which means the conditional queue is empty
            firstWaiter = node;
        else
            // t is not null, so the conditional queue has nodes; append the new node at the tail
            t.nextWaiter = node;
        // Point the tail pointer at the new node. The node has been added
        lastWaiter = node;
        /*
         * Note that, unlike the enq method of the CLH queue, there is no spinning
         * here until the insert succeeds: the exclusive lock has not been released
         * yet, so no other thread can modify the conditional queue at the same time
         */
        return node;
    }

    /**
     * Called from addConditionWaiter above:
     * removes all nodes in the conditional queue that are not in CONDITION
     */
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        /*
         * In each iteration of the loop below, trail points to the last node, between
         * the head and the node currently being visited, that is in CONDITION.
         * Removing a node is then just trail.nextWaiter = next
         */
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            } else
                trail = t;
            t = next;
        }
    }
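The trail-pointer cleanup can be tried out on a toy singly linked list. The sketch below is a simplified model, not JDK code: the class WaiterListDemo, its add and names helpers, and the node names are invented for illustration, while the field names and the two status values mirror those of AbstractQueuedSynchronizer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of a conditional queue; not JDK code.
class WaiterListDemo {
    static final int CONDITION = -2;   // same value as Node.CONDITION in AQS
    static final int CANCELLED = 1;    // same value as Node.CANCELLED in AQS

    static final class Node {
        final String name;
        final int waitStatus;
        Node nextWaiter;

        Node(String name, int waitStatus) {
            this.name = name;
            this.waitStatus = waitStatus;
        }
    }

    Node firstWaiter;
    Node lastWaiter;

    // Append at the tail; there is no dummy head node.
    void add(String name, int waitStatus) {
        Node node = new Node(name, waitStatus);
        if (lastWaiter == null)
            firstWaiter = node;
        else
            lastWaiter.nextWaiter = node;
        lastWaiter = node;
    }

    // Same pointer manipulation as the method of the same name in AQS.
    void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;             // last node seen so far that is in CONDITION
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            } else {
                trail = t;
            }
            t = next;
        }
    }

    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter)
            out.add(n.name);
        return out;
    }

    // Builds A(cancelled) -> B -> C(cancelled) -> D -> E(cancelled) and cleans it up.
    static List<String> demo() {
        WaiterListDemo q = new WaiterListDemo();
        q.add("A", CANCELLED);
        q.add("B", CONDITION);
        q.add("C", CANCELLED);
        q.add("D", CONDITION);
        q.add("E", CANCELLED);
        q.unlinkCancelledWaiters();
        return q.names();
    }
}
```

In this run the cancelled nodes at the head, in the middle and at the tail are all unlinked, leaving B and D, with lastWaiter pointing at D.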

6 fullyRelease method

Releases the lock, including every hold acquired through reentrancy:

    /**
     * AbstractQueuedSynchronizer:
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            /*
             * Release the lock. Every hold is released in one go, including reentrant
             * holds: savedState, read on the previous line, contains the full count,
             * and all of it is released here. That is the "fully" in the method name
             */
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                /*
                 * A failed release means the state was not released cleanly, so an
                 * exception is thrown. Note that when the exception is thrown here,
                 * the finally block below still runs
                 */
                throw new IllegalMonitorStateException();
            }
        } finally {
            /*
             * If releasing the lock failed, the node is marked CANCELLED. This also
             * explains why addConditionWaiter only checks whether the last node is in
             * CONDITION instead of walking the entire queue. New nodes are always
             * inserted at the tail of the conditional queue. If the release fails,
             * the node cancelled here is that newly added tail node, and the nodes
             * ahead of it are all in CONDITION, because the unlinkCancelledWaiters
             * call in addConditionWaiter removes all non-CONDITION nodes when a new
             * node is added. So a node that is not in CONDITION can only be at the
             * tail, and checking the tail is enough
             */
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
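The effect of the full release can be observed through the public ReentrantLock API. In the sketch below, which is an illustration and not JDK code, a thread takes the lock twice and then waits; another thread can only acquire the lock if both holds were released, and the waiter gets both holds back when it returns. The class name FullReleaseDemo and its holdCounts method are assumptions for the example; it calls awaitUninterruptibly, which in JDK 8 also goes through fullyRelease, only to avoid exception handling in the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration class; not part of the JDK.
class FullReleaseDemo {
    // Returns the waiter's hold count {before waiting, after waiting}.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] flag = {false};              // guarded by lock
        int[] counts = new int[2];
        CountDownLatch lockedTwice = new CountDownLatch(1);

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock();                       // reentrant: the hold count is now 2
            try {
                counts[0] = lock.getHoldCount();
                lockedTwice.countDown();
                while (!flag[0])
                    ready.awaitUninterruptibly();   // gives up both holds while waiting
                counts[1] = lock.getHoldCount();    // both holds are back on return
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();
        try {
            lockedTwice.await();
            lock.lock();   // can succeed only after the waiter released the lock completely
            try {
                flag[0] = true;
                ready.signal();
            } finally {
                lock.unlock();
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counts;
    }
}
```

If the wait released only one of the two holds, the second thread would never get past lock.lock() and the demo would hang.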

7 isOnSyncQueue method

Determines whether the node is in the CLH queue, which tells the woken thread whether the signal method has finished moving it. The transferAfterCancelledWait method also calls this method:

    /**
     * AbstractQueuedSynchronizer:
     * Determines whether a node is in the CLH queue
     */
    final boolean isOnSyncQueue(Node node) {
        /*
         * Return false if the node is in CONDITION or has no prev pointer
         * (prev is only set on nodes in the CLH queue)
         */
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // Return true if the node has a next pointer (next is only set on nodes in the
        // CLH queue; nodes in a conditional queue use nextWaiter)
        if (node.next != null)
            return true;
        // The quick checks above were inconclusive, so traverse the CLH queue node by node
        return findNodeFromTail(node);
    }

    /**
     * Traverses the CLH queue from the tail to determine whether the node is in it
     */
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

8 checkInterruptWhileWaiting method

Determines the mode in which the thread was woken up (0, THROW_IE or REINTERRUPT):

    /**
     * AbstractQueuedSynchronizer:
     * Returns 0 if the current thread was not interrupted
     * Returns THROW_IE if the current thread was interrupted before it was signalled
     * Returns REINTERRUPT if the current thread was interrupted after it was signalled
     */
    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
    }

    /**
     * Distinguishes THROW_IE from REINTERRUPT by checking whether signal has already
     * happened for the current thread. If it has, the node is no longer in CONDITION
     * and can be found in the CLH queue.
     *
     * THROW_IE: signal had not been called when the interrupt occurred. The node is
     * put into the CLH queue to compete for the lock; once the lock is acquired, the
     * node is removed from both the CLH queue and the conditional queue, and finally
     * an InterruptedException is thrown.
     *
     * REINTERRUPT: signal had already been called when the interrupt occurred. Whether
     * the interrupt happens no longer really matters, because signal has already
     * removed the node from the conditional queue and placed it in the CLH queue. The
     * node competes for the lock in the CLH queue; once the lock is acquired, the
     * current thread is interrupted again. No InterruptedException is thrown.
     */
    final boolean transferAfterCancelledWait(Node node) {
        // Check whether the node is still in CONDITION
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            /*
             * The CAS succeeded, so the node was still in CONDITION, which means
             * interruptMode is THROW_IE. Put the node into the CLH queue so that it
             * can compete for the lock
             */
            enq(node);
            return true;
        }
        /*
         * The CAS failed, so the node is no longer in CONDITION: signal has been
         * called, which means interruptMode is REINTERRUPT. However, in
         * transferForSignal (called by signal), changing the node's state away from
         * CONDITION and putting the node into the CLH queue are two steps, not one
         * atomic operation. There can be a moment when the state has already been
         * changed but the node is not yet in the CLH queue. The code below handles
         * that case: the current thread yields the CPU and waits for signal to finish
         * adding the node to the CLH queue
         */
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

9 reportInterruptAfterWait method

Final handling after a wake-up that involved an interrupt:

    /**
     * AbstractQueuedSynchronizer:
     */
    private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
        if (interruptMode == THROW_IE)
            // THROW_IE ends with an InterruptedException being thrown
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            // REINTERRUPT only "interrupts" the current thread again (it just sets the interrupt flag to true)
            selfInterrupt();
    }
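The two interrupt modes can be told apart from the outside using only public API. The sketch below is an illustration, not JDK code: the class InterruptModeDemo and its outcome method are invented for the example. The main thread holds the lock while it either interrupts the waiter directly, or signals first and interrupts afterwards, so the order of the two events is fixed in each run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration class; not part of the JDK.
class InterruptModeDemo {
    // Returns "THROW_IE", "REINTERRUPT" or "NONE", depending on what the waiter observed.
    static String outcome(boolean signalBeforeInterrupt) {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] done = {false};               // guarded by lock
        String[] result = {"NONE"};
        CountDownLatch holdingLock = new CountDownLatch(1);

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                holdingLock.countDown();
                while (!done[0])
                    cond.await();
                // await returned normally; check whether the interrupt flag was set again
                if (Thread.currentThread().isInterrupted())
                    result[0] = "REINTERRUPT";
            } catch (InterruptedException e) {
                // the interrupt arrived before any signal
                result[0] = "THROW_IE";
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        try {
            holdingLock.await();
            lock.lock();   // acquired only after the waiter released the lock inside await()
            try {
                if (signalBeforeInterrupt) {
                    done[0] = true;
                    cond.signal();
                }
                waiter.interrupt();
            } finally {
                lock.unlock();
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }
}
```

Calling outcome(false) exercises the THROW_IE path and outcome(true) the REINTERRUPT path. In both cases the waiter still has to reacquire the lock before await finishes, which is why unlocking in its finally block is safe.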

10 the enqueue method

The enqueue logic of ArrayBlockingQueue:

    /**
     * ArrayBlockingQueue:
     */
    private void enqueue(E x) {
        final Object[] items = this.items;
        // Insert the element
        items[putIndex] = x;
        // putIndex records the position of the next insertion. After the last slot it wraps
        // around to 0; put only gets here when count < items.length, so the slot it reuses is free
        if (++putIndex == items.length)
            putIndex = 0;
        // One more element in the array
        count++;
        /*
         * If the notEmpty conditional queue is not empty, wake up its first node so
         * that it queues for the lock in the CLH queue. If the notEmpty conditional
         * queue is empty, signal has no effect, because there are no blocked take
         * threads at this point
         */
        notEmpty.signal();
    }
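The wrap-around of putIndex, together with the matching takeIndex used on the read side, can be seen in isolation in a lock-free, single-threaded sketch. The class RingIndexDemo and its offer, poll and demo methods are assumptions for illustration; only the index arithmetic follows the code above.

```java
// Hypothetical single-threaded illustration of the circular array; not JDK code.
class RingIndexDemo {
    private final Object[] items;
    private int putIndex;
    private int takeIndex;
    private int count;

    RingIndexDemo(int capacity) {
        items = new Object[capacity];
    }

    boolean offer(Object x) {
        if (count == items.length)
            return false;              // full: a blocking queue would wait here instead
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;              // wrap around to the first slot
        count++;
        return true;
    }

    Object poll() {
        if (count == 0)
            return null;               // empty: a blocking queue would wait here instead
        Object x = items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        return x;
    }

    static String demo() {
        RingIndexDemo q = new RingIndexDemo(3);
        StringBuilder out = new StringBuilder();
        q.offer("a");
        q.offer("b");
        q.offer("c");                  // putIndex wraps to 0 here
        out.append(q.offer("d"));      // rejected because the array is full
        out.append(q.poll());          // frees slot 0
        q.offer("d");                  // reuses slot 0
        while (q.count > 0)
            out.append(q.poll());
        return out.toString();
    }
}
```

The demo returns "falseabcd": the rejected offer first, then the elements in FIFO order, even though "d" physically sits in slot 0.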

11 signal method

Checks whether a node in the conditional queue needs to be woken up, and wakes it up if so (by moving the node from the conditional queue to the CLH queue):

    /**
     * AbstractQueuedSynchronizer:
     */
    public final void signal() {
        // Throw an exception if the current thread is not the thread that holds the lock
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            // The notEmpty conditional queue has nodes, so move one to the CLH queue to queue for the lock
            doSignal(first);
    }

    private void doSignal(Node first) {
        do {
            if ((firstWaiter = first.nextWaiter) == null)
                // null means the conditional queue is empty at this point, so lastWaiter is set to null as well
                lastWaiter = null;
            // Clear the nextWaiter pointer of the current node, which unlinks the node
            // from the notEmpty conditional queue
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 // The current node is no longer in CONDITION, so pick the next node
                 // to move to the CLH queue
                 (first = firstWaiter) != null);
    }

    /**
     * Called from the while condition of doSignal above:
     * moves a node of the notEmpty conditional queue to the CLH queue
     */
    final boolean transferForSignal(Node node) {
        /*
         * If the node is no longer in CONDITION, return false so that doSignal
         * skips this node
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        // The node's state is now 0. Append it to the tail of the CLH queue; enq returns the previous node
        Node p = enq(node);
        int ws = p.waitStatus;
        /*
         * For the current node's thread to stay blocked, the previous node must be in
         * SIGNAL. The thread of a node in the notEmpty conditional queue is still
         * blocked, so when the node is moved to the CLH queue, the previous node's
         * state has to be changed to SIGNAL. If that CAS fails, the node's thread is
         * woken up to compete for the lock. It cannot get the lock (the current thread
         * holds it), so it blocks again in acquireQueued, where the previous node's
         * state is set to SIGNAL again (acquireQueued keeps retrying the CAS until it
         * succeeds). The node is also woken up if the previous node is CANCELLED;
         * acquireQueued then gets a chance to discard the CANCELLED nodes, which works
         * as a cleanup.
         *
         * What this method wakes up is a blocked take thread: the array was empty until
         * the element was added, so a take thread that was blocked has to be woken up.
         * Suppose the woken thread is thread 2 and the thread doing the waking is
         * thread 1. As described above, thread 2 enters acquireQueued and blocks again.
         * When thread 1 reaches the last step of put, the unlock call, a thread is
         * woken up again. That may not be thread 2 this time; it may be another
         * thread, say thread 3. But as long as thread 3 eventually calls unlock, the
         * wake-up is passed on, and thread 2 will get its turn in the end (when it
         * becomes the first node in the CLH queue)
         */
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
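The move from the conditional queue to the CLH queue is visible through the monitoring methods of ReentrantLock. The sketch below is an illustration, not JDK code: the class SignalTransferDemo and its observe method are invented for the example. It samples getWaitQueueLength (waiters on the condition) and hasQueuedThread (threads queued for the lock) immediately before and after a signal, while the signalling thread still holds the lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration class; not part of the JDK.
class SignalTransferDemo {
    // Returns {condition waiters before signal, condition waiters after signal,
    //          1 if the waiter was queued for the lock before signal, 1 if after}.
    static int[] observe() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] done = {false};               // guarded by lock
        int[] seen = new int[4];
        CountDownLatch holdingLock = new CountDownLatch(1);

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                holdingLock.countDown();
                while (!done[0])
                    cond.awaitUninterruptibly();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        try {
            holdingLock.await();
            lock.lock();   // acquired only after the waiter released the lock while waiting
            try {
                seen[0] = lock.getWaitQueueLength(cond);
                seen[2] = lock.hasQueuedThread(waiter) ? 1 : 0;
                done[0] = true;
                cond.signal();                  // moves the waiter's node to the lock queue
                seen[1] = lock.getWaitQueueLength(cond);
                seen[3] = lock.hasQueuedThread(waiter) ? 1 : 0;
            } finally {
                lock.unlock();
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

The values come out as 1, 0, 0, 1: before the signal the waiter is counted on the condition and is not queued for the lock, and right after the signal it is the other way round, although the waiter cannot run until the signalling thread unlocks.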

12 take method

ArrayBlockingQueue take method:

    /**
     * ArrayBlockingQueue:
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // Acquire the lock in interruptible mode
        lock.lockInterruptibly();
        try {
            while (count == 0)
                // The array is empty: enqueue a new node in notEmpty and block the current thread
                notEmpty.await();
            // Remove an element from the array and signal notFull
            return dequeue();
        } finally {
            // Unlock
            lock.unlock();
        }
    }

    /**
     * Called from take above
     */
    private E dequeue() {
        final Object[] items = this.items;
        // Remember the old value, which is returned at the end
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        // Clear the array slot
        items[takeIndex] = null;
        // takeIndex records the position of the next removal. After the last slot it wraps around to 0
        if (++takeIndex == items.length)
            takeIndex = 0;
        // One element fewer in the array
        count--;
        // When data is removed from the array, elementDequeued is called to keep the
        // Itrs iterators consistent with the queue data
        if (itrs != null)
            itrs.elementDequeued();
        /*
         * If the notFull conditional queue is not empty, wake up its first node so
         * that it queues for the lock in the CLH queue. If the notFull conditional
         * queue is empty, signal has no effect, because there are no blocked put
         * threads at this point
         */
        notFull.signal();
        return x;
    }

The longer I work in this industry, the more I feel that great oaks really do grow from little acorns. After working on application-level business code for a long time, it is easy to neglect the low-level details. My plan at the start of this year was to write up summaries of the commonly used JDK source code and tools. With the end of the year approaching and some free time recently, I am hurrying to catch up.

  1. Do you know ArrayList? Describe the difference between removing elements in a foreach loop and with an iterator
  2. Have you ever wondered why Internet companies always ask about collections? Let’s talk about the classic data structure HashMap
  3. In-depth analysis of the AQS source code: exclusive mode, ReentrantLock features in detail
  4. In-depth analysis of the AQS source code: shared mode. Why does AQS have a PROPAGATE state?
  5. In-depth analysis of the AQS source code: conditional queues. How are Java blocking queues implemented? (Current article)
  6. In-depth analysis of the AQS source code: the application tool CountDownLatch (in progress)
  7. In-depth analysis of the AQS source code: the application tool CyclicBarrier
  8. ConcurrentHashMap has a bug in Java 8, and more than one! This pit is fairly deep; I will focus on summarizing it later. (Finished)
  9. ThreadPoolExecutor source code analysis: the execution process of the Java thread pool, which still confuses many people. (Finished)
  10. ScheduledThreadPoolExecutor source code analysis: how the commonly used timer thread pool implements delayed and periodic execution
  11. ThreadLocal source code analysis: key points summarized, including memory leaks and soft, weak and phantom references. Interviewers like to ask about these, and so do I
  12. Red-black trees: TreeMap and LinkedHashMap
  13. An in-depth understanding of the ordered, thread-safe Map container ConcurrentSkipListMap
  14. LinkedList (not sure whether I will write it; it depends on time and on the project)
  15. Quicksort on 1 TB of data! A summary of the ten classic sorting algorithms

Each summary is also a test of how well I understand the topic. Technology is not easy; let us all get a little better every day.

The author’s public account, Geek time, has more articles; interested readers are welcome to follow it.