Preface


Having read the AQS-based locks and synchronization helpers, the next step is to read the queue-related source code in JUC. Let's start with the first one: ArrayBlockingQueue.


[liuzhirichard] Notes on technology, development, and source code from work and study, with occasional posts about everyday life. Feedback is welcome!

Introduction

ArrayBlockingQueue is a bounded BlockingQueue backed by an array.

This queue orders elements FIFO (first in, first out). The head of the queue is the element that has been in the queue the longest. The tail of the queue is the element that has been in the queue the shortest time. New elements are inserted at the tail of the queue, and retrieval operations obtain elements from the head of the queue.

This is a typical "bounded buffer" in which a fixed-size array holds elements inserted by producers and extracted by consumers. Once created, the capacity cannot be changed. Attempting to put an element into a full queue blocks the caller; attempting to take an element from an empty queue blocks in the same way.
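
As a rough illustration of the fixed capacity and FIFO ordering described above, here is a minimal sketch. It uses the non-blocking offer and poll calls so that the result can be observed on a single thread; the class name BoundedQueueDemo is made up for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class BoundedQueueDemo {

    // Fills a queue of capacity 2, then drains it, and reports what each call returned.
    static String run() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        boolean first = queue.offer("a");   // accepted
        boolean second = queue.offer("b");  // accepted, the queue is now full
        boolean third = queue.offer("c");   // rejected, the capacity stays at 2
        int remaining = queue.remainingCapacity();
        String head = queue.poll();         // the oldest element leaves first
        String next = queue.poll();
        String empty = queue.poll();        // null, nothing is left
        return first + " " + second + " " + third + " " + remaining
                + " " + head + " " + next + " " + empty;
    }

    public static void main(String[] args) {
        System.out.println(run()); // true true false 0 a b null
    }
}
```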

This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order. Fairness generally decreases throughput, but it reduces variability and avoids starvation.

Basic usage


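import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Assumption: ThreadFactoryBuilder is Guava's; the original does not name its source
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class ArrayBlockingQueueTest {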

    private static final ArrayBlockingQueue<String> QUEUE = new ArrayBlockingQueue<>(10);

    private static final CountDownLatch LATCH = new CountDownLatch(2);

    public static void main(String[] args) {

        ExecutorService pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
                new ThreadPoolExecutor.AbortPolicy());


        pool.submit(() -> {
            for (int i = 0; i < 100; i++) {
                try {
                    Thread.sleep(1000L);

                    QUEUE.put("Eggs" + Thread.currentThread().getName());
                    System.out.println("Put into the element");
                } catch (InterruptedException ignored) {
                }
            }
            LATCH.countDown();
        });


        pool.submit(() -> {

            for (int i = 0; i < 100; i++) {
                try {
                    Thread.sleep(500L);

                    String take = QUEUE.take();

                    System.out.println("take = " + take);
                } catch (InterruptedException ignored) {
                }
            }
            LATCH.countDown();

        });
        try {
            LATCH.await();
        } catch (InterruptedException ignored) {
        }
        pool.shutdown();
    }
}

The demo is just a very simple version that was written on the fly.

Questions

  1. How does ArrayBlockingQueue work?
  2. What are the differences between the enqueue and dequeue methods?

Source code analysis

Basic structure

Fields


/** array - Stores the elements in the queue */
final Object[] items;

/** Next take, poll, peek or remove index */
int takeIndex;

/** Next index to put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;


/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;


Constructors

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

// Specify the capacity, and whether it is fair
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
// Specify the capacity and fairness, and add the given elements at construction
public ArrayBlockingQueue(int capacity, boolean fair,
                            Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}
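
To see the constructors from the caller's side, the following sketch pre-fills a fair queue from a collection and checks that a collection larger than the capacity, or a capacity below 1, is rejected. The class name ConstructorDemo and the helper rejects are hypothetical names for this example only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

public class ConstructorDemo {

    // Returns true if building a queue of the given capacity from the given
    // elements is rejected with IllegalArgumentException.
    static boolean rejects(int capacity, List<String> initial) {
        try {
            new ArrayBlockingQueue<>(capacity, false, initial);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Fair queue with capacity 3, pre-filled with exactly three elements.
        ArrayBlockingQueue<String> full =
                new ArrayBlockingQueue<>(3, true, Arrays.asList("a", "b", "c"));
        System.out.println(full.remainingCapacity()); // 0
        System.out.println(full.offer("d"));          // false, the queue is already full
        System.out.println(full.peek());              // a

        List<String> three = Arrays.asList("a", "b", "c");
        List<String> none = Arrays.asList();
        System.out.println(rejects(2, three)); // true, more elements than capacity
        System.out.println(rejects(0, none));  // true, capacity must be at least 1
    }
}
```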

Add elements

public boolean add(E e) {
    return super.add(e);
}

// AbstractQueue.add (the parent class method) calls offer
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
// offer acquires the lock
public boolean offer(E e) {
    checkNotNull(e);
    // Acquire the lock
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

// If the queue is full, wait
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

  1. add: calls the add method of the parent class AbstractQueue, which calls offer internally. If offer returns false, an IllegalStateException is thrown.
  2. offer: checks that the element is not null, acquires the mutex, returns false if the queue is full, and otherwise calls enqueue to add the element.
  3. put: checks that the element is not null and acquires the mutex interruptibly. While the queue is full, it blocks on the notFull condition, rechecking in a loop after each wakeup; once there is space, it calls enqueue to add the element.
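
The three behaviors listed above can be observed with a small sketch like the one below. It is only an illustration: the class and helper names are invented, and the 200 ms sleep is a crude way to give the producer thread time to block, not a reliable synchronization technique.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class InsertMethodsDemo {

    // Describes how add() behaves on the given queue.
    static String tryAdd(ArrayBlockingQueue<String> queue, String value) {
        try {
            queue.add(value);
            return "added";
        } catch (IllegalStateException e) {
            return "IllegalStateException";
        }
    }

    // Returns true if put() on a full queue stays blocked until a slot is freed.
    static boolean putBlocksUntilSpace() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.offer("first"); // the queue is now full
        Thread producer = new Thread(() -> {
            try {
                queue.put("second"); // waits until a slot is free
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            producer.start();
            Thread.sleep(200L);                   // crude: let the producer reach put()
            boolean blocked = producer.isAlive(); // still waiting inside put()
            queue.take();                         // frees the slot and wakes the producer
            producer.join(2000L);
            return blocked && !producer.isAlive() && "second".equals(queue.peek());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<String> full = new ArrayBlockingQueue<>(1);
        full.offer("first");
        System.out.println(full.offer("x"));       // false
        System.out.println(tryAdd(full, "x"));     // IllegalStateException
        System.out.println(putBlocksUntilSpace());
    }
}
```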

So we still need to look at the enqueue method:

// This can only be called when the lock is acquired
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    // putIndex is the index for the next put, offer, or add
    // Store the element there, then advance putIndex
    items[putIndex] = x;
    // On reaching the end of the array, wrap around to index 0
    if (++putIndex == items.length)
        putIndex = 0;
    // Increment the element count
    count++;
    // An element was enqueued, so wake up a thread waiting to take one
    notEmpty.signal();
}
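
The wrap-around of putIndex is what makes the array behave as a circular buffer. The sketch below replays only that index arithmetic, assuming slots are freed in time, and leaves out the lock and the element storage; PutIndexWrapDemo and slots are hypothetical names.

```java
public class PutIndexWrapDemo {

    // Replays the putIndex update from enqueue for `inserts` insertions into an
    // array of length `capacity`, recording the slot each element would land in.
    static String slots(int capacity, int inserts) {
        StringBuilder used = new StringBuilder();
        int putIndex = 0;
        for (int i = 0; i < inserts; i++) {
            used.append(putIndex);
            if (++putIndex == capacity) {
                putIndex = 0; // wrap to the start of the array
            }
        }
        return used.toString();
    }

    public static void main(String[] args) {
        System.out.println(slots(3, 7)); // 0120120
    }
}
```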

Get elements

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

It can be seen from the source code:

  1. The difference between poll and take is that poll returns null when the queue has no elements, while take blocks and waits until an element can be retrieved from the queue.
  2. Both poll and take call the dequeue method to get the element.

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    // Get the element and set its slot to null
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    // takeIndex is the index for the next take, poll, peek, or remove
    // Advance it to the next element, wrapping around at the end of the array
    if (++takeIndex == items.length)
        takeIndex = 0;
    // Decrement the element count
    count--;
    // Update the iterator state
    if (itrs != null)
        itrs.elementDequeued();
    // Wake up the thread waiting to put the element in
    notFull.signal();
    return x;
}
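
The difference between poll and take on an empty queue can be checked with a sketch along these lines. The names are invented for the example, and the 200 ms sleep is only a rough way to let the consumer thread reach take() before an element arrives.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class RemoveMethodsDemo {

    // Returns true if take() on an empty queue waits until an element is added.
    static boolean takeBlocksUntilElement() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // waits until an element is available
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            consumer.start();
            Thread.sleep(200L);                   // crude: let the consumer reach take()
            boolean blocked = consumer.isAlive(); // still waiting inside take()
            queue.put("egg");                     // wakes the consumer
            consumer.join(2000L);
            return blocked && "egg".equals(received[0]);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<String> empty = new ArrayBlockingQueue<>(1);
        System.out.println(empty.poll());             // null, returns immediately
        System.out.println(takeBlocksUntilElement());
    }
}
```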

Peek at elements

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}
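
A short sketch of peek from the caller's side: it returns the head without removing it, so repeated calls see the same element and the size does not change. PeekDemo is a made-up class name.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class PeekDemo {

    // Peeks at an empty queue, then twice at a queue holding "a" and "b".
    static String run() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        String onEmpty = queue.peek(); // null, the queue is empty
        queue.offer("a");
        queue.offer("b");
        String firstPeek = queue.peek();  // a
        String secondPeek = queue.peek(); // still a, nothing was removed
        int size = queue.size();          // 2
        return onEmpty + " " + firstPeek + " " + secondPeek + " " + size;
    }

    public static void main(String[] args) {
        System.out.println(run()); // null a a 2
    }
}
```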

Summary

Q&A

Q: How does ArrayBlockingQueue work?

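A: ArrayBlockingQueue is backed by an array and guards every insertion and removal with a single ReentrantLock, so concurrent producers and consumers cannot corrupt its state. Two conditions on that lock, notEmpty and notFull, let take and put wait until the queue has an element or a free slot.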
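
To put the pieces side by side, here is a stripped-down sketch of the same design: one array, one lock, two conditions. It is not the JDK implementation (there is no null check, iterator support, or fairness option), and MiniBoundedBuffer and transfer are hypothetical names.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MiniBoundedBuffer<E> {
    private final Object[] items;
    private int takeIndex, putIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    public MiniBoundedBuffer(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        items = new Object[capacity];
    }

    public void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) notFull.await(); // wait for a free slot
            items[putIndex] = e;
            if (++putIndex == items.length) putIndex = 0;  // circular index
            count++;
            notEmpty.signal();                             // wake one waiting taker
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) notEmpty.await();           // wait for an element
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length) takeIndex = 0;
            count--;
            notFull.signal();                              // wake one waiting putter
            return x;
        } finally {
            lock.unlock();
        }
    }

    // Moves 0..n-1 through a buffer of capacity 2; returns the sum the consumer saw.
    static int transfer(int n) {
        MiniBoundedBuffer<Integer> buffer = new MiniBoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) buffer.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += buffer.take();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transfer(5)); // 10
    }
}
```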

Q: What are the differences between the enqueue and dequeue methods?

Method  Behavior
add     Adds an element; throws IllegalStateException if the queue is full
offer   Adds an element; returns false if the queue is full
put     Adds an element; blocks and waits if the queue is full
poll    Removes and returns the head element; returns null if the queue is empty
take    Removes and returns the head element; blocks and waits if the queue is empty
peek    Returns the head element (the earliest one put into the queue) without removing it; returns null if the queue is empty

Conclusion

ArrayBlockingQueue uses a single ReentrantLock that is acquired both when elements are enqueued and when they are dequeued, so only one thread can enqueue or dequeue at a time, which ensures thread safety.
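
One informal way to check this is to push a known set of values through a small queue from several threads and confirm that nothing is lost or duplicated. The sketch below does that; ThreadSafetyDemo and run are made-up names, and a matching sum is evidence of correct behavior rather than a proof.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class ThreadSafetyDemo {

    // Starts `pairs` producer threads and `pairs` consumer threads that share one
    // queue of capacity 4. Each producer puts 1..perThread and each consumer takes
    // perThread elements. Returns the sum of everything the consumers received.
    static long run(int pairs, int perThread) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(4);
        AtomicLong consumedSum = new AtomicLong();
        Thread[] threads = new Thread[pairs * 2];
        for (int p = 0; p < pairs; p++) {
            threads[2 * p] = new Thread(() -> {
                try {
                    for (int i = 1; i <= perThread; i++) queue.put(i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[2 * p + 1] = new Thread(() -> {
                try {
                    for (int i = 0; i < perThread; i++) consumedSum.addAndGet(queue.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (Thread t : threads) t.start();
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumedSum.get();
    }

    public static void main(String[] args) {
        // 4 producers x (1 + 2 + ... + 1000) = 4 x 500500
        System.out.println(run(4, 1000)); // 2002000
    }
}
```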