The last. After writing a ReentrantLock, you can actually write a ReadWriteLock based on the ReentrantLock, as described in Chapter 8 of The Art of Multiprocessor Programming. But, in line with incomplete AQS (AbstractQueuedSynchronizer) introduced a series of themes, from scratch to write a ReentrantReadWriteLock here.

As defined by ReadWriteLock, this is true at all times

  1. No thread holds the lock
  2. 1 to N threads holding the shared lock (Read)
  3. 1 thread holds an exclusive lock (Write)

One of them.

Second, a fair ReadWriteLock requires that the new Read or Write thread must wait in the queue. An unfair ReadWriteLock allows the new Read or Write thread to acquire the lock before the queue waiting line. One more thing about unfair locks: In theory, unfair locks are similar to a mob, but most implementations allow only the new and first thread in the queue to preempt the lock. ReadWriteLock is the same. If you want locks that are completely unfair, AQS and the implementation here may not meet your needs.

To implement the definition of ReadWriteLock, you need to record the read and write states separately. Given that the Write state can only have one thread, the possible scenarios are as follows:

No. Reader Count Writer Count
1 0 0
2 1 + 0
3 0 1

As you can see, the reader count and writer count change separately from state 1, where no thread holds the lock, to state 2 or 3. This feature causes you to CAS both reader count and writer count, or mix them together into one variable. Use low and high values to distinguish reader count from writer count (or positive and negative can be used). AQS does the latter. Here we simplify and give some simple relations:

count = 00000000
reader_count = count & 0x00FF
writer_count = count & 0xFF00Copy the code

Another issue to consider is how reentrant is implemented. You can increase the Reader count, or, like ReentrantLock, maintain a thread local variable for each reader thread. In terms of efficiency, the latter is better.

SimpleReadWriteLock

There is only a basic implementation of tryLock

import javax.annotation.Nonnull; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; public class SimpleReadWriteLock implements ReadWriteLock { private static final int WRITER_MASK = 0xFF00; private static final int WRITER_UNIT = 0x0100; // writer reentrant times, reader count private final AtomicInteger count = new AtomicInteger(0); private final ReadLock readLock = new ReadLock(); private final WriteLock writeLock = new WriteLock(); @Override @Nonnull public Lock readLock() { return readLock; } @Override @Nonnull public Lock writeLock() { return writeLock; } private static abstract class AbstractLock implements Lock { @Override public void lock() { throw new UnsupportedOperationException(); } @Override public void lockInterruptibly() throws InterruptedException { throw new UnsupportedOperationException(); } @Override public boolean tryLock(long time, @Nonnull TimeUnit unit) throws InterruptedException { throw new UnsupportedOperationException(); } @Override @Nonnull public Condition newCondition() { throw new UnsupportedOperationException(); } } @SuppressWarnings("Duplicates") private class ReadLock extends AbstractLock { // reentrant times of current reader private final ThreadLocal<Integer> reentrantTimes = ThreadLocal.withInitial(() -> 0); @Override public boolean tryLock() { int rt = reentrantTimes.get(); if (rt > 0) { reentrantTimes.set(rt + 1); return true; } int c = count.get(); if (((c & WRITER_MASK) == 0) && count.compareAndSet(c, c + 1)) { reentrantTimes.set(1); return true; } return false; } @Override public void unlock() { int rt = reentrantTimes.get(); if (rt <= 0) { throw new IllegalStateException("attempt to unlock without holding lock"); } if (rt > 1) { reentrantTimes.set(rt - 1); return; } reentrantTimes.set(0); if (count.get() < 1) { throw new IllegalStateException("no reentrantTimes"); } count.decrementAndGet(); } } @SuppressWarnings("Duplicates") private class WriteLock extends AbstractLock { private Thread writer; @Override public boolean tryLock() { if (writer == Thread.currentThread()) { count.getAndAdd(WRITER_UNIT); return true; } if (count.get() == 0 && count.compareAndSet(0, WRITER_UNIT)) { writer = Thread.currentThread(); return true; } return false; } @Override public void unlock() { if (writer ! = Thread.currentThread()) { throw new IllegalStateException("attempt to unlock without holding lock"); } int c = count.get(); if (c < WRITER_UNIT) { throw new IllegalStateException("no writer"); } if (c == WRITER_UNIT) { writer = null; } count.set(c - WRITER_UNIT); }}}Copy the code

Focus on how the CAS operation handles count. I’m not going to analyze the code here, but if you’ve seen the ReentrantLock analysis, it shouldn’t be too difficult.

Next, consider how to join the queue. There are two possible options for implementation

  1. As with Write, each Read is treated as a separate node
  2. The contiguous Read acts as a single node

The latter looks good because the head node (in this case, the Write node) only needs to wake up one of the subsequent nodes, otherwise multiple consecutive Read nodes need to be woken up. But AQS actually chose the first option! What’s the reason? In my opinion, the second method is not fundamentally different from the first method: wake up subsequent multiple Read nodes and wake up subsequent single Read nodes and then wake up other Read threads within that Read node. At the same time, there are a number of details. It is possible to join the queue directly (when the last node is the Write node), it is possible to follow the last Read node, and there are problems with wake loss. In short, the gains outweigh the losses. So the individual incremental implementation also uses the first method.

On wake up subsequent nodes, consider the following scenarios

 

The first is the write-only case, where only one successor node necessarily needs to be woken up. The second has only one successor Read node, so only one successor is awakened. But in the third case, the first node may need to wake up two subsequent nodes. Fourth, the Read thread serves as the first node and only needs to wake up the first subsequent Write node. The fifth case is special: this represents the middle of a continuous awakening.

As you can see from the figure, to have a Read thread as a node, the wake up must be multiple times. To be more precise, if the successor is a Read node, it is awakened consecutively, otherwise (the Write node), only one.

Consider another question, at which step do you wake up?

  1. unlock
  2. After the previous Read node acquires the lock

And who wakes whom?

  1. The nodes of unlock wake up subsequent nodes
  2. The Read node wakes up subsequent nodes

There aren’t really 4 2 by 2 designs, but these are the only two that actually work.

The first is to wake up subsequent Read nodes in unlock. The Read node then wakes up a subsequent Read node after acquiring the lock, and so on. The second is to wake up all subsequent Read nodes in unlock.

AQS chose the first option. What’s the reason? I think the reason is to ensure that the lock can be acquired before waking up. Assume that using the second design, the first Read node is awakened and the lock can be acquired, but the queue has not yet been advanced (see ReentrantLock for details on the processing of acquire and release queues). When W continues to wake up the second Read node, due to the pre-node of the second Read node, That is, the first Read node has not yet become a head, so it must continue to park, equal to invalid wake up. With the first design, however, the first Read node will only acquire the lock, set itself to head, and then wake up the subsequent Read node, at which point the second Read node will definitely acquire the lock, so it won’t be an invalid wake up.

Note that in the first diagram, the wakeup above a node is different from the wakeup of an unlock node. The wakeup above a node is only for a Read node, that is, a subsequent node is awakened by a Read node. In contrast, an UNLOCK wakes up a subsequent node unconditionally.

You may notice that PROPAGATE for Node is not mentioned in the above method, even though it looks like the continuous wake up of the Read Node is very much like propagation. The reason is that ReadWriteLock does not need PROPAGATE, and PROPAGATE can be said to be a solution for AQS based Semaphore. Here is a rough analysis of the possible problems with not PROPAGATE for Semaphore.

For Semaphore, there are two main operations

  • acquire
  • release

Release may wake up multiple threads waiting for Acquire, so we use a readlock-like ReadWriteLock model without WriteLock. Acquire corresponds to AQS acquireShared and Release corresponds to releaseShared.

  1. Suppose a Semaphore with permits is 2, thread 1 holds 1, thread 2 holds 1, thread 3 is waiting, thread 4 is waiting. The queue looks like the one above. Head doesn’t matter here, it could be sentinel, it could be thread 1 or thread 2. The two waiters are threads 3 and 4
  2. When thread 1 finishes using permit, it returns the permit through releaseShared. At this point, the head wakes up the first waiter, thread 3. Thread 3 found a permit in Semaphore and successfully obtained it, and thread 4 did not decide to wake up thread 4 because there is only one permit in Semaphore. At this point thread 3 has not yet advanced to the queue
  3. Thread 2 returns the permit through releaseShared. Head does not change and, like ReentrantLock, does not attempt to wake up a subsequent node if it has been woken up. That is, thread 3 will not wake up repeatedly
  4. Thread 3 continues to execute without awakening subsequent node 4. Permits 1 is retained, but thread 4 cannot obtain permits

In my opinion, a straightforward solution is to wake up consecutively using the previous UNLOCK, while the first node wakes up all subsequent nodes unconditionally (without checking its status). Of course, if you do this, you have a higher chance of invalid awakening. Another approach is to ensure that thread 3 becomes the first node before the next release when thread 1 returns permit. It’s hard to be honest.

AQS ‘solution is mainly

  • Check head’s status when advancing queue (setHeadAndPropagate)
  • ReleaseShared is PROPAGATE (doReleaseShared)

Thus, thread 3 of the above scenario finds head PROPAGATE and wakes up subsequent nodes.

Due to the inclusion of PROPAGATE, part of the code needs to do extra processing for this new state. In ReadWriteLock, however, there is no such problem: the Read node wakes up subsequent nodes unconditionally.

The final code

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReadWriteLock;
 
@SuppressWarnings("Duplicates")
public class UnfairReadWriteTimedLock3 implements ReadWriteLock {
    // private static final int READER_MASK = 0x00FF;
    private static final int WRITER_MASK = 0xFF00;
    private static final int WRITER_UNIT = 0x0100;
 
    private final AtomicInteger count = new AtomicInteger(0);
    private final Queue queue = new Queue();
    private final ReadLock readLock = new ReadLock();
    private final WriteLock writeLock = new WriteLock();
 
    @Override
    @Nonnull
    public Lock readLock() {
        return readLock;
    }
 
    @Override
    @Nonnull
    public Lock writeLock() {
        return writeLock;
    }
 
    private static abstract class AbstractLock implements Lock {
        @Override
        @Nonnull
        public Condition newCondition() {
            throw new UnsupportedOperationException();
        }
    }
 
    @SuppressWarnings("Duplicates")
    private class ReadLock extends AbstractLock {
        private final ThreadLocal<Integer> reentrantTimes = ThreadLocal.withInitial(() -> 0);
 
        @Override
        public void lock() {
            if (tryLock()) {
                return;
            }
            int c;
            final Node node = new Node(Thread.currentThread(), true);
            queue.enqueue(node);
            while (true) {
                if (node.predecessor.get() == queue.head.get()) {
                    c = count.get();
                    if ((c & WRITER_MASK) == 0 && count.compareAndSet(c, c + 1)) {
                        myTurn(node);
                        return;
                    }
                }
                if (isReadyToPark(node)) {
                    LockSupport.park(this);
                }
            }
        }
 
        @Override
        public void lockInterruptibly() throws InterruptedException {
            if (tryLock()) {
                return;
            }
            int c;
            final Node node = new Node(Thread.currentThread(), true);
            Node predecessor = queue.enqueue(node);
            while (true) {
                if (predecessor == queue.head.get()) {
                    c = count.get();
                    if ((c & WRITER_MASK) == 0 && count.compareAndSet(c, c + 1)) {
                        myTurn(node);
                        return;
                    }
                }
                if (isReadyToPark(node)) {
                    LockSupport.park(this);
                }
                if (Thread.interrupted()) {
                    abort(node);
                    throw new InterruptedException();
                }
            }
        }
 
        @Override
        public boolean tryLock() {
            int rt = reentrantTimes.get();
            if (rt > 0) {
                reentrantTimes.set(rt + 1);
                return true;
            }
            int c = count.get();
            if ((c & WRITER_MASK) == 0 && count.compareAndSet(c, c + 1)) {
                reentrantTimes.set(1);
                return true;
            }
            return false;
        }
 
        @Override
        public boolean tryLock(long time, @Nonnull TimeUnit unit) throws InterruptedException {
            if (tryLock()) {
                return true;
            }
            final long deadline = System.nanoTime() + unit.toNanos(time);
            final Node node = new Node(Thread.currentThread(), true);
            Node predecessor = queue.enqueue(node);
            long nanos;
            int c;
            while (true) {
                if (predecessor == queue.head.get()) {
                    c = count.get();
                    if ((c & WRITER_MASK) == 0 && count.compareAndSet(c, c + 1)) {
                        myTurn(node);
                        return true;
                    }
                }
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    abort(node);
                    return false;
                }
                if (isReadyToPark(node)) {
                    LockSupport.parkNanos(this, nanos);
                }
                if (Thread.interrupted()) {
                    abort(node);
                    throw new InterruptedException();
                }
            }
        }
 
        private void myTurn(@Nonnull Node node) {
            reentrantTimes.set(1);
            node.clearThread();
            queue.head.set(node);
 
            /*
             * propagate if successor is reader
             *
             * In ReadWriteLock, there's no need to check if propagate, it always propagates.
             */
            if (node.resetSignalStatus()) {
                Node successor = queue.findNormalSuccessor(node);
                if (successor != null && successor.shared) {
                    LockSupport.unpark(successor.thread.get());
                }
            }
        }
 
        @Override
        public void unlock() {
            int rt = reentrantTimes.get();
            if (rt < 1) {
                throw new IllegalStateException("not the thread holding lock");
            }
            if (rt > 1) {
                reentrantTimes.set(rt - 1);
                return;
            }
            // rt == 1
            reentrantTimes.set(0);
            if (count.get() < 1) {
                throw new IllegalStateException("count < 1");
            }
            if (count.decrementAndGet() > 0) {
                return;
            }
            Node h = queue.head.get();
            if (h != null && h.resetSignalStatus()) {
                unparkNormalSuccessor(h);
            }
        }
    }
 
    @SuppressWarnings("Duplicates")
    private class WriteLock extends AbstractLock {
        private Thread owner;
 
        @Override
        public void lock() {
            if (tryLock()) {
                return;
            }
            Node node = new Node(Thread.currentThread());
            Node predecessor = queue.enqueue(node);
            while (true) {
                if (predecessor == queue.head.get() &&
                        count.get() == 0 && count.compareAndSet(0, WRITER_UNIT)) {
                    myTurn(node);
                    return;
                }
                if (isReadyToPark(node)) {
                    LockSupport.park(this);
                }
            }
        }
 
        @Override
        public void lockInterruptibly() throws InterruptedException {
            if (tryLock()) {
                return;
            }
            Node node = new Node(Thread.currentThread());
            Node predecessor = queue.enqueue(node);
            while (true) {
                if (predecessor == queue.head.get() &&
                        count.get() == 0 && count.compareAndSet(0, WRITER_UNIT)) {
                    myTurn(node);
                    return;
                }
                if (isReadyToPark(node)) {
                    LockSupport.park(this);
                }
                if (Thread.interrupted()) {
                    abort(node);
                    throw new InterruptedException();
                }
            }
        }
 
        @Override
        public boolean tryLock() {
            if (owner == Thread.currentThread()) {
                count.getAndAdd(WRITER_UNIT);
                return true;
            }
            if (count.get() == 0 && count.compareAndSet(0, WRITER_UNIT)) {
                owner = Thread.currentThread();
                return true;
            }
            return false;
        }
 
        @Override
        public boolean tryLock(long time, @Nonnull TimeUnit unit) throws InterruptedException {
            if (tryLock()) {
                return true;
            }
            long deadline = System.nanoTime() + unit.toNanos(time);
            Node node = new Node(Thread.currentThread());
            Node predecessor = queue.enqueue(node);
            long nanos;
            while (true) {
                if (predecessor == queue.head.get() &&
                        count.get() == 0 && count.compareAndSet(0, WRITER_UNIT)) {
                    myTurn(node);
                    return true;
                }
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    abort(node);
                    return false;
                }
                if (isReadyToPark(node)) {
                    LockSupport.parkNanos(this, nanos);
                }
                if (Thread.interrupted()) {
                    abort(node);
                    throw new InterruptedException();
                }
            }
        }
 
        private void myTurn(@Nonnull Node node) {
            node.clearThread();
            owner = Thread.currentThread();
            queue.head.set(node);
        }
 
        @Override
        public void unlock() {
            if (owner != Thread.currentThread()) {
                throw new IllegalStateException("not the thread holding write lock");
            }
            int c = count.get();
            if (c < WRITER_UNIT) {
                throw new IllegalStateException("no writer");
            }
            if (c > WRITER_UNIT) {
                count.set(c - WRITER_UNIT);
                return;
            }
            // c == WRITER_UNIT
            owner = null;
            // linearization point
            count.set(0);
 
            // signal successor
            Node node = queue.head.get();
            if (node != null && node.status.get() == Node.STATUS_SIGNAL) {
                node.status.set(Node.STATUS_NORMAL);
                unparkNormalSuccessor(node);
            }
        }
    }
 
    private boolean isReadyToPark(@Nonnull Node node) {
        Node predecessor = node.predecessor.get();
        int s = predecessor.status.get();
        if (s == Node.STATUS_SIGNAL) {
            return true;
        }
        if (s == Node.STATUS_ABORTED) {
            predecessor = queue.skipAbortedPredecessor(node);
            predecessor.successor.set(node);
        } else {
            predecessor.status.compareAndSet(Node.STATUS_NORMAL, Node.STATUS_SIGNAL);
        }
        return false;
    }
 
    private void abort(@Nonnull Node node) {
        node.clearThread();
 
        Node p = queue.skipAbortedPredecessor(node);
        Node ps = p.successor.get();
 
        node.status.set(Node.STATUS_ABORTED);
 
        Node t = queue.tail.get();
        if (t == node && queue.tail.compareAndSet(t, p)) {
            p.successor.compareAndSet(ps, null);
            return;
        }
 
        if (p != queue.head.get() && p.ensureSignalStatus() && p.thread.get() != null) {
            Node s = node.successor.get();
            if (s != null && s.status.get() != Node.STATUS_ABORTED) {
                p.successor.compareAndSet(ps, s);
            }
        } else {
            node.resetSignalStatus();
            unparkNormalSuccessor(node);
        }
    }
 
    private void unparkNormalSuccessor(@Nonnull Node node) {
        Node successor = queue.findNormalSuccessor(node);
        if (successor != null) {
            LockSupport.unpark(successor.thread.get());
        }
    }
 
    @SuppressWarnings("Duplicates")
    private static class Queue {
        final AtomicReference<Node> head = new AtomicReference<>();
        final AtomicReference<Node> tail = new AtomicReference<>();
 
        @Nonnull
        Node enqueue(@Nonnull Node node) {
            Node t;
            while (true) {
                t = tail.get();
                if (t == null) {
                    Node sentinel = new Node();
                    if (head.compareAndSet(null, sentinel)) {
                        tail.set(sentinel);
                    }
                } else {
                    node.predecessor.lazySet(t);
                    if (tail.compareAndSet(t, node)) {
                        t.successor.set(node);
                        return t;
                    }
                }
            }
        }
 
        @Nullable
        Node findNormalSuccessor(@Nonnull Node node) {
            Node s = node.successor.get();
            if (s != null && s.status.get() != Node.STATUS_ABORTED) {
                return s;
            }
 
            // find from tail
            s = null;
            Node c = tail.get();
            while (c != null && c != node) {
                if (c.status.get() != Node.STATUS_ABORTED) {
                    s = c;
                }
                c = c.predecessor.get();
            }
            return s;
        }
 
        @Nonnull
        Node skipAbortedPredecessor(@Nonnull Node node) {
            Node h = head.get();
            Node p = node.predecessor.get();
            while (p != h && p.status.get() == Node.STATUS_ABORTED) {
                p = p.predecessor.get();
                node.predecessor.set(p);
            }
            return p;
        }
    }
 
    private static class Node {
        static final int STATUS_NORMAL = 0;
        static final int STATUS_SIGNAL = 1;
        static final int STATUS_ABORTED = -1;
 
        final AtomicReference<Thread> thread;
        final boolean shared;
        final AtomicInteger status = new AtomicInteger(STATUS_NORMAL);
        final AtomicReference<Node> predecessor = new AtomicReference<>();
        // optimization
        final AtomicReference<Node> successor = new AtomicReference<>();
 
        Node() {
            this(null, false);
        }
 
        Node(@Nullable Thread thread) {
            this(thread, false);
        }
 
        Node(@Nullable Thread thread, boolean shared) {
            this.thread = new AtomicReference<>(thread);
            this.shared = shared;
        }
 
        void clearThread() {
            thread.set(null);
        }
 
        /**
         * Ensure signal status.
         * If current status is signal, just return.
         * If current status is normal, then try to CAS status from normal to signal.
         *
         * @return true if changed to signal, otherwise false
         */
        boolean ensureSignalStatus() {
            int s = status.get();
            return s == STATUS_SIGNAL || (s == STATUS_NORMAL && status.compareAndSet(STATUS_NORMAL, STATUS_SIGNAL));
        }
 
        /**
         * Reset signal status.
         * SIGNAL -> NORMAL
         *
         * @return true if successful, otherwise false
         */
        boolean resetSignalStatus() {
            return status.get() == STATUS_SIGNAL && status.compareAndSet(STATUS_SIGNAL, STATUS_NORMAL);
        }
    }
}Copy the code

conclusion

This concludes the “Write Yourself ReentrantLock and ReentrantReadWriteLock” series. In general, concurrent programming requires a lot of careful thinking, including details that sometimes lead to changes in your design. I hope this series will be useful for you to learn AQS and concurrent programming in Java.