Preface

Some time ago, while interviewing, I found that nearly every interviewer asked about the AQS synchronizer. AQS provides the basic framework for almost all locks and synchronizers in java.util.concurrent, and the whole AQS family, such as ReentrantLock, Semaphore, and CountDownLatch, is derived from it. This article covers several key points of how AQS works, shares my understanding of AbstractQueuedSynchronizer, and implements a custom synchronizer.

Key points for answering AQS interview questions

  1. State maintenance.
  2. CLH queue.
  3. ConditionObject notification.
  4. Template method design pattern.
  5. Exclusive and shared modes.
  6. Custom synchronizers.
  7. Extensions in the AQS family, such as ReentrantLock.

AQS class diagram structure

The full name of AQS is AbstractQueuedSynchronizer, that is, an abstract queued synchronizer. The AQS class diagram is structured as follows:

To make the key points below easier to follow, first get familiar with the structure of the AQS class diagram.

State maintenance

AQS maintains a single shared state that synchronizers use for synchronization. Here is the code for state:

State source code

  /**
   * The synchronization state.
   */
  private volatile int state;

  /**
   * Returns the current value of synchronization state.
   * This operation has memory semantics of a {@code volatile} read.
   * @return current state value
   */
  protected final int getState() {
      return state;
  }

  /**
   * Sets the value of synchronization state.
   * This operation has memory semantics of a {@code volatile} write.
   * @param newState the new state value
   */
  protected final void setState(int newState) {
      state = newState;
  }

  /**
   * Atomically sets synchronization state to the given updated
   * value if the current state value equals the expected value.
   * This operation has memory semantics of a {@code volatile} read
   * and write.
   *
   * @param expect the expected value
   * @param update the new value
   * @return {@code true} if successful. False return indicates that the actual
   *         value was not equal to the expected value.
   */
  protected final boolean compareAndSetState(int expect, int update) {
      // See below for intrinsics setup to support this
      return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  }

Key points about the design of the state source code:

  • state is declared volatile to guarantee visibility across threads.
  • getState() and setState() are declared final, so subclasses of AQS cannot override them.
  • compareAndSetState() uses CAS, an optimistic-locking technique, to update state atomically. It is also final and cannot be overridden by subclasses.
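
To make the CAS point concrete, here is a minimal sketch of the optimistic retry loop that a method like compareAndSetState enables. It uses java.util.concurrent.atomic.AtomicInteger as a stand-in for the AQS state field; the class CasCounter and its methods are made up for illustration and are not part of AQS.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration: AtomicInteger stands in for the volatile int state of AQS.
class CasCounter {
    private final AtomicInteger state = new AtomicInteger(0);

    // Read the current value, compute the new one, and retry if another thread got there first.
    int increment() {
        for (;;) {
            int current = state.get();               // volatile read
            int next = current + 1;
            if (state.compareAndSet(current, next)) {
                return next;                          // CAS succeeded
            }
            // CAS failed because another thread changed the value: loop and retry
        }
    }

    int get() {
        return state.get();
    }

    // Starts `threads` threads that each increment `perThread` times and returns the final value.
    static int runConcurrently(int threads, int perThread) {
        CasCounter counter = new CasCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }
}
```

No increment is lost even without a lock, because a failed CAS simply means "someone else updated first, try again".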

CLH queue

With state covered, the next piece is the CLH queue. Take a look at the AQS schematic:

The CLH (Craig, Landin, and Hagersten locks) synchronization queue is a FIFO doubly linked queue whose first and last elements are tracked by the head and tail fields. The queue elements are of type Node. AQS relies on this queue to manage the synchronization state: if the current thread fails to acquire the synchronization state, AQS wraps the thread and its wait status into a Node, appends it to the CLH synchronization queue, and blocks the thread. When the synchronization state is released, AQS wakes the first waiting node (the fair-lock case) so that it can try to acquire the synchronization state again.

The Node class

In the CLH synchronization queue, each node represents a thread. A node stores the thread reference (thread), the wait status (waitStatus), the predecessor node (prev), and the successor node (next), plus the successor in the condition queue (nextWaiter), as shown below:

waitStatus takes the following values:
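
As a rough reference, the sketch below lists the waitStatus constants as they appear in the JDK 8 AbstractQueuedSynchronizer.Node class, which is the version the source excerpts in this article come from. The wrapper class WaitStatus and the describe helper are hypothetical names added only for illustration; newer JDK releases restructured these constants.

```java
// Illustrative copy of the waitStatus constants from the JDK 8 AQS Node class.
// The class name WaitStatus and the describe() helper are hypothetical.
final class WaitStatus {
    static final int CANCELLED = 1;   // the thread gave up waiting (timeout or interrupt)
    static final int SIGNAL = -1;     // the successor is (or will be) parked and must be unparked on release
    static final int CONDITION = -2;  // the node is waiting in a condition queue
    static final int PROPAGATE = -3;  // a shared release should propagate to other nodes
    // 0 is the initial value of a newly created sync-queue node

    private WaitStatus() {}

    static String describe(int ws) {
        switch (ws) {
            case CANCELLED: return "CANCELLED";
            case SIGNAL:    return "SIGNAL";
            case CONDITION: return "CONDITION";
            case PROPAGATE: return "PROPAGATE";
            case 0:         return "INITIAL";
            default:        return "UNKNOWN";
        }
    }
}
```

Because CANCELLED is the only positive value, the source code can test waitStatus > 0 to mean "cancelled" and waitStatus < 0 to mean "may need a signal".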

Now let's look at the enqueue and dequeue code of the CLH queue:

Enqueue

To enqueue into the CLH queue, tail is pointed at the new node, the new node's prev points to the old tail node, and the old tail node's next points to the new node. The addWaiter method is as follows:

// Construct the Node
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // CAS sets the tail node
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // Fall back to enq, which retries until it succeeds
    enq(node);
    return node;
}

If the fast path in addWaiter fails to set the tail node, or the queue has not been initialized yet, enq(Node node) is called to set the tail node:

private Node enq(final Node node) {
    // Retry endlessly until the node is enqueued
    for (;;) {
        Node t = tail;
        // tail does not exist yet, so install an empty head node first
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
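
The enqueue logic can be tried out in isolation. The following is a simplified sketch, not the JDK implementation: it replaces Unsafe with AtomicReference, and the names SimpleSyncQueue, its Node, and names() are hypothetical. It keeps the two essential ideas, a lazily created dummy head and a CAS on tail inside an endless loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Simplified, hypothetical queue that mimics the enq() logic with AtomicReference instead of Unsafe.
class SimpleSyncQueue {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends the node at the tail, retrying until the CAS succeeds; returns the predecessor.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Queue is empty: install a dummy head first, then loop again
                Node dummy = new Node("dummy");
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t;
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    // Walks forward from the head and collects the names of the real (non-dummy) nodes.
    List<String> names() {
        List<String> result = new ArrayList<>();
        Node h = head.get();
        for (Node n = (h == null) ? null : h.next; n != null; n = n.next) {
            result.add(n.name);
        }
        return result;
    }
}
```

Note that prev is set before the CAS and next only after it, which is why the JDK code traverses backwards from tail when it needs a reliable view of the queue.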

Dequeue

When the thread of the head node releases the synchronization state, it wakes up its successor (next). The successor sets itself as the head node once it successfully acquires the synchronization state. See the following two pieces of source code:

// Excerpt from release(int arg), executed after tryRelease succeeds
Node h = head;
if (h != null && h.waitStatus != 0)
    unparkSuccessor(h);

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling. It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node. But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
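
The blocking and waking in these methods ultimately rests on LockSupport.park and LockSupport.unpark. Here is a small standalone sketch of that pair; the class ParkUnparkDemo and the permitted flag are hypothetical names used only for this illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkUnparkDemo {
    // Parks a worker thread until the caller grants permission, then reports what happened.
    static String run() {
        AtomicBoolean permitted = new AtomicBoolean(false);
        StringBuffer log = new StringBuffer();

        Thread waiter = new Thread(() -> {
            // park() may return spuriously, so the condition is always re-checked in a loop
            while (!permitted.get()) {
                LockSupport.park();
            }
            log.append("woken");
        });
        waiter.start();

        permitted.set(true);          // publish the condition first
        LockSupport.unpark(waiter);   // then wake the thread; safe even if it has not parked yet
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log.toString();
    }
}
```

Calling unpark before the target thread parks is harmless: the permit is remembered and the next park returns immediately, which is one reason AQS can tolerate races between a releasing thread and a thread that is about to block.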

Key points about the CLH queue:

  • Enqueueing and dequeueing operate on a doubly linked list.
  • The tail node is set with CAS inside an infinite spin loop.

For more on CAS, see my earlier article on using CAS in practice to solve a concurrency problem: juejin.cn/post/684490…

ConditionObject

ConditionObject overview

As we all know, synchronized can be combined with Object's wait(), notify(), and notifyAll() methods to implement the wait/notify pattern. What about Lock? It provides the Condition interface, whose await(), signal(), and signalAll() methods implement the same wait/notify mechanism. ConditionObject implements the Condition interface and gives AQS its support for condition variables.

Condition queues and CLH queues

Let’s take a look at the picture below:

How the ConditionObject queue and the CLH queue interact:

  • A thread that calls await() is added to the ConditionObject wait queue, releases the lock, and wakes up the successor of the head node in the CLH queue.
  • After a thread calls signal() on a ConditionObject, that condition's firstWaiter is moved to the CLH queue of AQS.
  • When a thread calls unlock() to release the lock, the node after the head node in the CLH queue (in this case the former firstWaiter) is woken up.

The differences:

  • Each ConditionObject maintains its own separate wait queue, whereas the CLH queue maintained by AQS is the synchronization queue. Both queues use the same Node type.
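
A classic way to see one lock working with several condition queues is a bounded buffer. The sketch below is a minimal illustration under that assumption; BoundedBuffer, notFull, notEmpty, and demo() are hypothetical names. ReentrantLock.newCondition() returns an AQS ConditionObject, so each Condition here owns its own wait queue while both share the lock's single synchronization queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded buffer: one lock (one sync queue) with two condition queues.
class BoundedBuffer<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await();   // releases the lock and waits in notFull's condition queue
            }
            items.addLast(item);
            notEmpty.signal();     // moves one waiter from notEmpty's queue to the sync queue
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.removeFirst();
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }

    // Demo: a producer puts 1..5 into a buffer of capacity 2 while the caller takes and sums them.
    static int demo() {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < 5; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }
}
```

The await() calls sit inside while loops because a signalled thread still has to re-acquire the lock through the synchronization queue, and the condition may have changed by the time it does.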

Exclusive and shared modes

AQS supports two synchronization modes: exclusive and shared.

Exclusive

Only one thread holds the synchronization state at a time, as with ReentrantLock. Exclusive locks can be further divided into fair and non-fair locks.

Fair lock: threads acquire the lock in the order in which they were queued, first come, first served.

Non-fair lock: a thread that wants the lock tries to grab it immediately, regardless of the queue order, so it may jump ahead of threads that are already waiting.
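
With ReentrantLock the policy is chosen at construction time. A tiny sketch follows; the class name FairnessDemo is hypothetical, while the constructor argument and isFair() are part of the ReentrantLock API.

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessDemo {
    public static void main(String[] args) {
        ReentrantLock unfair = new ReentrantLock();     // default constructor: non-fair
        ReentrantLock fair = new ReentrantLock(true);   // true selects the fair policy

        System.out.println("default lock fair? " + unfair.isFair());
        System.out.println("fair lock fair?    " + fair.isFair());
    }
}
```

The non-fair default usually gives higher throughput, because a running thread can take a just-released lock without waiting for a parked thread to wake up.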

In exclusive mode the synchronization state is acquired through acquire(int arg). Its source code and the methods it calls are as follows:

  • acquire(int arg) method
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  • addWaiter method
// Construct the Node
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // CAS sets the tail node
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // Fall back to enq, which retries until it succeeds
    enq(node);
    return node;
}
  • acquireQueued(final Node node, int arg) method
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  • selfInterrupt() method
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

From the source code, the flow of the acquire(int arg) method can be drawn as follows:

Shared

Multiple threads can hold the synchronization state at the same time. Semaphore and CountDownLatch are both built on shared mode.

In shared mode the synchronization state is acquired through acquireShared(int arg):

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

If tryAcquireShared(int arg) fails to obtain the synchronization state (it returns a negative value), doAcquireShared(int arg) is called to obtain it by spinning:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
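
To see shared mode from the subclass side, here is a minimal sketch of a one-shot latch built on AQS. It is an illustration rather than anything from the JDK: the names OneShotLatch, open(), and demo() are hypothetical, and state is assumed to mean 0 for closed and 1 for open.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot latch: state 0 = closed, state 1 = open.
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // Non-negative means success; negative sends the caller to the sync queue
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);   // open the latch
            return true;   // true tells AQS to wake queued shared waiters
        }
    }

    private final Sync sync = new Sync();

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    void open() {
        sync.releaseShared(1);
    }

    // Demo: three threads wait on the latch; returns how many got through after open().
    static int demo() {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] waiters = new Thread[3];
        for (int i = 0; i < waiters.length; i++) {
            waiters[i] = new Thread(() -> {
                try {
                    latch.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            waiters[i].start();
        }
        latch.open();
        for (Thread t : waiters) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }
}
```

Unlike an exclusive lock, a single open() lets every waiter through, because each woken shared node propagates the wake-up to the next one.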

AQS template method design pattern

Template method pattern

Template method pattern: define the skeleton of an algorithm in a method and defer some steps to subclasses. The template method lets subclasses redefine certain steps of an algorithm without changing the algorithm's structure.

An everyday example: suppose we want to travel to Beijing. We can go by high-speed train, by plane, or by regular train. We can define an abstract class for the trip with the following template: buy a ticket -> security check -> take xx transport -> arrive in Beijing. Subclasses then inherit the abstract class and implement the step that differs.
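
The travel example can be sketched in a few lines of Java. All class and method names here (TripToBeijing, travel, transport, and the two subclasses) are hypothetical and exist only to illustrate the pattern.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical classes that model the trip-to-Beijing example.
abstract class TripToBeijing {
    // Template method: fixes the order of the steps and is final so subclasses cannot change it.
    final List<String> travel() {
        List<String> steps = new ArrayList<>();
        steps.add("buy a ticket");
        steps.add("security check");
        steps.add("take " + transport());
        steps.add("arrive in Beijing");
        return steps;
    }

    // The only step that subclasses supply.
    protected abstract String transport();
}

class HighSpeedTrainTrip extends TripToBeijing {
    @Override
    protected String transport() {
        return "high-speed train";
    }
}

class PlaneTrip extends TripToBeijing {
    @Override
    protected String transport() {
        return "plane";
    }
}
```

In AQS terms, a method like acquire(int) plays the role of travel(), and a method like tryAcquire(int) plays the role of transport().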

The methods that AQS leaves for subclasses to implement are as follows:

isHeldExclusively()    // Whether the current thread holds the resource exclusively. Only needs to be implemented if Condition is used.
tryAcquire(int)        // Exclusive mode. Attempts to acquire the resource; returns true on success and false on failure.
tryRelease(int)        // Exclusive mode. Attempts to release the resource; returns true on success and false on failure.
tryAcquireShared(int)  // Shared mode. Attempts to acquire the resource. A negative number indicates failure; 0 indicates success with no resources remaining; a positive number indicates success with resources remaining.
tryReleaseShared(int)  // Shared mode. Attempts to release the resource; returns true on success and false on failure.

In short, AQS fixes the overall algorithm in methods such as acquire and acquireShared, and leaves methods such as tryAcquire and tryAcquireShared for subclasses to override when implementing custom synchronizers.

Custom synchronizers

Building on the points above (state, the CLH queue, ConditionObject, and so on), implementing a custom synchronizer comes down to a few decisions: choose between an exclusive and a shared lock, define what the atomic variable state means, define an inner class that extends AQS, and override the corresponding methods.

Let's look at a demo of a non-reentrant exclusive lock implemented on AQS, from The Beauty of Concurrent Programming in Java:

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class NonReentrantLock implements Lock, Serializable {

    // Inner class, custom synchronizer
    static class Sync extends AbstractQueuedSynchronizer {
        // Whether the lock is already held
        public boolean isHeldExclusively() {
            return getState() == 1;
        }

        // If state is 0, try to acquire the lock
        public boolean tryAcquire(int arg) {
            assert arg == 1;
            // CAS guarantees atomicity: state changes from 0 to 1 only if it is currently 0
            if (compareAndSetState(0, 1)) {
                // Record the current thread as the exclusive owner
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Try to release the lock and set state to 0
        public boolean tryRelease(int arg) {
            assert arg == 1;
            // If state is already 0, the lock is not held, so throw IllegalMonitorStateException
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            // Clear the exclusive owner thread
            setExclusiveOwnerThread(null);
            // Set the synchronization state to 0
            setState(0);
            return true;
        }

        // Return a Condition. Each Condition contains its own condition queue
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    // Create a Sync to do the actual work
    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

NonReentrantLockDemoTest:

public class NonReentrantLockDemoTest {

    private static NonReentrantLock nonReentrantLock = new NonReentrantLock();

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> {
                nonReentrantLock.lock();
                try {
                    System.out.println(Thread.currentThread().getName());
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    nonReentrantLock.unlock();
                }
            });
            thread.start();
        }
    }
}

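Running results: the ten thread names are printed one at a time, about three seconds apart, because only one thread can hold the lock at any moment.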

The AQS family in practice

AQS is the basis for a whole family of synchronizers, including ReentrantLock and Semaphore. Let's look at how they are used.

ReentrantLock

ReentrantLock introduction

  • ReentrantLock is a class that implements the Lock interface and provides reentrant locking of shared resources.
  • ReentrantLock supports both fair and non-fair locking.
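
"Reentrant" means the thread that already holds the lock can acquire it again without blocking, and must release it the same number of times. A short sketch using ReentrantLock.getHoldCount() shows this; the class and method names ReentrancyDemo and holdCounts are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    // Returns the hold counts observed after each lock()/unlock() call made by the same thread.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        lock.lock();
        counts[0] = lock.getHoldCount();   // 1
        lock.lock();                       // same thread acquires again without blocking
        counts[1] = lock.getHoldCount();   // 2
        lock.unlock();
        counts[2] = lock.getHoldCount();   // 1, still held
        lock.unlock();
        counts[3] = lock.getHoldCount();   // 0, the lock is free again
        return counts;
    }
}
```

Internally the hold count is the AQS state: each nested lock() adds one and each unlock() subtracts one, and the lock is released only when the count returns to zero.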

ReentrantLock example

Use ReentrantLock to implement a simple thread-safe list as follows:

import java.util.ArrayList;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockList {
    // Thread-unsafe list
    private ArrayList<String> array = new ArrayList<>();
    // Exclusive lock that guards the list
    private final ReentrantLock lock = new ReentrantLock();

    // Add an element
    public void add(String e) {
        lock.lock();
        try {
            array.add(e);
        } finally {
            lock.unlock();
        }
    }

    // Remove an element
    public void remove(String e) {
        lock.lock();
        try {
            array.remove(e);
        } finally {
            lock.unlock();
        }
    }

    // Get an element
    public String get(int index) {
        lock.lock();
        try {
            return array.get(index);
        } finally {
            lock.unlock();
        }
    }
}

Semaphore

Semaphore introduction

  • A Semaphore controls how many threads can access a resource concurrently, coordinating the threads so that the resource is used properly.
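
The permit counting can be followed in a single thread. The sketch below walks through a two-permit Semaphore with tryAcquire() and release(); the class name PermitDemo and its run method are hypothetical.

```java
import java.util.concurrent.Semaphore;

class PermitDemo {
    // Single-threaded walk-through of permit counting with a 2-permit semaphore.
    static boolean[] run() {
        Semaphore permits = new Semaphore(2);
        boolean[] results = new boolean[4];
        results[0] = permits.tryAcquire();   // true: one permit left
        results[1] = permits.tryAcquire();   // true: no permits left
        results[2] = permits.tryAcquire();   // false: returns immediately without a permit
        permits.release();                   // hand one permit back
        results[3] = permits.tryAcquire();   // true again
        return results;
    }
}
```

With the blocking acquire() instead of tryAcquire(), the third caller would wait in the AQS synchronization queue until some thread called release().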

Semaphore example

A classic Java multithreading interview question: have three threads A, B, and C print in order, looping 10 times.

import java.util.concurrent.Semaphore;

public class ABCSemaphore {

    private static Semaphore A = new Semaphore(1);
    private static Semaphore B = new Semaphore(1);
    private static Semaphore C = new Semaphore(1);


    static class ThreadA extends Thread {

        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; i++) {
                    A.acquire();
                    System.out.print("A");
                    B.release();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    static class ThreadB extends Thread {

        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; i++) {
                    B.acquire();
                    System.out.print("B");
                    C.release();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    static class ThreadC extends Thread {

        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; i++) {
                    C.acquire();
                    System.out.print("C");
                    A.release();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    public static void main(String[] args) throws InterruptedException {
        // Ensure that A is executed first: take the initial permits of B and C
        B.acquire();
        C.acquire();
        new ThreadA().start();
        new ThreadB().start();
        new ThreadC().start();
    }
}

References

  • The Beauty of Concurrent Programming in Java
  • AQS of J.U.C mp.weixin.qq.com/s/-swOI_4_c…
