preface

ArrayBlockingQueue Is a blocking queue implemented as an array. It is thread-safe and meets the characteristics of queues: first-in, first-out. Let’s analyze its source code and understand its implementation process.

1, properties,

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
	// An array of elements
    final Object[] items;
	// Fetch pointer: points to the address of the next element to be fetched
    int takeIndex;
	// Place pointer to the address of the next element to be added
    int putIndex;
	// The number of elements in the queue
    int count;
	/ / exclusive lock
    final ReentrantLock lock;
	// Non-null condition
    private final Condition notEmpty;
	// Non-full condition
    private final Condition notFull;
}
Copy the code

2. Construction method

public ArrayBlockingQueue(int capacity) {
	// Initialize the queue capacity, default unfair lock
	this(capacity, false);
}
// Initialize the queue capacity to specify whether it is a fair lock
public ArrayBlockingQueue(int capacity, boolean fair) {
	if (capacity <= 0)
		throw new IllegalArgumentException();
	this.items = new Object[capacity];
	// Initialize the exclusive lock
	lock = new ReentrantLock(fair);
	// Initialize two conditions
	notEmpty = lock.newCondition();
	notFull =  lock.newCondition();
}
// Initialize the queue capacity, specify whether the lock is fair, initialize the capacity
public ArrayBlockingQueue(int capacity, boolean fair,
						  Collection<? extends E> c) {
	this(capacity, fair);
	final ReentrantLock lock = this.lock;
	lock.lock(); // Lock only for visibility, not mutual exclusion
	try {
		int i = 0;
		try {
			for (E e : c) {
				checkNotNull(e);
				items[i++] = e;
			}
		// The size of the array is fixed after initialization
		} catch (ArrayIndexOutOfBoundsException ex) {
			throw new IllegalArgumentException();
		}
		count = i;
		putIndex = (i == capacity) ? 0 : i;
	} finally{ lock.unlock(); }}Copy the code

3, team

There are four methods to join a team: add(E E), offer(E E), Put (E E), offer(E E, long timeout, TimeUnit Unit). Here are the differences:

public boolean add(E e) {
	// Call the add(e) method of the parent class
	return super.add(e);
}
/ / super. The add (e) methods:
public boolean add(E e) {
    // Call the offer(e) method, returning true on success
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

public boolean offer(E e) {
	// Check if the element is empty
	checkNotNull(e);
	final ReentrantLock lock = this.lock;
	// Obtain an exclusive lock to ensure thread safety
	lock.lock();
	try {
		// If the array is full
		if (count == items.length)
			/ / returns false
			return false;
		else {
			// If the array is not full, it will be queued
			enqueue(e);
			return true; }}finally {
		// Release the exclusive locklock.unlock(); }}public void put(E e) throws InterruptedException {
	checkNotNull(e);
	final ReentrantLock lock = this.lock;
	// The thread interrupts and throws an exception
	lock.lockInterruptibly();
	try {
		while (count == items.length)
			// The array is full, and notFull waits for an element to be removed from the array
			// Wait, where multiple threads may be blocking on the lock
			notFull.await();
		/ / into the queue
		enqueue(e);
	} finally{ lock.unlock(); }}// Add a wait timeout mechanism, just like put
public boolean offer(E e, long timeout, TimeUnit unit)
	throws InterruptedException {

	checkNotNull(e);
	long nanos = unit.toNanos(timeout);
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		while (count == items.length) {
			if (nanos <= 0)
				return false;
			nanos = notFull.awaitNanos(nanos);
		}
		enqueue(e);
		return true;
	} finally{ lock.unlock(); }}// queue: uses a loop of Pointers to store elements
private void enqueue(E x) {
    final Object[] items = this.items;
    // Place the element where the pointer is placed
    items[putIndex] = x;
    // If the pointer is equal to the length of the array, return the header
    if (++putIndex == items.length)
        putIndex = 0;
    // The number of arrays increases by 1
    count++;
    // An element was enqueued, so call notEmpty to fetch the element
    notEmpty.signal();
}
Copy the code

4, out of the team

Poll (); poll(); take(); poll(long timeout, TimeUnit unit); poll(long timeout, TimeUnit unit);

public E remove(a) {
    // Call the poll() method to poll and return the poll element
    E x = poll();
    if(x ! =null)
        return x;
    else
		// If there is no queue, throw an exception
        throw new NoSuchElementException();
}

public E poll(a) {
    final ReentrantLock lock = this.lock;
    / / lock
    lock.lock();
    try {
        // If the number of queues is 0, null is returned; otherwise, the queue is removed
        return (count == 0)?null : dequeue();
    } finally {
		/ / releases the locklock.unlock(); }}public E take(a) throws InterruptedException {
    final ReentrantLock lock = this.lock;
    / / lock
    lock.lockInterruptibly();
    try {
        // The number of queues is 0
        while (count == 0)
			// Block waits on condition notEmpty
            notEmpty.await();
        // If there are elements, the team is out
        return dequeue();
    } finally {
        / / unlocklock.unlock(); }}// same as take, add block timeout
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally{ lock.unlock(); }}private E dequeue(a) {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    // Fetch the element in the "fetch pointer" position
    E x = (E) items[takeIndex];
    // Set the "fetch pointer" position to null
    items[takeIndex] = null;
    // Move the pointer forward to the maximum length of the array
    if (++takeIndex == items.length)
		// return the array head position 0 again
        takeIndex = 0;
    // The number of elements is reduced by 1
    count--;
    if(itrs ! =null)
        itrs.elementDequeued();
    // Wake up the notFull condition
    notFull.signal();
    return x;
}
Copy the code

5, summary

  1. The ArrayBlockingQueue length is fixed and specified at initialization, so be careful about the length.
  2. ArrayBlockingQueue is thread-safe, utilizing a ReentrantLock and two conditions to ensure concurrency safety.
  3. ArrayBlockingQueue defines four classes of methods to throw an exception, return a value, block, and timeout to ensure different scenarios.

conclusion

In the previous article, we learned about segmented locking in ConcurrentHashMap. Can segmented locking be used for exiting and enrolling teams, and if so, how?

If you found this post helpful, please give it a thumbs up and a follow.