What is the AQS

  • AbstractQueuedSynchronizer synchronizer is a queue, is used to construct the basic framework of sync locks and other components, it USES a volatile modified int member variable synchronous state, through the built-in FIFO queue to finish the work of resource acquisition thread queue
  • The int member variable state is changed to indicate whether the lock is successfully obtained. When state>0, the lock is successfully obtained, and when state=0, the lock is successfully released. Three methods are provided (getState(),setState(int newState),compareAndSetState(int expect,int update)AQS ensures thread-safe operation on state.
  • The main use is inheritance. Subclasses manage synchronization state by inheriting the synchronizer and implementing its abstraction methods.
  • Both exclusive and shared modes are provided to manipulate the acquisition and release of synchronous state
  • Concurrency tools such as ReentrantLock, ReentrantReadWriteLock, and Semaphore are built on the basis of integrating AQS with an internal helper class

Methods provided by AQS (list the main ones)

  • acquire(int arg)Gets objects in exclusive mode, ignoring interrupts.
  • acquireInterruptibly(int arg)Retrieves objects in exclusive mode and aborts if interrupted.
  • acquire(int arg)Gets objects in exclusive mode, ignoring interrupts.
  • acquireShared(int arg)Get objects in shared mode, ignoring interrupts.
  • acquireSharedInterruptibly(int arg)Gets objects in shared mode and aborts if interrupted.
  • compareAndSetState(int expect, int update)If the current state value is equal to the expected value, the synchronization state is set atomically to the given update value.
  • getState()Returns the current value of the synchronization status.
  • release(int arg)Release objects in exclusive mode.
  • releaseShared(int arg)Release objects in shared mode.
  • setState(int newState)Sets the value of synchronization status.
  • tryAcquire(int arg)An attempt was made to obtain the state of an object in exclusive mode.
  • tryAcquireNanos(int arg, long nanosTimeout)Attempts to obtain objects in exclusive mode are aborted if interrupted, and fail if a given timeout period is reached.
  • tryAcquireShared(int arg)An attempt was made to obtain object state in shared mode.
  • tryAcquireSharedNanos(int arg, long nanosTimeout)Attempts to retrieve objects in shared mode are aborted if interrupted, and fail if a given timeout period is reached.
  • tryReleaseShared(int arg)An attempt was made to set the state to reflect a release in shared mode.

Implementation analysis of queue synchronizer

  • Synchronizer rely on internal synchronous queue (two-way a FIFO queue) to complete synchronization state management, the current thread for synchronous state failure, synchronizer will the current thread and wait state information such as the structure become a Node (the Node) and add it to the synchronous queue, blocks the current thread at the same time, when the sync release, Wakes up the thread in the first node to try again to get the synchronization state.
 static final class Node {
        /** Indicates that the node is waiting for */ in shared mode
        static final Node SHARED = new Node();
        /** Indicates that the node is waiting for */ in exclusive mode
        static final Node EXCLUSIVE = null;

        /** Indicates that the thread waiting in the synchronization queue has timed out or is interrupted. The thread waiting in the synchronization queue needs to be cancelled from the synchronization queue. The value does not change when the node enters the value */
        static final int CANCELLED =  1;
        /** Threads on subsequent nodes are in wait state, and threads on the current node will notify subsequent nodes to run */ if they release synchronization or cancel
        static final int SIGNAL    = -1;
        /** The node is waiting, and the node thread is waiting on Conditions. When another thread calls signal() on Condition, the node is moved from the wait queue to the synchronization queue and added to the synchronization state fetch */
        static final int CONDITION = -2;
        /** * indicates that the next shared synchronization state acquisition will be propagated unconditionally */
        static final int PROPAGATE = -3;

        volatile int waitStatus;

       /** The precursor node **/
        volatile Node prev;
        /** The successor node **/
        volatile Node next;

  
        volatile Thread thread;


        Node nextWaiter;

        final boolean isShared(a) {
            return nextWaiter == SHARED;
        }

        final Node predecessor(a) throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread; }}Copy the code

The node is the basis of the synchronization queue. The synchronizer has the head node and the tail node. The thread that failed to obtain the synchronization status will become the tail of the node to join the queue

Acquire method for synchronizer

  public final void acquire(int arg) {
        if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }Copy the code
  • Call the tryAcquire(int arg) method of the custom synchronizer implementation, which ensures thread-safe acquisition of the synchronization state, and if the synchronization state fails, construct a synchronization Node and add it to the end of the synchronization queue using the addWaiter(Node Node) method. Finally, the acquireQueued(Node Node,int arg) method is called, causing the Node to obtain the synchronization state in an “infinite loop “(spin).

Summary of exclusive get and release

  • When obtaining the synchronization state, the synchronizer maintains a synchronization queue. The thread that fails to obtain the synchronization state will be added to the queue and spin in the queue. The condition for moving out of the queue (or stopping the spin) is that the precursor node is the head node and synchronization status has been successfully acquired. When releasing the synchronization state, the synchronizer calls the tryRelease(int arg) method to release the synchronization state and then wakes up the successor nodes of the head node

Synchronizer release method (release)

  public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if(h ! =null&& h.waitStatus ! =0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
Copy the code
  • The method, when executed, wakes up the successor Node threads of the head Node, and the unparksucceeded (Node Node) method uses LockSupport to wake up the threads in the waiting state.

Based on AQS to achieve a simple reentrant exclusive lock acquisition and release

package com.example.juc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/** * Implement a simple lock based on AQS **@author qinxuewu
 * @create19/3/18 11:44 PM *@since1.0.0 * /
public class MyAQSLock implements Lock {
    private  final  MySync sync=new MySync();

    /** * Build an internal helper to integrate AQS */
    private  static  class MySync extends AbstractQueuedSynchronizer{
        // Get the lock when the state is 0.

        /*** * when a thread enters, if the state is 0, change the state variable and return true to acquire the lock ** When state is greater than 0, the current lock is already held, return false, if repeated, add state, return true *@param arg
         * @return* /
        @Override
        protected boolean tryAcquire(int arg) {
            // Get the value of the member variable of the synchronization state
            int state=getState();
            Thread cru=Thread.currentThread();
            if(state==0) {//CAS updates state to ensure atomicity, expected value, updated value
                if( compareAndSetState(0,arg)){
                    // The setting succeeded
                    // Set the current thread
                    setExclusiveOwnerThread(Thread.currentThread());
                    return  true; }}else if(Thread.currentThread()==getExclusiveOwnerThread()){
                    // If the current thread comes in, add state and return true
                    setState(state+1);
                    return  true;
            }
            return false;
        }

        /** * Release synchronization status *@param arg
         * @return* /
        @Override
        protected boolean tryRelease(int arg) {
            boolean flag=false;
            // Determine if the release operation is the current thread,
            if(Thread.currentThread()==getExclusiveOwnerThread()){

                    // Get the synchronization status member variable, if greater than 0
                    int state=getState();
                    if(getState()==0) {// Set the current thread to null
                        setExclusiveOwnerThread(null);
                        flag=true;
                    }
                    setState(arg);

            }else{
                // Not when the thread throws an exception
                throw  new RuntimeException();
            }
            return flag;
        }
        Condition newCondition(a){
            return  newConditionObject(); }}@Override
    public void lock(a) {
            sync.acquire(1);
    }

    @Override
    public void lockInterruptibly(a) throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    /** * lock *@return* /
    @Override
    public boolean tryLock(a) {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    /** * release lock */
    @Override
    public void unlock(a) {
        sync.tryRelease(1);
    }

    @Override
    public Condition newCondition(a) {
        returnsync.newCondition(); }}Copy the code

test

public class MyAQSLockTest {
    MyAQSLock lock=new MyAQSLock();
    private    int i=0;
    public  int  next(a) {
        try {
            lock.lock();
            try {
                Thread.sleep(300);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return i++;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return 0;
    }

    public void test1(a){
        System.out.println("test1");
        test2();
    }
    public  void  test2(a){
        System.out.println("test2");
    }

    public static void main(String[] args){
        MyAQSLockTest test=new MyAQSLockTest();
// Thread thread = new Thread(new Runnable() {
// @Override
// public void run() {
// while (true) {
//
// System.out.println(Thread.currentThread().getName() + "-" + test.next());
//
/ /}
//
/ /}
/ /});
// thread.start();
//
// Thread thread2 = new Thread(new Runnable() {
// @Override
// public void run() {
// while (true) {
//
// System.out.println(Thread.currentThread().getName() + "-" + test.next());
//
/ /}
//
/ /}
/ /});
// thread2.start();

        // repeat lock demo
        Thread thread3 = new Thread(new Runnable() {
            @Override
            public void run(a) { test.test1(); }}); thread3.start(); }}Copy the code
  • See the art of concurrent programming
  • JDK8 source
  • My blog: blog.qinxuewu.club/
  • Github: github.com/a870439570