This is the 23rd day of my participation in Gwen Challenge

The premise to review

Before writing the relevant principles and part of the source code process of the introduction text,☕ [Java Principles exploration] Strengthen your understanding and foundation of AQS. This article mainly wrote the relevant source analysis and understanding

Most programmers will not direct contact with AbstractQueuedSynchronizer (AQS) class, but the lack of ubiquitous in concurrent tool, and as the internal standard synchronizer, such as already, Semaphore, the Worker in the Java thread pool etc. This article covers the implementation details of AQS.

AbstractQueuedSynchronizer introduction

AQS is responsible for managing the state in the synchronizer class. It manages an integer state information, which can be operated by getState, setState, and compareAndSetState. The meaning of this integer state is given by subclasses, such as in ReentrantLock the number of times the owner thread has repeatedly acquired the lock, or in Semaphore the number of remaining permissions. May have a look to use AbstractQueuedSynchronizer concurrent tools:

AbstractQueuedSynchronizer implementation

AQS definition is simpler, inherited from AbstractOwnableSynchronizer interface:

AbstractOwnableSynchronizer

When a synchronizer to monopoly by a single thread, AbstractOwnableSynchronizer defines the foundation to create lock and related methods of synchronizer, but its itself does not manage to maintain this information, but to a subclass to implement:

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
 
    private static final long serialVersionUID = 3737899427754241961L;
 
    protected AbstractOwnableSynchronizer(a) {}/** * the thread currently monopolizing the synchronizer */
    private transient Thread exclusiveOwnerThread;
 
    /** * sets the thread currently exclusive to the synchronizer */
    protected final void setExclusiveOwnerThread(Thread t) {
        exclusiveOwnerThread = t;
    }
 
    /** * gets the thread of the current exclusive synchronizer */
    protected final Thread getExclusiveOwnerThread(a) {
        returnexclusiveOwnerThread; }}Copy the code

Relevant interpretation

For the exclusive lock attribute, store the associated exclusive lock (currently exclusive synchronizer Thread), set the current exclusive lock Thread attribute to the Thread (Thread) reference. The main thing is to set exclusive locks (having thread object references that occupy the current object resources)

AbstractQueuedSynchronizer (CLH locking principle)

AbstractQueuedSynchronizer internal use CLH lock (CLH lock is a kind of based on the list of scalable, high performance and fair spin locks, apply to thread a constant state of polling precursor, if found precursor to release the spin lock ends) variant to implement the thread blocking. The nodes in the CLH lock list are abstracted as nodes:

static final class Node {
    
    /** * indicates that the node is waiting for */ in shared mode
    static final Node SHARED = new Node();
 
    /** * indicates that the node is waiting for */ in exclusive mode 
    static final Node EXCLUSIVE = null;
 
    // ===== The following indicates the wait status of the node =====
    
    /** * indicates that the current thread is cancelled */
    static final int CANCELLED =  1;
 
    /** * indicates that the current node's successor contains threads that need to be run, which is the same state under park. * /
    static final int SIGNAL = -1;
 
    /** * indicates that the current node is waiting on condition. In the condition queue, the status is more in wait, sleep, etc. * /
    static final int CONDITION = -2;
 
    /** * shows that subsequent acquireShared in the current scene can be executed */
    static final int PROPAGATE = -3;
 
    /** * state */
    volatile int waitStatus;
 
    /** * The precursor node, such as when the current node is cancelled, requires the precursor node and its successors to complete the * connection. * /
    volatile Node prev;
 
    /**
     * 后继结点
     */
    volatile Node next;
 
    /** * The current thread when enqueued */
    volatile Thread thread;
 
    /** * Stores successor nodes in the condition queue. * /Node nextWaiter; . }Copy the code

Relevant interpretation

  • Check whether the lock type is shared or exclusive.

  • Identifies the state type of the current thread object node: Cancel represents the state of the node after cancelAcquire has been executed. There are also Signal states, which indicate that the front node is waiting to wake up (essentially, the back node polls for control state), Condition states, which represent other forms of blocking or waiting queuing mechanism, PROPAGATE state, and more relevant migration mechanisms, which allow simultaneous release and wake up of multiple WAITER nodes.

  • And the nextwaiter node mechanism waiting for the next state identifies the post-node in the conidtion state.

  • Contains thread objects, pre – and post-pointers contained in the current node.

Maintenance of AbstractQueuedSynchronizer chain table structure is roughly as follows:

ReentrantLock

To achieve to explore the role of AbstractQueuedSynchronizer from already. ReentrantLock internally encapsulates a Sync class to implement basic lock and UNLOCK operations:

 public class ReentrantLock implements Lock.java.io.Serializable {
   
    // synchronizer, used to implement locking mechanism
    private final Sync sync;
 
    /** * The base synchronizer implementation */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        
        private static final long serialVersionUID = -5179523762034025860L;
 
        /** * is implemented by fair and unfair locks */
        abstract void lock(a);
 
        /** * An unfair lock is attempted */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            // Synchronizer status
            int c = getState();
            if (c == 0) {
            	// If the synchronizer is in the initial state, try locking
                if (compareAndSetState(0, acquires)) {
                    // Set the thread occupied by the lock
                    setExclusiveOwnerThread(current);
                    return true; }}else if (current == getExclusiveOwnerThread()) {
            	// If the current thread is already locked, set state to lock reentrant count +1
                int nextc = c + acquires;
                if (nextc < 0) // The maximum number of lock reentrants was exceeded
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
 
        /** * Try to release synchronizer */ 
        protected final boolean tryRelease(int releases) {
            // The new state after release
            int c = getState() - releases;
            if(Thread.currentThread() ! = getExclusiveOwnerThread())// The thread is not occupied
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
            	// State returns to zero
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
 
        /* * Whether the current thread owns the lock */
        protected final boolean isHeldExclusively(a) {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
 
        /* * Create a condition object */
        final ConditionObject newCondition(a) {
            return new ConditionObject();
        }
 
        /* * Gets the current exclusive thread */
        final Thread getOwner(a) {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }
 
        /* * Gets the number of times the lock was re-entered */
        final int getHoldCount(a) {
            return isHeldExclusively() ? getState() : 0;
        }
 
        /* * Whether the lock is occupied */
        final boolean isLocked(a) {
            returngetState() ! =0;
        }
 
        /** * deserialize the lock object from the object stream */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            // Reset to the initial state
            setState(0); }}Copy the code

Unfair locking mechanism Sync synchronizer

    /** * unfair lock */
    static final class NonfairSync extends Sync {
        
        private static final long serialVersionUID = 7316153563782823691L;
 
        /** * lock */
        final void lock(a) {
        	// Try locking directly, i.e. preemptively
            if (compareAndSetState(0.1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
            // If you fail, you can queue up to grab the lock
            acquire(1);
        }
 
        /** * Attempts to obtain the lock */
        protected final boolean tryAcquire(int acquires) {
            returnnonfairTryAcquire(acquires); }}Copy the code

Fair locking mechanism Sync synchronizer

    /** * fair lock */
    static final class FairSync extends Sync {
    
        private static final long serialVersionUID = -3000897897090466540L;
 
        final void lock(a) {
           // Queue for lock
            acquire(1);
        }
 
        /** * Attempts to obtain the lock */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
            	// If no other thread is already in the queue, try locking
                if(! hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true; }}else if (current == getExclusiveOwnerThread()) {
            	// Reentrant times + acquires if the current thread already owns the lock
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false; }}Copy the code

Default to unfair lock (excellent performance)

   /** * default unfair lock */
   public ReentrantLock(a) {
       sync = new NonfairSync();
   }

   /** * request lock */
   public void lock(a) {
       sync.lock();
   }

   /** * Attempt to lock, can be interrupted */
   public void lockInterruptibly(a) throws InterruptedException {
       sync.acquireInterruptibly(1);
   }

   /**
    * 尝试加锁
    */
   public boolean tryLock(a) {
       return sync.nonfairTryAcquire(1);
   }

   /** ** lock, with timeout limit */
   public boolean tryLock(long timeout, TimeUnit unit)
           throws InterruptedException {
       return sync.tryAcquireNanos(1, unit.toNanos(timeout));
   }

   /** * unlock */
   public void unlock(a) {
       sync.release(1);
   }

   /** * create a condition object */
   public Condition newCondition(a) {
       return sync.newCondition();
   }

   /** * Get the lock reentrant count */
   public int getHoldCount(a) {
       return sync.getHoldCount();
   }

   /** * whether the lock is held by the current thread */
   public boolean isHeldByCurrentThread(a) {
       return sync.isHeldExclusively();
   }

   /** * Whether the lock is already held */
   public boolean isLocked(a) {
       return sync.isLocked();
   }

   /** * whether the lock is fair */
   public final boolean isFair(a) {
       return sync instanceof FairSync;
   }

   /** * get the thread holding the lock */
   protected Thread getOwner(a) {
       return sync.getOwner();
   }

   /** * Whether there is a waiting thread */
   public final boolean hasQueuedThreads(a) {
       return sync.hasQueuedThreads();
   }

   /** * determine whether the thread is in the wait queue */
   public final boolean hasQueuedThread(Thread thread) {
       return sync.isQueued(thread);
   }

   /** * get the length of the wait queue
   public final int getQueueLength(a) {
       return sync.getQueueLength();
   }

   /** * gets the set of waiting threads, not exactly */
   protected Collection<Thread> getQueuedThreads(a) {
       return sync.getQueuedThreads();
   }

   /** * Determines whether there are threads waiting on a condition
   public boolean hasWaiters(Condition condition) {
       if (condition == null)
           throw new NullPointerException();
       if(! (conditioninstanceof AbstractQueuedSynchronizer.ConditionObject))
           throw new IllegalArgumentException("not owner");
       return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
   }

   /** * gets the number of threads waiting on a condition */
   public int getWaitQueueLength(Condition condition) {
       if (condition == null)
           throw new NullPointerException();
       if(! (conditioninstanceof AbstractQueuedSynchronizer.ConditionObject))
           throw new IllegalArgumentException("not owner");
       return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
   }

   /** * gets the set of threads waiting on a condition */
   protected Collection<Thread> getWaitingThreads(Condition condition) {
       if (condition == null)
           throw new NullPointerException();
       if(! (conditioninstanceof AbstractQueuedSynchronizer.ConditionObject))
           throw new IllegalArgumentException("not owner");
       returnsync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition); }}Copy the code

When already perform the lock (), mainly through AbstractQueuedSynchronizer acquire () method:


// ReentrantLock.lock()   
public void lock(a) {
    sync.lock();
}   
 
// FairSync.sync()
final void lock(a) {
    acquire(1);
}
 
/ / acquiring a lock
public final void acquire(int arg) {
    // Try to get the lock:
    // 1. If successful, return directly
    // 2. On failure, enqueue the current thread exclusively and wait for spin to acquireQueued
    if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// If the wait returns, interrupt yourself
        selfInterrupt();
}
 
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try to go straight to the back of the line,
    // If there is a possibility of failure in concurrency, pass enQ to join the team
    Node pred = tail;
    if(pred ! =null) {
        // If there are threads waiting, try to set Node to tail directly
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            // Link the old tail.next -> node
            pred.next = node;
            returnnode; }}// If compareAndSetTail fails when the wait queue is empty or concurrent, try to continue inserting the wait node
    enq(node);
    return node;
}
 
// The new node joins the queue and returns the old tail node
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
        	// The wait queue is empty. Initialize the head node
            if (compareAndSetHead(new Node()))
            	// Initialize the tail node
                tail = head;
        } else {
        	// The wait queue is not empty
        	// The precursor node to the new node is the tail node
            node.prev = t;
            // Set the new node to the tail node
            if (compareAndSetTail(t, node)) {
            	// The rear-drive node that links the old tail node is the new node
                t.next = node;
                returnt; }}}}// Spin wait
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Find the head node from the current node
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
            	// The lock has been obtained, and a new head node is set
                setHead(node);
                // Release the wait node. The head node has no next attribute
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // If the lock request fails, check whether the node is blocked by the need
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true; }}finally {
    	// If the queue fails, cancel the lock request
        if(failed) cancelAcquire(node); }}private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // Get the status of the precursor node
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /* * When the precursor of a node is in SIGNAL state, it indicates that the node has requested to be awakened and can safely block */
        return true;
    if (ws > 0) {
        /* * If the precursor node has been cancelled, ignore these cancelled nodes and continue to find no */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    }else {
        /* * At this point the state of the precursor node is 0 or PROPAGATE(-3), a wake up node signal is needed, but there is no need to block the thread */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
 
private final boolean parkAndCheckInterrupt(a) {
    // Block the current thread
    LockSupport.park(this);
    // Whether the thread is interrupted, and reset the interrupted state
    return Thread.interrupted();
}
Copy the code

explain

  • TryAcquire: Mainly used to customize the implementation of the state, one can implement the related thread blocking and line switching operation

  • AddWaiter: Mainly adds a task node to the counterpoint, including cas setting mechanism and spin setting mechanism

  • AcquireQueued: contains the mechanism for setting the state of the precursor node to perform the task state setting

In addition, there are some whether to block the current task mechanism.

  • ParkAndCheckInterrupt: Performs a self-interrupt mechanism,

Attempt to cancel the request

//
private void cancelAcquire(Node node) {
    // Ignore non-existent nodes
    if (node == null)
        return;
    // The node thread is empty
    node.thread = null;
 
    // Ignore the cancelled node
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
 
    // predNext node, which is the successor of the first non-canceled node before the node
    Node predNext = pred.next;
 
    // Set the node state to Cancel
    node.waitStatus = Node.CANCELLED;
 
    // If the current node is the tail node, set a new tail node
    if (node == tail && compareAndSetTail(node, pred)) {
    	// Empty the successor node of node
        compareAndSetNext(pred, predNext, null);
    } else {
    	// If node is not a tail node, it is a middle node in the list
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if(pred ! = head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <=0&& compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread ! =null) {
            // If the precursor node of a node is not the head node, then the successor node of the current node needs to be marked "waked up".
            // Set the wait state of the current node's precursor node to SIGNAL, and then set it to the precursor node of the current node's successors
            Node next = node.next;
            if(next ! =null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // Wake up the successor node of node.
            unparkSuccessor(node);
        }
        // Make the next reference to the cancelled node point to itselfnode.next = node; }}Copy the code

The initial state

When only one thread t1 does lock(), since tryAcquire() returns true, there is no wait and the wait queue state remains unchanged. If t1 is not unlock(), t2 performs a lock() operation, and the queue is initialized and t2 is inserted into the queue:

If thread t3 also performs the lock() operation:

ReentrantLock (unlock) ReentrantLock (unlock)

// ReentrantLock.unlock()	
public void unlock(a) {
	sync.release(1);
}
 
// Sync.release()
public final boolean release(int arg) {
	// Attempt to unlock, as defined by subclass ReentrantLock
    if (tryRelease(arg)) {
        Node h = head;
        if(h ! =null&& h.waitStatus ! =0)
            // Wake up the first node
            unparkSuccessor(h);
        return true;
    }
    return false;
}
 
// ReentrantLock.tryRelease()
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if(Thread.currentThread() ! = getExclusiveOwnerThread())throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
    	// The lock is released successfully only when state = 0, that is, the unified thread must lock() as many times as it unlock()
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
 
// AbstractQueuedSynchronizer.unparkSuccessor()
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // Find a valid successor node
    Node s = node.next;
    // If the successor node does not exist, or the state is cancelled, the preceding non-cancelled node is queried
    if (s == null || s.waitStatus > 0) {
        s = null;
        for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
                s = t;
    }
    if(s ! =null)
    	// Wake up the corresponding node lock in the thread
        LockSupport.unpark(s.thread);
}
 
/ / once LockSupport. Unpark (s.t hread); After executing, the corresponding waiting node will wake up:

private final boolean parkAndCheckInterrupt(a) {
    // Wake up and return
    LockSupport.park(this);
    return Thread.interrupted();
}
 
final boolean acquireQueued(final Node node, int arg) {...try{...for (;;) {
            // Set up a new head node and wait for the node to execute
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; 
                failed = false;
                returninterrupted; }}}finally{... }}Copy the code

It should be noted that the initial node is released in the first step of waking up the node. If there is cancel or NULL in the post-node, the thread task node that is not the first and is not cancelled or NULL will be woken up by traversing forward the relevant tail node.

The following diagram shows the state change of the internal wait queue when the lock is acquired from the synchronizer:

When only one thread t1 does lock(), since tryAcquire() returns true, there is no wait and the wait queue state remains unchanged. If t1 is not unlock(), t2 performs a lock() operation, and the queue is initialized and t2 is inserted into the queue:

If thread t3 also performs the lock() operation:

Above, is the basic implementation AbstractQueuedSynchronizer synchronizer mechanism, as the basis of a lot of complicated tools, specification for how the blocking and awaken thread, compared with the ordinary locking mechanism (e.g., synchronized), through the spin wait and precise, can improve the performance of some concurrency.