Notes

  1. The JDK version used in this article is OpenJDK-1.8

Lock and thread fundamentals

  1. What is the nature of locking? Its main purpose is to serialize access to a critical resource, so that competing threads are blocked and woken up in an orderly way.
  2. Locking in Java: synchronized and Lock. synchronized is implemented mainly through a monitor and uses Object.wait/notify to block and wake up threads; Lock is based on LockSupport.park/unpark to block and wake up threads.
  3. The thread interrupt problem: how do you interrupt a thread gracefully?
  • The Java interrupt mechanism is cooperative: an interrupt does not terminate a thread directly; the interrupted thread decides how to handle the interrupt itself.

  • API usage:

    interrupt(): sets the target thread's interrupt flag to true.

    isInterrupted(): checks whether the interrupt flag of the thread it is called on is true, without clearing the flag.

    Thread.interrupted(): checks whether the current thread's interrupt flag is true and then clears it (resets it to false).

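  4. Does LockSupport interrupt threads? No. LockSupport itself never interrupts a thread. Conversely, an interrupt makes LockSupport.park() return, but park() does not throw InterruptedException and does not clear the interrupt flag.

  5. What is CAS? Compare-and-swap, an optimistic-locking primitive; in the JDK it is implemented through the Unsafe API.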

protected final boolean compareAndSetState(int expect, int update) {    
    // See below for intrinsics setup to support this    
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
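
As a rough illustration of the compare-and-swap idea, the sketch below uses the public AtomicInteger.compareAndSet method rather than Unsafe directly. The class and method names (CasCounterDemo, incrementWithCas, runDemo) are made up for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: increments a counter with a CAS retry loop instead of a lock
class CasCounterDemo {

    static int incrementWithCas(AtomicInteger counter) {
        for (;;) {
            int expect = counter.get();   // read the current value
            int update = expect + 1;      // compute the new value
            if (counter.compareAndSet(expect, update)) {
                return update;            // nobody changed the value in between
            }
            // CAS failed: another thread won the race, so re-read and retry
        }
    }

    static int runDemo(int threads, int incrementsPerThread) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger(0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    incrementWithCas(counter);
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo(4, 10_000)); // prints 40000
    }
}
```

No increment is lost even though no lock is taken: a thread whose CAS fails simply retries with the fresh value.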
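
Returning to the interrupt API and LockSupport points in the list above, here is a minimal sketch of how the interrupt flag behaves around LockSupport.park(). The class name InterruptDemo and its fields are hypothetical and exist only to record what the worker observes.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class showing cooperative interruption with park()
class InterruptDemo {

    // Results recorded by the worker so the caller can inspect them
    static volatile boolean flagAfterPark;
    static volatile boolean firstInterruptedCall;
    static volatile boolean secondInterruptedCall;

    static void runDemo() throws InterruptedException {
        Thread worker = new Thread(() -> {
            // park() may return spuriously, so re-check the interrupt flag in a loop
            while (!Thread.currentThread().isInterrupted()) {
                LockSupport.park();
            }
            // park() returned without throwing, and the flag is still set
            flagAfterPark = Thread.currentThread().isInterrupted();
            // Thread.interrupted() reports the flag and clears it
            firstInterruptedCall = Thread.interrupted();
            secondInterruptedCall = Thread.interrupted();
        });
        worker.start();
        worker.interrupt(); // only sets the flag; the worker decides how to react
        worker.join();
    }

    public static void main(String[] args) throws InterruptedException {
        runDemo();
        // prints "true true false"
        System.out.println(flagAfterPark + " " + firstInterruptedCall + " " + secondInterruptedCall);
    }
}
```

The worker ends only because it checks the flag itself, which is the cooperative behavior described in the list.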

Principle and implementation of AQS

AQS (AbstractQueuedSynchronizer) is a queue-based synchronizer framework. It follows the template method pattern: the queuing logic lives in AQS, while subclasses implement hooks such as tryAcquire and tryRelease.

  1. The underlying data structure is a doubly linked list. Each Node contains prev and next pointers, as well as a thread field that holds the waiting Thread object.

  2. Besides the initial value 0, a Node's waitStatus can take four values: CANCELLED (1), SIGNAL (-1), CONDITION (-2), and PROPAGATE (-3).

  3. There are two common implementations: fair locks and nonfair locks. Fairness refers to whether a thread that has just arrived may compete for the lock with the thread at the head of the queue. With a fair lock it must queue behind the waiting threads; with a nonfair lock it may compete directly.

  4. The state is changed via CAS, with sun.misc.Unsafe’s compareAndSwapInt being called to change the state.

  5. A thread tries to acquire the lock before it is enqueued. If the lock is not available, the thread is enqueued and blocked through LockSupport.park().

  6. When the lock is released, the successor of the head node is woken up. The head node of the synchronization queue corresponds to the thread that currently holds the lock.

  7. Here is the core AQS code for acquiring a lock:

    A. Try locking. If the lock fails, enter the queue and retry.

     // Acquire in exclusive mode, ignoring interrupts: tryAcquire is called at least once; on failure the thread is queued and retries until it succeeds
     public final void acquire(int arg) {
         if (!tryAcquire(arg) &&
             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
             selfInterrupt();
     }

    B. Once in the queue, check whether the node's predecessor is the head. If it is, try to acquire the lock. If it is not, or the attempt fails, set the predecessor's waitStatus to SIGNAL so that this node will be woken up later; once the status has been set, the thread blocks.

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    The shouldParkAfterFailedAcquire method below maintains the Node status.

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
         int ws = pred.waitStatus; // Status of the predecessor node; normally the default value 0 on the first call
         if (ws == Node.SIGNAL)
             return true;
         if (ws > 0) {
             do {
                 // Skip cancelled predecessors and unlink them from the queue
                 node.prev = pred = pred.prev;
             } while (pred.waitStatus > 0);
             pred.next = node;
         } else {
             // For the first time, pred.waitStatus = 0 executes the branch
             // Change the state of the preceding node to SIGNAL, indicating that the pred.next node needs to be woken up (it is ready to block, but not yet blocked, and will only be blocked if it fails to acquire the lock again).
             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
         }
         return false;
     }
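
To make the template-method design of this section concrete, here is a minimal sketch of a non-reentrant mutex built on AbstractQueuedSynchronizer. SimpleMutex, its nested Sync class, and runDemo are illustrative names, not part of the JDK; only tryAcquire and tryRelease are written by hand, while queuing, parking, and waking are inherited from AQS.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex; state 0 = free, 1 = held
class SimpleMutex {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // CAS the state from 0 to 1; only one thread can succeed
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // a plain volatile write is enough: only the owner releases
            return true;
        }

        boolean isHeld() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }   // template method from AQS; calls tryAcquire
    void unlock()      { sync.release(1); }   // template method from AQS; calls tryRelease
    boolean isLocked() { return sync.isHeld(); }

    // Increments a shared counter from several threads under the mutex
    static int runDemo(int threads, int perThread) throws InterruptedException {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return counter[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo(4, 10_000)); // prints 40000
    }
}
```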

The flow chart of AQS

AQS characteristics

  1. Blocking wait queue
  2. Shared / exclusive modes
  3. Fair / nonfair
  4. Reentrant
  5. Interruptible
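
Two of these characteristics, fairness and interruptibility, can be seen through the public ReentrantLock API. The sketch below assumes a hypothetical class LockFeaturesDemo: the main thread holds a fair lock while a second thread waits in lockInterruptibly() and is then interrupted.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for the "fair" and "interruptible" characteristics
class LockFeaturesDemo {

    static boolean waiterWasInterrupted() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock(true); // true selects the fair variant
        boolean[] interrupted = {false};
        lock.lock();
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly(); // waits, but gives up when interrupted
                    lock.unlock();            // not reached while the caller holds the lock
                } catch (InterruptedException e) {
                    interrupted[0] = true;
                }
            });
            waiter.start();
            waiter.interrupt();
            waiter.join(); // the waiter ends although the lock was never released to it
        } finally {
            lock.unlock();
        }
        return interrupted[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(new ReentrantLock(true).isFair()); // prints true
        System.out.println(waiterWasInterrupted());           // prints true
    }
}
```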

ReentrantLock

A simple Demo

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockTest {

    static ReentrantLock lock = new ReentrantLock();

    static class T extends Thread {
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread() + " Start trying to acquire the lock");
                if (lock.tryLock(10, TimeUnit.SECONDS)) {
                    System.out.println(Thread.currentThread() + " Lock obtained successfully");
                    TimeUnit.SECONDS.sleep(5);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // Unlock only if this thread actually holds the lock: tryLock may have timed out
                if (lock.isHeldByCurrentThread()) {
                    System.out.println(Thread.currentThread() + " Start releasing the lock.");
                    lock.unlock();
                }
            }
        }
    }

    public static void main(String[] args) {
        T t1 = new T();
        T t2 = new T();
        T t3 = new T();
        t1.start();
        t2.start();
        t3.start();
    }
}

Locking and unlocking process diagram

Description of locking and unlocking process

  1. In the program above, three threads try to acquire the lock at the same time, but only one thread can hold it at any moment. tryLock(timeout, unit) delegates to tryAcquireNanos, which falls back to doAcquireNanos when the first attempt fails. The following is the logic for joining the queue and trying to acquire the lock:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
  if (nanosTimeout <= 0L)
      return false;
  final long deadline = System.nanoTime() + nanosTimeout;
  final Node node = addWaiter(Node.EXCLUSIVE); // Enqueue
  boolean failed = true;
  try {
      for (;;) {
          final Node p = node.predecessor();
          if (p == head && tryAcquire(arg)) { // If the predecessor is the head node and the lock was successfully acquired
              setHead(node);
              p.next = null; // help GC
              failed = false;
              return true;
          }
          // The maximum waiting time for the lock is exceeded
          nanosTimeout = deadline - System.nanoTime();
          if (nanosTimeout <= 0L)
              return false;
          // Failed to get the lock: park only if the remaining time exceeds the spin threshold
          if (shouldParkAfterFailedAcquire(p, node) &&
              nanosTimeout > spinForTimeoutThreshold)
              // Block for at most nanosTimeout nanoseconds
              LockSupport.parkNanos(this, nanosTimeout);
          if (Thread.interrupted())
              throw new InterruptedException();
      }
  } finally {
      if (failed)
          cancelAcquire(node);
  }
}
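
The timeout path of doAcquireNanos can be observed from the outside with a small experiment. In this sketch (TimedTryLockDemo and tryLockWhileHeld are made-up names) the calling thread keeps the lock while a second thread calls tryLock with a short timeout, so the second thread runs out of time and gets false.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: a timed tryLock that cannot succeed
class TimedTryLockDemo {

    static boolean tryLockWhileHeld(long timeoutMillis) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        boolean[] acquired = {true};
        lock.lock(); // the caller holds the lock for the whole experiment
        try {
            Thread waiter = new Thread(() -> {
                try {
                    // Waits in the queue until the deadline passes, then returns false
                    acquired[0] = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
                    if (acquired[0]) {
                        lock.unlock();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and give up
                }
            });
            waiter.start();
            waiter.join();
        } finally {
            lock.unlock();
        }
        return acquired[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(tryLockWhileHeld(100)); // prints false
    }
}
```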
  2. If no thread holds the lock, the thread uses CAS to try to take it. If the current thread already holds the lock, state is incremented by 1; this is how ReentrantLock supports reentrancy.
// Nonfair acquisition logic
// Barging: a thread that has just arrived competes with the thread woken up from the queue
// Barging occurs if the newly arrived thread obtains the lock before the woken thread does
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // No lock, try to compete
    if (c == 0) {
        if (compareAndSetState(0, acquires)) { // Whether the lock has been obtained
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // The current thread already holds the lock: increase the state count
    else if (current == getExclusiveOwnerThread()) { // Check if it is reentrant
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
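
The state counting in nonfairTryAcquire is visible through ReentrantLock.getHoldCount(). A short sketch, with ReentrancyDemo as a hypothetical class name:

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: the hold count mirrors the AQS state of the owning thread
class ReentrancyDemo {

    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        lock.lock();
        counts[0] = lock.getHoldCount(); // first acquisition
        lock.lock();                     // same thread again: reentrant branch
        counts[1] = lock.getHoldCount();
        lock.unlock();                   // state drops by one, lock is still held
        counts[2] = lock.getHoldCount();
        lock.unlock();                   // state reaches 0, lock is free
        counts[3] = lock.getHoldCount();
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts())); // prints [1, 2, 1, 0]
    }
}
```

Each lock() must be paired with one unlock(); the lock becomes available to other threads only when the count returns to 0.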
  3. If the lock is acquired, the current node is set as the head node and the method returns true. If the acquisition fails and the remaining wait time exceeds the spin threshold, the current thread is blocked by calling LockSupport.parkNanos(this, nanosTimeout). shouldParkAfterFailedAcquire may be called several times in the process; it updates the predecessor node's status and unlinks cancelled nodes from the list.
/**
 * Checks and updates the status of the predecessor after a failed attempt to
 * acquire the lock. Returns true if the current thread should block.
 *
 * @param pred predecessor node
 * @param node current node
 * @return {@code true} if thread should block
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; // The state of the preceding node must be 0 for the first time
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            // discard invalid nodes
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // For the first time, pred.waitStatus = 0 executes the branch
        // Change the state of the preceding node to SIGNAL, indicating that the pred.next node needs to be woken up (it is ready to block, but not yet blocked, and will only be blocked if it fails to acquire the lock again).
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
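
The effect of enqueueing and parking can be watched from outside through ReentrantLock's monitoring methods. The following sketch (QueueObservationDemo and queuedWhileHeld are illustrative names) holds the lock, starts a few competing threads, and polls getQueueLength(), which the JDK documents as an estimate, until all of them are queued.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: counts the threads waiting in the synchronization queue
class QueueObservationDemo {

    static int queuedWhileHeld(int waiters) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Thread[] threads = new Thread[waiters];
        int observed;
        lock.lock(); // nobody else can acquire while the caller holds the lock
        try {
            for (int i = 0; i < waiters; i++) {
                threads[i] = new Thread(() -> {
                    lock.lock();   // fails, so the thread is enqueued and parked
                    lock.unlock();
                });
                threads[i].start();
            }
            // Poll until every waiter has been added to the queue
            while (lock.getQueueLength() < waiters) {
                Thread.sleep(1);
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock(); // wakes the first queued thread; the others follow in turn
        }
        for (Thread t : threads) {
            t.join();
        }
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(queuedWhileHeld(3)); // prints 3
    }
}
```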
  4. Unlock logic: first release the lock; if state drops to 0, wake up the thread waiting in the queue.
// Unlock
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // Determine if there are threads to wake up
        if (h != null && h.waitStatus != 0) // waitStatus is 0 by default; it is non-zero only when a successor has registered to be woken up
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// tryRelease 
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // Determine if the current thread holds the lock
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
    	// If state == 0, the current thread is no longer holding the lock
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

// Wake up the queue thread
private void unparkSuccessor(Node node) {
	// Change the current node state to 0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
    	// Wake up the node in the queue
        LockSupport.unpark(s.thread);
}
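
The park/unpark handshake that unparkSuccessor relies on can be tried in isolation. This is a sketch under simple assumptions: ParkUnparkDemo and parkUntilReleased are made-up names, and the released flag stands in for the "lock is now available" condition.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: one thread parks, another one wakes it up
class ParkUnparkDemo {

    static boolean parkUntilReleased() throws InterruptedException {
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            // park() may return spuriously, so the condition is re-checked in a loop
            while (!released.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });
        waiter.start();
        released.set(true);         // publish the condition first
        LockSupport.unpark(waiter); // then wake the waiter; if it has not parked yet,
                                    // its next park() returns immediately
        waiter.join();
        return resumed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(parkUntilReleased()); // prints true
    }
}
```

Setting the condition before calling unpark mirrors the release path above, where the state is updated in tryRelease before the successor is woken.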
  5. After being woken up, the thread resumes inside the loop, right after the shouldParkAfterFailedAcquire check where it parked. It then loops around, tries to acquire the lock again, and returns true if it succeeds.