Author: Coke Coke. Author's personal home page: Coke Coke personal home page

Easily understand the AQS framework

This article requires the following knowledge: Java, critical sections, semaphores, locks

AQS (AbstractQueuedSynchronizer) is the foundation on which Java's reentrant lock, read-write lock, and semaphore are implemented.

Learning and understanding the AQS framework is a great help in understanding Java locks.

That is easier said than done: the AQS source runs to more than 2,000 lines. Is a human really supposed to read all of that?

To break the vicious circle of not understanding AQS and then forgetting it right after reading, this article walks you through the AQS framework step by step, from simple to complex.

The code in this article, with annotations in Chinese, is available in my GitHub repository, which you can pull down.

GitHub repository: Jirath-Liu

What is AQS

There is a class that Java developers will inevitably know about, called ReentrantLock.

In the early days, ReentrantLock was far more efficient than synchronized, but the gap is now closing.

Have you ever opened ReentrantLock's source code to take a look?

What really works inside ReentrantLock is the Sync class, and all of ReentrantLock’s lock-related methods call Sync’s methods to actually implement them.

And the parent class of Sync is our protagonist: AbstractQueuedSynchronizer.

public void lock() {
    sync.lock();
}
public void lockInterruptibly() throws InterruptedException {
    sync.lockInterruptibly();
}
public boolean tryLock() {
    return sync.tryLock();
}
...

I will not go into ReentrantLock at length here. If you want to know more, leave a comment and I will arrange a separate write-up.

What we care about now is Sync, the class that implements AQS.

What does Sync, as the AQS implementation class, do to achieve locking?

Everyone should know what locking means, and presumably what a semaphore is too (if not, look it up first).

The essence of locking is a semaphore. If a thread occupies a resource, it marks the semaphore and other threads know that the critical section is occupied.
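As a minimal sketch of this "mark the semaphore" idea, the class below uses a single atomic int as the mark. The name StateFlag and its methods are made up for illustration; this is not AQS code, and it has no queueing or blocking at all.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration: one int marks whether the critical section is taken.
class StateFlag {
    // 0 = free, 1 = occupied
    private final AtomicInteger state = new AtomicInteger(0);

    // Succeeds only for the one thread whose CAS flips 0 -> 1.
    boolean tryOccupy() {
        return state.compareAndSet(0, 1);
    }

    // Clears the mark so that another thread can take the resource.
    void free() {
        state.set(0);
    }

    // Other threads read the mark to learn that the section is occupied.
    boolean isOccupied() {
        return state.get() != 0;
    }
}
```

A second call to tryOccupy() fails until free() is called, which is all a lock needs at its core; what AQS adds on top is what to do with the threads that failed.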

The principle of AQS is actually the semaphore mechanism, and the mechanism of Sync is shown below

This process is implemented by AQS, and Sync writes some core judgments to customize.

The above comes from the source of Sync; acquire is a method of AQS.

By now you should have a rough idea of what AQS is:

A lightweight framework that implements the semaphore, the waiting, and the lock contention.

AbstractQueuedSynchronizer


Provides a framework for implementing blocking locks and associated synchronizers (semaphores, events, etc.) that rely on first-in, first-out (FIFO) wait queues.

This class is intended to provide a useful basis for most synchronizers that rely on a single atomic int value to represent state.

Subclasses must define the protected methods that change this state, and define what that state means in terms of this object being acquired or released.

How to use AQS to build your own locks?

We will first learn to use AQS and then explore how it works: learn to run first, then think about how to run with less effort.

The authors of the AQS framework leave us a handful of methods to implement: four try methods, plus isHeldExclusively, which is only needed when conditions are used.

By default these methods simply throw UnsupportedOperationException; a subclass has to override them.

What do these four methods do?

AQS uses the template method design pattern. These methods are not only meant to be called directly; they are also called by other methods in the framework. As long as we write them according to the rules, we get an efficient, lightweight lock of our own making.

Here’s what these four methods do

// Main exported methods

/**
 * Attempts to acquire in exclusive mode. This method should query whether the
 * state of the object permits it to be acquired in exclusive mode, and if so, acquire it.
 *
 * This method is always invoked by the thread performing the acquire. If it reports
 * failure, the acquire method may queue the thread (if it is not already queued)
 * until it is signalled by a release from some other thread.
 *
 * This can be used to implement the method Lock.tryLock().
 * The default implementation throws UnsupportedOperationException.
 */
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

/**
 * Attempts to set the state to reflect a release in exclusive mode.
 * This method is always invoked by the thread performing the release.
 * The default implementation throws UnsupportedOperationException.
 */
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

/**
 * Attempts to acquire in shared mode. This method should query whether the state
 * of the object permits it to be acquired in shared mode, and if so, acquire it.
 * This method is always invoked by the thread performing the acquire. If it reports
 * failure, the acquire method may queue the thread (if it is not already queued)
 * until it is signalled by a release from some other thread.
 * The default implementation throws UnsupportedOperationException.
 */
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

/**
 * Attempts to set the state to reflect a release in shared mode.
 * This method is always invoked by the thread performing the release.
 * The default implementation throws UnsupportedOperationException.
 */
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

/**
 * Returns true if synchronization is held exclusively with respect to the
 * current (calling) thread. This method is invoked upon each call to a non-waiting
 * AbstractQueuedSynchronizer.ConditionObject method (waiting methods invoke release instead).
 * The default implementation throws UnsupportedOperationException.
 * This method is invoked internally only within AbstractQueuedSynchronizer.ConditionObject
 * methods, so it need not be defined if conditions are not used.
 */
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}
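As a rough sketch of how these hooks fit together, here is a small non-reentrant mutex built on AQS. The names SimpleMutex and SimpleMutexDemo are hypothetical, and the encoding of the state (0 for unlocked, 1 for locked) is just one possible choice. Only the three exclusive-mode hooks are written by hand; acquire and release come from AQS.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex: state 0 = unlocked, 1 = locked.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // One CAS decides the winner; AQS queues and blocks the losers.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);   // a plain write is enough: only the owner gets here
            return true;   // true tells AQS to wake the next waiter
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    void lock()   { sync.acquire(1); }   // template method inherited from AQS
    void unlock() { sync.release(1); }   // template method inherited from AQS
}

class SimpleMutexDemo {
    static int counter = 0;

    // Runs 4 threads that each add 1000 to the counter under the mutex.
    static int run() {
        counter = 0;
        SimpleMutex mutex = new SimpleMutex();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    mutex.lock();
                    try { counter++; } finally { mutex.unlock(); }
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try { t.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
        }
        return counter;
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints 4000 if no increment was lost
    }
}
```

Because this mutex is not reentrant, a thread that calls lock() twice without unlocking would block on itself; ReentrantLock avoids that by counting re-entries in the state.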

Of course, the AQS framework also provides many methods for subclasses to use. These are all template methods, declared final.

They fall roughly into these groups:

  1. Acquiring the exclusive lock, in several variants (timed, interruptible, try, etc.), named acquire
  2. Acquiring the shared lock, in the same variants, named acquireShared
  3. Releasing the lock, both exclusive and shared; release does not deal with thread contention
  4. Operations on waiting threads
  5. Inspecting the internal state of AQS:
    1. Whether there is a queue, and whether a given thread is queued
    2. Setting and getting the owner thread (implemented in the superclass AbstractOwnableSynchronizer)
    3. Setting the semaphore with CAS (compareAndSetState, which I think is the more appropriate choice here) and reading the semaphore

In summary, AQS gives users methods for acquiring locks with CAS, modifying the semaphore, inspecting AQS internals, and operating on locks.

Listed all at once, these methods can be overwhelming; ReentrantLock shows how they are used in practice.

How to use AQS in ReentrantLock

ReentrantLock comes in fair and non-fair variants. A non-fair lock tries to grab the lock directly, whereas a fair lock first checks whether other threads are queued ahead of it. With a non-fair lock, this means a thread that has just released the lock has a better chance of acquiring it again than the threads that are waiting.
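For reference, the policy is chosen through the ReentrantLock constructor. The short sketch below (the class name FairnessDemo is made up) only shows how each variant is created and queried; it does not measure the difference in behavior.

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessDemo {
    public static void main(String[] args) {
        ReentrantLock nonFair = new ReentrantLock();   // the default policy is non-fair
        ReentrantLock fair = new ReentrantLock(true);  // true selects the fair policy
        System.out.println(nonFair.isFair());          // prints false
        System.out.println(fair.isFair());             // prints true
    }
}
```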

Let’s first look at the implementation of fair locks to understand how to use AQS

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;
    
    final boolean tryLock() {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(current);
                return true;
            }
            // Reentrant operation
        } else if (getExclusiveOwnerThread() == current) {
            if (++c < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(c);
            return true;
        }
        return false;
    }

    abstract boolean initialTryLock();

    final void lock() {
        if (!initialTryLock())
            acquire(1);
    }

    final void lockInterruptibly() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!initialTryLock())
            acquireInterruptibly(1);
    }

    final boolean tryLockNanos(long nanos) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return initialTryLock() || tryAcquireNanos(1, nanos);
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (getExclusiveOwnerThread() != Thread.currentThread())
            throw new IllegalMonitorStateException();
        boolean free = (c == 0);
        if (free)
            setExclusiveOwnerThread(null);
        setState(c);
        return free;
    }

    protected final boolean isHeldExclusively() {
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    final Thread getOwner() {
        return getState() == 0 ? null : getExclusiveOwnerThread();
    }

    final int getHoldCount() {
        return isHeldExclusively() ? getState() : 0;
    }

    final boolean isLocked() {
        return getState() != 0;
    }

    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0); // reset to unlocked state
    }
}

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final boolean initialTryLock() {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(current);
                return true;
            }
            // Reentrant
        } else if (getExclusiveOwnerThread() == current) {
            if (++c < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(c);
            return true;
        }
        return false;
    }

    protected final boolean tryAcquire(int acquires) {
        if (getState() == 0 && !hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
}

ReentrantLock is an exclusive lock, so it does not override the shared-mode methods (tryAcquireShared, tryReleaseShared); to support Condition it overrides isHeldExclusively.

The methods ReentrantLock overrides are as follows

From these methods, we can see what ReentrantLock has done to use AQS

abstract static class Sync extends AbstractQueuedSynchronizer {

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        // Only the owner thread may release
        if (getExclusiveOwnerThread() != Thread.currentThread())
            throw new IllegalMonitorStateException();
        // If c is 0 the lock is fully released, so the owner thread is cleared
        boolean free = (c == 0);
        if (free)
            setExclusiveOwnerThread(null);
        // The lock is still held at this point, so there is no contention to guard against
        setState(c);
        return free;
    }

    // Simply checks whether the owner is the current thread
    protected final boolean isHeldExclusively() {
        return getExclusiveOwnerThread() == Thread.currentThread();
    }
}

static final class FairSync extends Sync {
    // Fair version
    protected final boolean tryAcquire(int acquires) {
        /*
         * The lock is taken only if all of these conditions hold, and the order matters:
         * 1. The semaphore is 0
         * 2. No thread is queued ahead of the current one
         * 3. The CAS attempt succeeds
         */
        if (getState() == 0 && !hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
}
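The two rules in tryRelease (only the owner may release, and the lock is free only when the count returns to 0) can be observed from the outside through ReentrantLock's public API. The sketch below is only an illustration; the class and method names are made up.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReleaseRulesDemo {
    // Returns true if unlocking from a thread that does not own the lock is rejected.
    static boolean nonOwnerUnlockFails() {
        ReentrantLock lock = new ReentrantLock(true);
        try {
            lock.unlock();   // the current thread never acquired the lock
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Returns the hold counts observed after lock, lock, unlock, unlock.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock(true);
        int[] seen = new int[4];
        lock.lock();   seen[0] = lock.getHoldCount();
        lock.lock();   seen[1] = lock.getHoldCount();   // re-entry adds to the state
        lock.unlock(); seen[2] = lock.getHoldCount();   // still held: the state is not yet 0
        lock.unlock(); seen[3] = lock.getHoldCount();   // state back to 0, owner cleared
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(nonOwnerUnlockFails());                      // prints true
        System.out.println(java.util.Arrays.toString(holdCounts()));    // prints [1, 2, 1, 0]
    }
}
```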

Next, let's look at the three methods tryLock, lock, and initialTryLock. These are the entry methods used by ReentrantLock, and they show how AQS is used.

    final boolean tryLock() {
        Thread current = Thread.currentThread();
        int c = getState();
        // c == 0 means nobody holds the lock; otherwise it is already occupied
        if (c == 0) {
            // Try to grab the lock
            if (compareAndSetState(0, 1)) {
                // The CAS succeeded, so the lock has been acquired
                setExclusiveOwnerThread(current);
                return true;
            }
            // Reentrant operation
        } else if (getExclusiveOwnerThread() == current) {
            if (++c < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(c);
            return true;
        }
        return false;
    }

    abstract boolean initialTryLock();

    final void lock() {
        // First use CAS to try to grab the lock, or re-enter it
        if (!initialTryLock())
            // Otherwise go through the AQS acquire path;
            // 1 is the increment applied to the semaphore
            acquire(1);
    }

    // In FairSync
        final boolean initialTryLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // hasQueuedThreads(): only grab the lock if no thread is queued
                if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
                // Reentrant
            } else if (getExclusiveOwnerThread() == current) {
                if (++c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(c);
                return true;
            }
            return false;
        }
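The two branches of tryLock (grab a free lock, or re-enter a lock you already own) can be seen through the public API. This is a small sketch with made-up names; the second thread exists only to show that tryLock returns false instead of queueing when someone else holds the lock.

```java
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {
    // Returns { result of a re-entrant tryLock by the owner,
    //           result of tryLock from another thread while the lock is held }.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock(true);
        boolean[] result = new boolean[2];
        lock.lock();
        try {
            result[0] = lock.tryLock();   // the owner re-enters: the state goes from 1 to 2
            if (result[0])
                lock.unlock();            // undo the re-entry
            Thread other = new Thread(() -> result[1] = lock.tryLock());
            other.start();
            try { other.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
        } finally {
            lock.unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1]);   // prints "true false"
    }
}
```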

To summarize ReentrantLock’s use of AQS

In AQS, lock occupancy is marked by the semaphore. AQS implements the logic for lock contention and queueing.

At the end of the day, AQS is still a framework, and we write the core lock contention part ourselves.

AQS provides the utility methods that lock contention needs. We only have to write the contention code, and AQS takes care of retrying and blocking for us.

What is the principle of AQS and what does AQS do

This part follows the source code. To help you understand the 2k lines of code, I made a few diagrams; let's use them to understand the detailed process.

The whole idea of AQS exclusive lock is:

  1. Threads contend for the lock by using CAS to modify the semaphore
  2. A thread that does not get the lock joins the queue
  3. A node in the queue blocks once it sees that its predecessor is in the SIGNAL state
  4. Only the first node in the queue (the one directly behind Head) tries to compete for the lock, retrying each time it is woken up
  5. Releasing the lock resets the Head's status to 0 (so that later nodes do not block on a stale signal) and wakes up the successor node to compete
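The steps above can be approximated with a much smaller toy lock. The sketch below is adapted from the first-in-first-out mutex example in the LockSupport documentation; it is an analogy rather than AQS itself, since it uses a ConcurrentLinkedQueue of threads instead of linked nodes, has no SIGNAL status, and enqueues before the first CAS attempt. The class names are made up.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Toy FIFO lock: a CAS-guarded flag plus a queue of parked threads.
class FifoMutex {
    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();

    void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current);                         // get in line
        // Only the thread at the front may try the CAS; everyone else parks.
        while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
            LockSupport.park(this);
            if (Thread.interrupted())                 // remember the interrupt, keep waiting
                wasInterrupted = true;
        }
        waiters.remove();                             // lock acquired: leave the queue
        if (wasInterrupted)
            current.interrupt();                      // restore the interrupt status
    }

    void unlock() {
        locked.set(false);                            // reset the state
        LockSupport.unpark(waiters.peek());           // wake the front waiter (no effect if null)
    }
}

class FifoMutexDemo {
    static int counter = 0;

    // Runs 4 threads that each add 1000 to the counter under the toy lock.
    static int run() {
        counter = 0;
        FifoMutex mutex = new FifoMutex();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    mutex.lock();
                    try { counter++; } finally { mutex.unlock(); }
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try { t.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
        }
        return counter;
    }
}
```

The loop in lock() re-checks its condition after every wake-up, which is the same habit acquireQueued follows: being unparked is only a hint to try again, never proof that the lock is yours.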

The core of AQS is two methods: acquire and release

The core of these two methods is the core of AQS. It may be a little hard to take in at first, so I recommend looking at the flow chart to get a general idea.

If you have a general idea of these two diagrams, take a look at the following example with three threads, A, B, and C.

I strongly recommend opening the source code and following along.

  1. Thread A acquires the lock and starts executing
  2. Thread B arrives, fails to grab the lock, and enters the queue. It finds that Head is empty and creates a Head (status 0). Because B sits directly behind Head, it is first in line and tries to grab the lock again
  3. Thread B fails again, marks Head as SIGNAL, and tries once more
  4. Thread B fails again, sees that Head is now SIGNAL, and goes to sleep
  5. Thread C arrives, fails to grab the lock, and wraps itself as a new node behind thread B. Since C is not first in line, it does not try for the lock; it sees that B's node is in status 0, marks B as SIGNAL, and loops again
  6. On the next pass C is still not first in line and B is already SIGNAL, so C goes to sleep
  7. Thread A releases the lock; thread B wakes up, grabs the lock, replaces Head with its own node, and runs its own work
  8. Thread B releases the lock; thread C wakes up, makes its own node the Head, and runs its own work
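The same scenario can be reproduced with a fair ReentrantLock. In this sketch (class and variable names are made up) the calling thread plays the role of A, and hasQueuedThread is polled so that B is certainly enqueued before C starts. The node statuses themselves are internal and cannot be observed this way; only the queueing order is.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;

class AbcDemo {
    // Returns the order in which the waiting threads obtained the lock.
    static List<String> run() {
        ReentrantLock lock = new ReentrantLock(true);
        List<String> order = new CopyOnWriteArrayList<>();
        Runnable body = () -> {
            lock.lock();
            try { order.add(Thread.currentThread().getName()); } finally { lock.unlock(); }
        };
        Thread b = new Thread(body, "B");
        Thread c = new Thread(body, "C");

        lock.lock();                                            // the caller acts as thread A
        try {
            b.start();
            while (!lock.hasQueuedThread(b)) Thread.yield();    // wait until B is in the queue
            c.start();
            while (!lock.hasQueuedThread(c)) Thread.yield();    // wait until C is queued behind B
        } finally {
            lock.unlock();                                      // A releases: B is woken first
        }
        try {
            b.join();
            c.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints [B, C]
    }
}
```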

If the above process is clear to you, let’s analyze the methods in AQS step by step

acquire() source code implementation

First, an attempt is made to acquire the lock, failing which, it is enqueued

// First try to acquire the lock. On success, return directly. On failure, wrap the thread in a node and enqueue it
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        // On failure, the thread is wrapped in a node and enqueued
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

The thread then enters an infinite loop, competing for the lock with CAS and checking in each round whether it should block.

The status of the predecessor node may be modified during this check.

/**
 * Acquires in exclusive uninterruptible mode for a thread already in the queue.
 * Used by condition wait methods as well as acquire.
 *
 * When a thread finishes its work it does not remove its own node; the node
 * stays as Head, while the following node competes with CAS and replaces
 * Head upon success.
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Get the predecessor node
            final Node p = node.predecessor();
            // p == head means this node is first in the queue,
            // so call tryAcquire() to compete for the lock
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Park here if the node should block; interrupted becomes true
            // if the thread was interrupted while parked.
            // The first waiter is not woken until the lock is released.
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // If an exception was thrown after the node was enqueued,
        // cancel the node that was created
        if (failed)
            cancelAcquire(node);
    }
}
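One visible consequence of the interrupted flag returned by acquireQueued (and the selfInterrupt() call in acquire) is that a thread interrupted while queued in lock() keeps waiting, still gets the lock, and finds its interrupt status set afterwards. The sketch below checks this through ReentrantLock; the class name is made up.

```java
import java.util.concurrent.locks.ReentrantLock;

class InterruptWhileQueuedDemo {
    // Returns true if a thread interrupted while waiting in lock() still acquires
    // the lock and sees its interrupt status set afterwards.
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] sawInterrupt = new boolean[1];
        Thread waiter = new Thread(() -> {
            lock.lock();                                          // uninterruptible acquire
            try {
                sawInterrupt[0] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });

        lock.lock();
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) Thread.yield(); // wait until it is queued
            waiter.interrupt();                                   // arrives while it is queued
        } finally {
            lock.unlock();                                        // now let the waiter in
        }
        try { waiter.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
        return sawInterrupt[0];
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints true
    }
}
```

Callers that want the wait itself to be abandoned on interrupt use lockInterruptibly() instead, which goes through acquireInterruptibly.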

Check to see if you should block. Node signals are updated here

Here’s what’s interesting about this method

  1. If the predecessor is cancelled, the method keeps walking backwards and updating the prev pointer (which lets the GC reclaim the cancelled nodes), and finally relinks the broken chain by pointing the surviving predecessor's next at the current node
  2. If the predecessor's status is 0 (the default), the method changes it to SIGNAL and returns false; the caller tries for the lock once more and blocks on the next visit

/**
 * Determines whether the current node should block.
 *
 * Checks and updates the status of a node that failed to acquire.
 * Returns true if the thread should block.
 * This is the main signal control in all acquire loops.
 * Requires that pred == node.prev.
 * In the other cases the predecessor is updated to SIGNAL (the caller tries
 * once more after the update, and if it still gets no lock it comes back here and blocks).
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * The predecessor has already set its status asking a release
         * to signal this node, so it can safely park
         */
        return true;
    if (ws > 0) {
        /*
         * The predecessor is cancelled: keep looking backwards
         * and update the prev pointer
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // Finally relink the broken chain
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE. Indicate that we need a signal,
         * but do not park yet. The caller will need to retry to make sure
         * it cannot acquire before parking.
         */
        // Try to change the status to SIGNAL; the next visit will block
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
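To watch the three branches in isolation, here is a single-threaded toy model of the same decision. ToyNode is a hypothetical stand-in for the internal Node class, and a plain assignment replaces the CAS because nothing else touches the nodes; it is a sketch of the control flow, not of the concurrency.

```java
// Toy model of the waitStatus hand-off; ToyNode is hypothetical, not the internal Node of AQS.
class ToyNode {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;

    int waitStatus;   // 0 by default
    ToyNode prev;
    ToyNode next;

    // Mirrors the three branches of shouldParkAfterFailedAcquire (single-threaded, so no CAS).
    static boolean shouldPark(ToyNode pred, ToyNode node) {
        if (pred.waitStatus == SIGNAL)
            return true;                      // predecessor already promised a wake-up
        if (pred.waitStatus > 0) {
            do {                              // skip cancelled predecessors
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;                 // relink the chain
        } else {
            pred.waitStatus = SIGNAL;         // ask for a wake-up, then retry once more
        }
        return false;
    }
}

class ToyNodeDemo {
    // Builds head <-> cancelled <-> me and asks three times whether "me" should park.
    static boolean[] run() {
        ToyNode head = new ToyNode();
        ToyNode cancelled = new ToyNode();
        ToyNode me = new ToyNode();
        cancelled.waitStatus = ToyNode.CANCELLED;
        head.next = cancelled;  cancelled.prev = head;
        cancelled.next = me;    me.prev = cancelled;

        boolean first = ToyNode.shouldPark(me.prev, me);    // unlinks the cancelled node
        boolean second = ToyNode.shouldPark(me.prev, me);   // marks head as SIGNAL
        boolean third = ToyNode.shouldPark(me.prev, me);    // now it is safe to park
        return new boolean[] { first, second, third, me.prev == head };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));   // prints [false, false, true, true]
    }
}
```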

release() source code implementation

/**
 * Releases in exclusive mode.
 * Calls the subclass's tryRelease and, if {@link #tryRelease} returns true,
 * wakes up the next waiting node by unblocking one or more threads.
 * This method can be used to implement the method {@link Lock#unlock}.
 */
public final boolean release(int arg) {
    // Try to unlock by calling the subclass implementation.
    // Note: it is the subclass that decides here whether the exclusive lock is fully released
    if (tryRelease(arg)) {
        Node h = head;
        // If someone is waiting for the lock, wake up the first node
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

Waking up the successor node

Here are two details:

  1. The argument passed in is Head. If Head's status is not 0, this method resets it to 0; once it is 0, other threads calling release will no longer enter this method for that Head, because release checks waitStatus != 0 first.
  2. The search for a usable node runs from the tail towards the front, because when the next pointer is null or points to a cancelled node, the later nodes cannot be reached through next, while the prev links can still be followed.

/**
 * Wakes up a node's successor (if one exists).
 *
 * @param node the node
 */
private void unparkSuccessor(Node node) {
    /*
     * If the status is negative (i.e., possibly needing a signal), try to clear it
     * in anticipation of signalling. It is OK if this fails or if the status
     * is changed by the waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * The thread to unpark is held in the successor, which is normally just the next node.
     * But if it is cancelled or apparently null, traverse backwards from the tail
     * to find the actual non-cancelled successor.
     */
    Node s = node.next;
    // waitStatus > 0 means the thread is cancelled (only CANCELLED is greater than 0)
    if (s == null || s.waitStatus > 0) {
        // The successor is null or cancelled, so look for a suitable node
        s = null;
        // Iterate backwards from the tail, updating s (the thread to wake up)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // Wake up the thread (unpark takes a thread out of the blocked state)
        LockSupport.unpark(s.thread);
}
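The wake-up relies on LockSupport's permit semantics: an unpark that arrives before the matching park is not lost, because the next park returns immediately. The sketch below shows that property and a simple park/unpark hand-off between two threads; the class and method names are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class PermitDemo {
    // unpark before park: the permit is already available, so park() does not block.
    static boolean unparkBeforePark() {
        LockSupport.unpark(Thread.currentThread());   // makes the permit available
        LockSupport.park();                           // consumes the permit and returns at once
        return true;
    }

    // A waiter parks until a flag is set; the caller sets the flag and unparks it.
    static boolean handOff() {
        AtomicBoolean ready = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            while (!ready.get())                      // always re-check: park may return early
                LockSupport.park();
        });
        waiter.start();
        ready.set(true);                              // publish the condition first...
        LockSupport.unpark(waiter);                   // ...then wake the waiter
        try { waiter.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(unparkBeforePark());       // prints true
        System.out.println(handOff());                // prints true
    }
}
```

This is why a release that races with a thread on its way to parking does not leave that thread asleep: either the thread sees the new state on its retry, or the pending permit makes its park return.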

That is how exclusive locks work in AQS. The rest will be covered in the next installment.
