
When analyzing the source code of Java's concurrency package, java.util.concurrent, you cannot avoid the abstract class AbstractQueuedSynchronizer (hereinafter AQS). It is the foundation of the concurrency utilities and the basis for implementing classes such as ReentrantLock, CountDownLatch, Semaphore, and FutureTask.

Search for AbstractQueuedSynchronizer and you will find many introductions to AQS, but many of them are unclear because they skip over key details.

This article starts from the source code of ReentrantLock's fair lock and analyzes how AbstractQueuedSynchronizer works, in the hope of offering some help.

A few notes first:

  1. This article is fairly long but very simple. It is aimed at beginners in concurrent programming and at developers who want to read the source code of the Java concurrency package.
  2. Reading on a computer is recommended. If you want to understand every detail and have never read a similar analysis, allow at least 20 minutes to go through all the explanations. The last third or so of the article is easy, the first quarter is even easier, and the middle section deserves careful reading.
  3. If you don't know why you would read this, be aware that even once you understand all the details, it probably won't make your business code any better.
  4. The source code is from JDK 1.7. If something is unclear or you have doubts, it is best to open the source yourself. Doug Lea's code is really good.
  5. I left many of the English comments in place so that readers can check them against my explanations, in case I have misled you.
  6. This article does not analyze shared mode, which lightens the load considerably. Once exclusive mode is understood, readers should be able to follow the shared-mode code. Condition is not analyzed either, so the article is easy to read.
  7. This article borrows the concept of a "lock" from ReentrantLock, the class we use most often. Strictly speaking this is not accurate: AQS is not only used to implement locks. Still, associating AQS with locks should make the reading easier.
  8. ReentrantLock's fair and nonfair locks differ only slightly, so this adds no reading burden.
  9. You need to know in advance what CAS (compare-and-swap) is.

Enough preamble; let's start.

AQS structure

Let's first look at what attributes AQS has. Once these are clear, you basically know how AQS works; after all, you can guess!

// The head node; the simplest way to think of it is as the thread currently holding the lock
private transient volatile Node head;

// The tail node; each new node is appended at the end, which forms a linked list
private transient volatile Node tail;

// The most important and also the simplest field: the current lock state.
// 0 means the lock is not occupied; greater than 0 means a thread holds the lock.
// We say "greater than 0" rather than "equal to 1" because the lock is reentrant; each re-entry adds 1.
private volatile int state;

// The thread currently holding the exclusive lock. The most important use: because the lock is
// reentrant, reentrantLock.lock() can be nested multiple times, so this field is used to check
// whether the current thread already holds the lock:
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; // inherited from AbstractOwnableSynchronizer

Well, it looks like it should be easy, because there are only four attributes.
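The reentrancy count kept in state can be observed from the outside through ReentrantLock's public getHoldCount() method. The following is a minimal sketch rather than anything from the AQS source; the class name HoldCountDemo and its method are made up for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class (not part of the JDK): watches the hold count,
// which mirrors the AQS state field for the owning thread.
class HoldCountDemo {
    // Returns the hold count after: first lock, nested lock, one unlock, final unlock.
    static int[] observeHoldCounts() {
        ReentrantLock lock = new ReentrantLock(true); // fair lock, as in this article
        int[] counts = new int[4];
        lock.lock();
        try {
            counts[0] = lock.getHoldCount();
            lock.lock();                 // re-entry by the same thread
            try {
                counts[1] = lock.getHoldCount();
            } finally {
                lock.unlock();
            }
            counts[2] = lock.getHoldCount();
        } finally {
            lock.unlock();
        }
        counts[3] = lock.getHoldCount(); // lock fully released
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observeHoldCounts())); // prints [1, 2, 1, 0]
    }
}
```

Each lock() by the owning thread raises the count by one and each unlock() lowers it by one, which is why the calls have to be paired.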

The wait queue of AbstractQueuedSynchronizer is illustrated in the figure below. Note that in the analysis that follows, the blocking queue does not include the head node.

Each thread waiting in the queue is wrapped in a node, and the data structure is a linked list. Let's look at the source code:

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    // Indicates that the node is currently in shared mode
    static final Node SHARED = new Node();

    /** Marker to indicate a node is waiting in exclusive mode */
    // Indicates that the node is currently in exclusive mode
    static final Node EXCLUSIVE = null;

    // ======== The int constants below are the values used for waitStatus ===========
    /** waitStatus value to indicate thread has cancelled */
    // This thread has cancelled its attempt to acquire the lock
    static final int CANCELLED = 1;

    /** waitStatus value to indicate successor's thread needs unparking */
    // The thread of this node's successor needs to be woken up
    static final int SIGNAL = -1;

    /** waitStatus value to indicate thread is waiting on condition */
    // Not analyzed in this article; skip it
    static final int CONDITION = -2;

    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    // Likewise not analyzed; skip it
    static final int PROPAGATE = -3;
    // =====================================================

    // Takes one of the values above (1, -1, -2, -3) or 0 (discussed later).
    // Put simply: if the value is greater than 0, the thread has cancelled its wait.
    // (For example, it gave up after waiting too long; ReentrantLock can acquire with a timeout.)
    volatile int waitStatus;

    // Reference to the predecessor node
    volatile Node prev;

    // Reference to the successor node
    volatile Node next;

    // The thread this node represents
    volatile Thread thread;
}

The data structure of Node is simple, just four attributes: thread + waitStatus + prev + next.

These are the basics and will be used many times below, so keep this structure in mind. Next, let's look at ReentrantLock's fair lock. Once again, the blocking queue I refer to does not include the head node.

First, let’s look at how ReentrantLock is used.

import java.util.concurrent.locks.ReentrantLock;

public class OrderService {
    // Static, so that every thread gets the same lock. (Spring MVC services are
    // singletons by default, so static is not strictly needed there.)
    private static ReentrantLock reentrantLock = new ReentrantLock(true);

    public void createOrder() {
        // For example, only one thread may create an order at a time
        reentrantLock.lock();
        // The usual idiom: follow lock() immediately with a try block
        try {
            // Only one thread (the one that acquired the lock) can be in here at a time;
            // other threads block in lock() until they acquire the lock.
            // Execute code...
            // Execute code...
            // Execute code...
        } finally {
            // Release the lock
            reentrantLock.unlock();
        }
    }
}

ReentrantLock uses the inner class Sync to manage the lock, so the actual acquisition and release are handled by Sync's implementation classes.

abstract static class Sync extends AbstractQueuedSynchronizer {

}

Sync has two implementations, NonfairSync and FairSync. Let’s look at the FairSync section.

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
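The effect of the constructor argument can be checked with ReentrantLock's public isFair() method. This is a small sketch; the class name FairnessFlagDemo is made up for illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: shows which Sync flavor a ReentrantLock was built with.
class FairnessFlagDemo {
    // Returns {isFair() of the no-arg lock, isFair() of new ReentrantLock(true)}.
    static boolean[] fairnessFlags() {
        ReentrantLock byDefault = new ReentrantLock();   // no-arg constructor: nonfair
        ReentrantLock fair = new ReentrantLock(true);    // fair, as analyzed in this article
        return new boolean[] { byDefault.isFair(), fair.isFair() };
    }

    public static void main(String[] args) {
        boolean[] flags = fairnessFlags();
        System.out.println("default fair? " + flags[0] + ", explicit fair? " + flags[1]);
    }
}
```

The no-argument constructor gives the nonfair variant, so fairness has to be requested explicitly.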

Threads competing for the lock

Many readers must be tired of the chatter above by now, so from here on I will simply follow the code.

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
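    // Compete for the lock
    final void lock() {
        acquire(1);
    }
    // Inherited from the parent class AQS. I paste it here directly, and will do the same
    // for other methods below, so the reader does not have to jump around.
    // As we can see, if tryAcquire(arg) returns true, this method is done.
    // Otherwise, acquireQueued will put the thread into the queue.
    public final void acquire(int arg) { // arg == 1 here
        // First call tryAcquire(1). As the name suggests, this is only an attempt,
        // because it may succeed right away, in which case there is no need to queue.
        // For a fair lock this means: nobody holds the lock and nobody is waiting, so there is
        // no point in entering the queue (being suspended, then waiting to be woken up).
        if (!tryAcquire(arg) &&
            // tryAcquire(arg) failed, so the current thread must be suspended and placed in the blocking queue.
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
            selfInterrupt();
        }
    }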

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
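    // Try to acquire the lock directly. The boolean return value says whether the lock was acquired.
    // Returns true when: 1. no thread is waiting for the lock; or 2. this is a re-entry, i.e. the
    // thread already holds the lock and can naturally acquire it again.
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // state == 0: no thread holds the lock at this moment
        if (c == 0) {
            // The lock is available right now, but this is a fair lock, so it is first come,
            // first served: check whether someone else has already been waiting in the queue
            if (!hasQueuedPredecessors() &&
                // Nobody is waiting, so try a CAS. If it succeeds, we hold the lock.
                // If it fails, the only explanation is that another thread got in first at
                // almost the same instant, since we just checked that nobody was waiting.
                compareAndSetState(0, acquires)) {

                // We hold the lock now; record the current thread as the owner
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // Entering this else-if branch means a re-entry, so state = state + 1
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // Reaching here means neither the if nor the else-if returned true: the lock was not acquired.
        // Go back to the calling method and continue:
        // if (!tryAcquire(arg)
        //        && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //     selfInterrupt();
        return false;
    }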

    // Suppose tryAcquire(arg) returns false. The code then executes:
    //        acquireQueued(addWaiter(Node.EXCLUSIVE), arg),
    // which first needs to run addWaiter(Node.EXCLUSIVE)

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
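    // This method wraps the thread in a node and adds it to the queue.
    // The mode parameter is Node.EXCLUSIVE here, meaning exclusive mode.
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // The next few lines try to append the current node to the end of the linked list,
        // i.e. to the tail of the blocking queue
        Node pred = tail;

        // tail != null => the queue is not empty (when tail == head the queue is actually empty, but never mind)
        if (pred != null) {
            // Set this node's predecessor to the current tail
            node.prev = pred;
            // Use CAS to make this node the new tail; on success, tail == node
            if (compareAndSetTail(pred, node)) {
                // The CAS succeeded and node is now the tail. Link the old tail to it.
                // node.prev = pred was already set above;
                // with the line below, the two nodes are linked in both directions
                pred.next = node;
                // The thread is enqueued; return
                return node;
            }
        }
        // Look closely at the code above. If we get here, either
        // pred == null (the queue is empty) or the CAS failed (another thread is competing to enqueue).
        // Make sure you follow this. If not, go back and re-read before continuing, or you will waste time.
        enq(node);
        return node;
    }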

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
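    // Enqueue by spinning.
    // As noted above, there are only two ways to get here: the wait queue is empty, or
    // another thread is competing to enqueue.
    // Spinning here means: if the CAS on tail loses the race once, keep retrying until it succeeds.
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // As noted above, an empty queue also ends up here
            if (t == null) { // Must initialize
                // Initialize the head node.
                // Attentive readers will know that head and tail both start out as null.
                // This is again a CAS, because many threads may arrive here at the same time.
                if (compareAndSetHead(new Node()))
                    // For later use: at this point the head node's waitStatus == 0 (see the new Node() constructor)

                    // Now head exists but tail is still null, so point tail at head.
                    // Don't worry: a thread is about to be appended, and tail will move then.
                    // Note: this only sets tail = head. There is no return here,
                    // so after the assignment the for loop continues and next time takes the else branch.
                    tail = head;
            } else {
                // The lines below are the same as in addWaiter above, only inside an infinite
                // loop: append the current thread at the tail, retrying whenever the race is lost
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }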


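    // Now we are back at this code:
    // if (!tryAcquire(arg)
    //        && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    //     selfInterrupt();

    // In the method below, the node parameter has gone through addWaiter(Node.EXCLUSIVE) and is already in the blocking queue.
    // Note: if acquireQueued(addWaiter(Node.EXCLUSIVE), arg) returns true,
    // the code above goes on to call selfInterrupt(), so normally this method should return false.
    // This method is very important: the actual suspending of the thread, and acquiring the lock after being woken up, both happen here.
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // p == head means the current node, although it is in the blocking queue, is the first one in it, because its predecessor is head.
                // Note: the blocking queue does not include the head node. head generally refers to the thread holding the lock; what follows head is the blocking queue.
                // So the current node may try to grab the lock.
                // Why is it allowed to try?
                // First, it is at the front of the queue. Second, the current head may be a freshly initialized node:
                // as mentioned in enq(node), head is lazily initialized, and new Node() does not set any thread.
                // In other words, the current head may not belong to any thread, so the node at the front may give it a try.
                // tryAcquire was analyzed above (look back if you forgot); it simply attempts a CAS on state.
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // Reaching here means the if branch above did not succeed: either the current node is not
                // at the front of the queue, or tryAcquire(arg) lost the race to someone else. Read on.
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }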

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
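    // As just mentioned, we get here because the lock was not acquired. This method answers:
    // "the current thread failed to get the lock; should it be suspended?"
    // The first parameter is the predecessor node; the second is the node representing the current thread.
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // The predecessor's waitStatus == -1: the predecessor is in a normal state and the current thread should be suspended, so return true directly
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;

        // The predecessor's waitStatus is greater than 0. As said earlier, that means the predecessor has cancelled its wait.
        // Keep this in mind: a thread waiting in the blocking queue is suspended, and the wake-up is performed by its predecessor.
        // So the block below points the current node's prev at a node whose waitStatus <= 0.
        // Put simply, the node needs a reliable predecessor, because it depends on that node to be woken up.
        // If the predecessor has cancelled, use the predecessor's predecessor; walking backwards always finds a usable one.
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // Think carefully about what entering this branch means.
            // The predecessor's waitStatus is neither -1 nor 1, so it can only be 0, -2 or -3.
            // Nothing in the source we have read so far sets waitStatus, so every new node enters the queue with waitStatus 0.
            // Use CAS to set the predecessor's waitStatus to Node.SIGNAL (i.e. -1)
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }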

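    // private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
    // Let's briefly analyze this method by its return value:
    // If it returns true, the predecessor's waitStatus == -1, which is the normal case: the current thread
    //        should be suspended and wait to be woken up later. As said above, the wake-up comes from the
    //        predecessor: just wait for it to get the lock and call you when it releases the lock.
    // If it returns false, the thread does not need to be suspended right now. Why? Read on.

    // Jump back to this code:
    // if (shouldParkAfterFailedAcquire(p, node) &&
    //                parkAndCheckInterrupt())
    //                interrupted = true;

    // 1. If shouldParkAfterFailedAcquire(p, node) returns true,
    // parkAndCheckInterrupt() is executed:

    // This method is simple. Because the previous call returned true, the thread must be suspended, and this method does it.
    // It uses LockSupport.park(this) to suspend the thread, which then stops here, waiting to be woken up.
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    // 2. Now the case where shouldParkAfterFailedAcquire(p, node) returns false

    // Looking closely at shouldParkAfterFailedAcquire(p, node), we find that the first call generally
    // does not return true. The reason is simple: the predecessor's waitStatus = -1 is set by its successor.
    // In other words, I have not yet set my predecessor to -1, so how could it return true?
    // But note that the method sits inside a loop, so on the second call the status is already -1.

    // Why the thread is not suspended directly when shouldParkAfterFailedAcquire(p, node) returns false:
    // => To handle the case where, after this method runs, node has become the direct successor of head
    //    (for example, after cancelled predecessors were skipped), so it should retry tryAcquire before parking.
    //    The rest is left for the reader to think through.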
}

With that, things should be clear. Read the method final boolean acquireQueued(final Node node, int arg) a few more times. Work out for yourself how each branch goes, what happens in which case, and where execution ends up.
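The queuing behavior traced above can be observed from outside through ReentrantLock's public monitoring methods hasQueuedThread() and getQueueLength(). The sketch below is an illustration, not part of the analyzed source; the class name QueuedWaiterDemo is made up, and the polling loop is just a simple way to wait until the second thread has parked.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: the caller plays "thread 1", t2 plays "thread 2".
class QueuedWaiterDemo {
    // Returns {queue length while the lock is held, queue length after thread 2 is done}.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock(true);
        Thread t2 = new Thread(() -> {
            lock.lock();    // tryAcquire fails -> addWaiter -> acquireQueued -> park
            lock.unlock();
        }, "thread-2");
        try {
            int queuedWhileHeld;
            lock.lock();    // thread 1 takes the lock without any queue being created
            try {
                t2.start();
                // Poll until thread 2 is enqueued and parked (state WAITING)
                while (!(lock.hasQueuedThread(t2)
                        && t2.getState() == Thread.State.WAITING)) {
                    Thread.sleep(1);
                }
                queuedWhileHeld = lock.getQueueLength();
            } finally {
                lock.unlock();  // release wakes up the queued successor
            }
            t2.join();
            return new int[] { queuedWhileHeld, lock.getQueueLength() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("queued while held: " + r[0] + ", after hand-over: " + r[1]);
    }
}
```

getQueueLength() is documented as an estimate, but in this controlled two-thread setup it should report one waiter while the lock is held and none afterwards.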

Unlock operation

Finally, we need to look at the wake-up action. We know that normally, if a thread does not acquire the lock, it is suspended by LockSupport.park(this) and stops there, waiting to be woken up.

// The wake-up code is fairly simple. If you understood the locking code above, a quick read is enough.
public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    // See tryRelease below
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// Back in ReentrantLock: the tryRelease method
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // Whether the lock has been fully released
    boolean free = false;
    // The lock can be nested. If c == 0, there is no nesting left and the lock is fully released;
    // otherwise it is not released yet.
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
// Wake up the successor node.
// From the call above we know that the node parameter is the head node.
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    // If head's waitStatus < 0, change it to 0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // Wake up the successor. Some nodes may have cancelled their wait (waitStatus == 1),
    // so search backwards from the tail for the frontmost node with waitStatus <= 0.
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Search from back to front without breaking out, so the last match is the frontmost one
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // Wake up the thread
        LockSupport.unpark(s.thread);
}
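Two consequences of tryRelease are visible through ReentrantLock's public API: a thread that does not own the lock gets an IllegalMonitorStateException from unlock(), and a nested lock stays held until every lock() has been matched by an unlock(). A minimal sketch follows; the class name UnlockRulesDemo and its methods are made up for illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class exercising the two checks inside tryRelease.
class UnlockRulesDemo {
    // true if unlock() from a thread that does not own the lock throws.
    static boolean nonOwnerUnlockThrows() {
        ReentrantLock lock = new ReentrantLock(true);
        boolean[] threw = { false };
        lock.lock();
        try {
            Thread other = new Thread(() -> {
                try {
                    lock.unlock();              // not the owner
                } catch (IllegalMonitorStateException e) {
                    threw[0] = true;
                }
            });
            other.start();
            other.join();                       // join makes threw[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            lock.unlock();
        }
        return threw[0];
    }

    // true if the lock is still held after one of two unlocks and free after both.
    static boolean releasedOnlyAfterAllUnlocks() {
        ReentrantLock lock = new ReentrantLock(true);
        lock.lock();
        lock.lock();                            // state goes to 2
        lock.unlock();                          // state goes to 1, tryRelease returns false
        boolean heldAfterOne = lock.isLocked();
        lock.unlock();                          // state goes to 0, lock fully released
        return heldAfterOne && !lock.isLocked();
    }

    public static void main(String[] args) {
        System.out.println(nonOwnerUnlockThrows() + " " + releasedOnlyAfterAllUnlocks());
    }
}
```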

After waking up the thread, the awakened thread will proceed from the following code:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // The thread was suspended here; after being woken up it continues from this point
    return Thread.interrupted();
}
// Then we are back in acquireQueued(final Node node, int arg), where node's predecessor is now normally head

Well, that is the end of the source code analysis. If anything is still unclear, take a closer look at the code.

Conclusion

To summarize.
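The park, wake up, check-interrupt sequence can be tried in isolation with LockSupport. The sketch below is an illustration outside of AQS; the class name ParkUnparkDemo and the released flag are assumptions made for the example. Because park() is allowed to return spuriously, the waiter re-checks its condition in a loop, much as acquireQueued does.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: parks a thread, wakes it, and reports Thread.interrupted().
class ParkUnparkDemo {
    // Wakes the parked thread either by interrupt() or by unpark() and returns
    // what Thread.interrupted() reported inside the woken thread.
    static boolean wakeAndReportInterrupt(boolean wakeByInterrupt) {
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean interruptedSeen = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            // park() may return for no reason, so loop until there is a real reason to proceed
            while (!released.get() && !Thread.currentThread().isInterrupted()) {
                LockSupport.park();
            }
            // Same call as in parkAndCheckInterrupt(): reads and clears the interrupt flag
            interruptedSeen.set(Thread.interrupted());
        });
        try {
            waiter.start();
            while (waiter.getState() != Thread.State.WAITING) {
                Thread.sleep(1);                // wait until the thread is parked
            }
            if (wakeByInterrupt) {
                waiter.interrupt();             // park() also returns on interrupt
            } else {
                released.set(true);
                LockSupport.unpark(waiter);     // what unparkSuccessor does for the successor
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return interruptedSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("woken by unpark, interrupted? " + wakeAndReportInterrupt(false));
        System.out.println("woken by interrupt, interrupted? " + wakeAndReportInterrupt(true));
    }
}
```

This is why acquireQueued records the interrupt in a local flag and keeps looping: waking up does not by itself mean the lock was obtained.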

In a concurrent environment, locking and unlocking require the coordination of the following three components:

  1. Lock state. We need to know whether the lock is held by another thread, and that is what state is for. When it is 0, no thread holds the lock, so a thread can try to grab it. A successful CAS sets state to 1; each re-entry adds 1 and each unlock subtracts 1 until state is 0 again, which means the lock is released. That is why lock() and unlock() must be paired. The release then wakes up the first thread in the wait queue so that it can take the lock.
  2. Blocking and unblocking of threads. AQS uses LockSupport.park(this) to suspend the current thread and LockSupport.unpark(thread) to wake a thread up.
  3. Blocking queue. Many threads may be competing for the lock, but only one can get it; the others must wait, so a queue is needed to manage them. AQS uses a FIFO queue, which is a linked list in which each node holds references to its predecessor and successor. AQS is implemented as a variant of the CLH lock; interested readers can refer to this article introducing CLH, which is clearly written.

Walkthrough with diagrams

This is a review section with a simple example, which should help you understand some of the points above.

The first thread calls reentrantLock.lock(). Looking back at the code, tryAcquire(1) returns true right away and that is it: only state is set to 1, head is not initialized, and there is no blocking queue at all. If thread 1 calls unlock() before thread 2 arrives, the world is completely peaceful and there is no contention, so why would we need AQS?
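To see the three components working together, here is a minimal sketch of a mutex built directly on AbstractQueuedSynchronizer. It is not ReentrantLock's implementation: the class name SimpleMutex is made up, and the lock is deliberately non-reentrant and nonfair to keep it short. The subclass only manages state; parking, unparking and the queue are handled by the inherited acquire() and release().

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo class: a non-reentrant, nonfair mutex on top of AQS.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // Component 1: lock state, changed with CAS (0 = free, 1 = held)
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // AQS will enqueue and park the caller (components 2 and 3)
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;  // tells release() to wake up the queued successor
        }
    }

    private final Sync sync = new Sync();

    void lock()   { sync.acquire(1); }
    void unlock() { sync.release(1); }

    // Increments a plain int from several threads under the mutex and returns the total.
    static int countWithMutex(int threads, int incrementsPerThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = { 0 };
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithMutex(4, 10_000)); // prints 40000
    }
}
```

Since tryAcquire here has no owner check, a thread that calls lock() twice would block itself; ReentrantLock avoids that with the else-if branch analyzed earlier.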

Imagine what happens if thread 2 calls lock() before thread 1 calls unlock().

Thread 2 will initialize head [new Node()], then insert itself into the blocking queue and be suspended.

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

First, thread 2 initializes the head node; at this point head == tail and waitStatus == 0.

Then thread 2 joins the queue:

We also need to look at the waitStatus of the nodes at this point. The head node was initialized by thread 2 without setting waitStatus, so Java defaults it to 0. However, in shouldParkAfterFailedAcquire, thread 2 sets the waitStatus of its predecessor node, i.e. head, to -1.

What is the waitStatus of thread 2's own node at this point? It is 0, because nothing has set it.

If thread 3 arrives now, it is simply appended behind thread 2. Thread 3's waitStatus is 0, and in shouldParkAfterFailedAcquire it sets the waitStatus of its predecessor, thread 2, to -1.

The SIGNAL (-1) value of waitStatus means that the successor node needs to be woken up. In other words, this waitStatus actually describes the state of the successor node rather than the node's own state. As we have seen, each node that joins the queue changes the status of its predecessor to SIGNAL and then blocks, waiting to be woken up by that predecessor. Two issues are involved here: a thread cancelling its wait, and the wake-up operation. In essence they are the same, and the reader can re-read the source code along the lines of "waitStatus represents the state of the successor node."

(End)
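The status changes described above can be replayed with a small single-threaded toy model. This is a deliberate simplification and not the AQS code: there is no CAS, no cancellation and no tryAcquire retry, and the class name WaitStatusToyModel and its methods are made up for illustration.

```java
// Hypothetical toy model of the AQS queue, single-threaded on purpose.
class WaitStatusToyModel {
    static final int SIGNAL = -1;

    static final class Node {
        final String name;
        int waitStatus;          // defaults to 0, as in AQS
        Node prev, next;
        Node(String name) { this.name = name; }
    }

    Node head, tail;

    // Mirrors enq(): lazily create the dummy head, then append at the tail.
    Node enqueue(String name) {
        if (tail == null) {
            head = tail = new Node("head");
        }
        Node node = new Node(name);
        node.prev = tail;
        tail.next = node;
        tail = node;
        return node;
    }

    // Mirrors the loop around shouldParkAfterFailedAcquire for a non-cancelled
    // predecessor: the first pass sets the predecessor to SIGNAL, the second parks.
    int passesUntilPark(Node node) {
        int passes = 0;
        for (;;) {
            passes++;
            Node pred = node.prev;
            if (pred.waitStatus == SIGNAL) {
                return passes;   // the real code would now call parkAndCheckInterrupt()
            }
            pred.waitStatus = SIGNAL;
        }
    }

    String snapshot() {
        StringBuilder sb = new StringBuilder();
        for (Node n = head; n != null; n = n.next) {
            if (sb.length() > 0) sb.append(", ");
            sb.append(n.name).append('=').append(n.waitStatus);
        }
        return sb.toString();
    }

    // Thread 2 queues up and parks, then thread 3 does the same.
    static String simulate() {
        WaitStatusToyModel queue = new WaitStatusToyModel();
        queue.passesUntilPark(queue.enqueue("thread-2"));
        queue.passesUntilPark(queue.enqueue("thread-3"));
        return queue.snapshot();
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // prints head=-1, thread-2=-1, thread-3=0
    }
}
```

The last node in the queue always shows 0, because no successor has yet asked it for a wake-up.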