Introduction

ArrayBlockingQueue is a shared channel backed by an array. Suppose thread A wants to send a message to thread B. What is an appropriate way to hand it over? A BlockingQueue does exactly this.

ArrayBlockingQueue is an implementation of the BlockingQueue interface. Because it is backed by an array whose capacity is fixed at construction time, it is a bounded queue.

Passing data between threads this way is a form of data sharing, which raises two questions: how does the consuming thread find out whether the queue is empty, and what happens to the producing thread when the queue is full? With these questions in mind, we proceed to the following analysis.

The following tables list the main enqueue and dequeue operations of the queue:

Enqueue methods

| Method | Parameters | Behavior and return value | Exceptions |
| --- | --- | --- | --- |
| add | Object to insert | Returns true on success; throws an exception if the queue is full | IllegalStateException("Queue full"), thrown from AbstractQueue |
| offer | Object to insert | Returns true on success; returns false immediately if the queue is full | None |
| offer | Object to insert, wait time | Returns true on success; returns false if the queue is still full after waiting for the specified time | InterruptedException if interrupted while waiting |
| put | Object to insert | No return value (void); if the queue is full, the calling thread blocks until space becomes available | InterruptedException if interrupted while waiting |

Dequeue methods

| Method | Parameters | Behavior and return value | Exceptions |
| --- | --- | --- | --- |
| remove | None | Returns the head of the queue and removes it; throws an exception if the queue is empty | NoSuchElementException, thrown from AbstractQueue |
| poll | None | If the queue is not empty, returns the head and removes it; returns null if the queue is empty. Non-blocking, returns immediately | None |
| poll | Wait time | Returns the head and removes it if an element becomes available within the wait time; otherwise returns null | InterruptedException if interrupted while waiting |
| take | None | If the queue is not empty, returns the head and removes it; if the queue is empty, blocks until an element is available | InterruptedException if interrupted while waiting |
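
The non-blocking and timed rows of both tables can be tried out with a small program. This is only a sketch: the class name QueueMethodsDemo, the capacity of 2, and the 50 ms timeout are assumptions made for illustration and do not come from the original article.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; its name and the capacity of 2 are illustrative only.
class QueueMethodsDemo {

    // Reports how add, offer and timed offer react to a full queue.
    static String insertOnFull() {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        queue.add(1);
        queue.add(2); // the queue is now full
        StringBuilder report = new StringBuilder();
        try {
            queue.add(3);
            report.append("add=ok");
        } catch (IllegalStateException e) {
            report.append("add=IllegalStateException");
        }
        report.append(" offer=").append(queue.offer(3));
        try {
            report.append(" timedOffer=").append(queue.offer(3, 50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            report.append(" timedOffer=interrupted");
        }
        return report.toString();
    }

    // Reports how remove, poll and timed poll react to an empty queue.
    static String removeOnEmpty() {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        StringBuilder report = new StringBuilder();
        try {
            queue.remove();
            report.append("remove=ok");
        } catch (NoSuchElementException e) {
            report.append("remove=NoSuchElementException");
        }
        report.append(" poll=").append(queue.poll());
        try {
            report.append(" timedPoll=").append(queue.poll(50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            report.append(" timedPoll=interrupted");
        }
        return report.toString();
    }

    public static void main(String[] args) {
        System.out.println(insertOnFull());
        System.out.println(removeOnEmpty());
    }
}
```

put and take are left out here on purpose: on a full or empty queue they would block the only thread in this program.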

Simple example

Of the methods above, put and take are the most important, so let's look at an example of what they do.

import java.util.concurrent.ArrayBlockingQueue;

/**
 * ArrayBlockingQueue demo.
 *
 * @author battleheart
 */
public class BlockQueueDemo {
    public static final ArrayBlockingQueue<Integer> arr = new ArrayBlockingQueue<>(10);

    private static int sum = 0;

    public static void main(String[] args) throws InterruptedException {
        // Producer: enqueues 0..19, sleeping 100 ms before each put.
        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    System.out.println(i + " ready to enqueue");
                    Thread.sleep(100);
                    arr.put(i);
                    System.out.println(i + " enqueued");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        thread1.start();

        // Consumer: takes one element, then sleeps 1 s, forever.
        Thread thread = new Thread(() -> {
            try {
                for (; ; ) {
                    System.out.println("Enter the consumption queue");
                    System.out.println(arr.take());
                    Thread.sleep(1000);
                }
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        });
        thread.start();
    }
}

  1. Two threads are started, one that provides data and one that consumes it.
  2. The producer sleeps for 100 ms before each put, so the consumer thread reaches take() first and has to wait until the producer supplies data.
  3. The consumer sleeps for 1 s after each take, so the producer gets ahead and the queue fills up.

The program prints:

Enter the consumption queue
0 ready to enqueue
0 enqueued
1 ready to enqueue
1 enqueued
2 ready to enqueue
0
2 enqueued
3 ready to enqueue
3 enqueued
4 ready to enqueue
4 enqueued
5 ready to enqueue
5 enqueued
6 ready to enqueue
6 enqueued
7 ready to enqueue
7 enqueued
8 ready to enqueue
8 enqueued
9 ready to enqueue
9 enqueued
10 ready to enqueue
10 enqueued
11 ready to enqueue
Enter the consumption queue
1
11 enqueued
12 ready to enqueue
Enter the consumption queue
2
12 enqueued
13 ready to enqueue
Enter the consumption queue
3
13 enqueued
14 ready to enqueue
Enter the consumption queue
4
14 enqueued
15 ready to enqueue
Enter the consumption queue
5
15 enqueued
16 ready to enqueue
Enter the consumption queue
6
16 enqueued
17 ready to enqueue
Enter the consumption queue
7
17 enqueued
18 ready to enqueue
Enter the consumption queue
8
18 enqueued
19 ready to enqueue
Enter the consumption queue
9
19 enqueued
Enter the consumption queue
10
Enter the consumption queue
11
Enter the consumption queue
12
Enter the consumption queue
13
Enter the consumption queue
14
Enter the consumption queue
15
Enter the consumption queue
16
Enter the consumption queue
17
Enter the consumption queue
18
Enter the consumption queue
19
Enter the consumption queue

Analyzing the output above:

After the first 100 ms, 0 is enqueued successfully, followed by 1. While 2 is about to be enqueued, the consumer thread acquires the lock and takes 0 from the queue, and so on. Whenever the queue is not empty, take returns an element. From 11 onwards the queue is full, so each put has to wait until the consumer has taken an element.

The output shows the following:

  1. The consumer thread starts first but cannot complete its take, that is, it waits while the queue is empty.
  2. Enqueue and dequeue cannot run at the same time: an enqueue operation blocks a dequeue operation, and a dequeue operation blocks an enqueue operation.
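
The first observation can be reproduced in isolation. The following is a minimal sketch, not part of the original demo: the class name BlockingTakeDemo, the capacity of 1, and the 10 ms polling interval are assumptions. It starts a consumer on an empty queue, waits until that thread is parked inside take(), and only then supplies an element.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class; names and timings are illustrative only.
class BlockingTakeDemo {

    // Returns the consumer's thread state while it was blocked in take(),
    // followed by the value it finally received.
    static String run() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // blocks: the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            // Poll until the consumer is parked waiting for an element.
            while (consumer.getState() != Thread.State.WAITING) {
                Thread.sleep(10);
            }
            Thread.State blockedState = consumer.getState();
            queue.put("hello"); // wakes the consumer up
            consumer.join();    // join makes received[0] visible to this thread
            return blockedState + " -> " + received[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```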

How it works internally

ArrayBlockingQueue defines the following fields internally:

/** The array holding the queue elements */
final Object[] items;

/** Index of the next element returned by take, poll, peek or remove */
int takeIndex;

/** Index of the next insert position used by put, offer or add */
int putIndex;

/** The number of elements in the queue */
int count;

/** Reentrant lock guarding all access */
final ReentrantLock lock;

/** Condition on which takes wait until the queue is not empty */
private final Condition notEmpty;

/** Condition on which puts wait until the queue is not full */
private final Condition notFull;

When take() is called and the queue is empty, the calling thread waits on notEmpty. Once it has removed an element, it signals notFull to announce that a slot is now available for enqueuing. When put() is called and the queue is full, the calling thread waits on notFull. Once it has inserted an element, it signals notEmpty to announce that the queue has content ready to be consumed.

Source of the put method:

/**
 * Inserts the element at the tail of the queue, waiting for space
 * to become available if the queue is full.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    // Get the reentrant lock.
    final ReentrantLock lock = this.lock;
    // Acquire it interruptibly, so a waiting thread can respond to interruption.
    lock.lockInterruptibly();
    try {
        // Check whether the queue is full.
        while (count == items.length) {
            // Full: wait on notFull until a take frees a slot.
            notFull.await();
        }
        // Enqueue the element.
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

The source code above shows the following:

  1. The reentrant lock is acquired before enqueuing. take acquires the same lock, which explains why enqueue and dequeue operations can only run one at a time.
  2. It checks whether the element count has reached the length of the array, that is, whether the queue is full; if so, it waits until a slot becomes free.
  3. It performs the enqueue operation.
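
Because the lock is acquired with lockInterruptibly() and the wait happens in notFull.await(), a producer stuck on a full queue can be released by interrupting it. The following is a small sketch under assumed names (InterruptiblePutDemo, capacity 1); it is not taken from the original article.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class; names and timings are illustrative only.
class InterruptiblePutDemo {

    // Returns true if a put blocked on a full queue ended with
    // InterruptedException and left the queue contents unchanged.
    static boolean blockedPutIsInterrupted() {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.offer(1); // fill the single slot
        boolean[] interrupted = new boolean[1];
        Thread producer = new Thread(() -> {
            try {
                queue.put(2); // blocks: the queue is full
            } catch (InterruptedException e) {
                interrupted[0] = true;
            }
        });
        producer.start();
        try {
            // Poll until the producer is parked waiting for a free slot.
            while (producer.getState() != Thread.State.WAITING) {
                Thread.sleep(10);
            }
            producer.interrupt();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return interrupted[0] && queue.size() == 1;
    }

    public static void main(String[] args) {
        System.out.println(blockedPutIsInterrupted());
    }
}
```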

To see how enqueue works, see the following source code:

/**
 * Inserts an element at the current putIndex position and signals
 * notEmpty that the queue has content.
 */
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    // The queue array.
    final Object[] items = this.items;
    // Insert x at putIndex.
    items[putIndex] = x;
    // If putIndex reaches the end of the array, wrap it around to index 0.
    if (++putIndex == items.length)
        putIndex = 0;
    // The element count increases by 1.
    count++;
    // Signal notEmpty: the queue has content ready for consumption.
    notEmpty.signal();
}

Take a look at the source of the take method:

/**
 * Takes an element from the head of the queue, waiting on notEmpty
 * if the queue is empty.
 */
public E take() throws InterruptedException {
    // Get the reentrant lock.
    final ReentrantLock lock = this.lock;
    // Acquire it interruptibly, so a waiting thread can respond to interruption.
    lock.lockInterruptibly();
    try {
        // While the queue is empty, wait on notEmpty.
        while (count == 0) {
            notEmpty.await();
        }
        // Dequeue the element.
        return dequeue();
    } finally {
        lock.unlock();
    }
}

The source code above shows the following:

  1. The lock must be acquired before the operation can proceed.
  2. When the queue is empty, the calling thread waits on notEmpty.

The dequeue method is as follows:

/**
 * Extracts the element at takeIndex and signals notFull that a slot is available.
 * The caller must already hold the lock.
 */
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    // The queue array.
    final Object[] items = this.items;
    // Read the element at takeIndex.
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    // Set the slot to null so the element can be garbage collected.
    items[takeIndex] = null;
    // If takeIndex reaches the end of the array, wrap it around to index 0.
    if (++takeIndex == items.length) {
        takeIndex = 0;
    }
    // The element count decreases by 1.
    count--;
    // Tell any active iterators that an element was dequeued.
    if (itrs != null)
        itrs.elementDequeued();
    // Signal notFull: the queue is no longer full and a put can proceed.
    notFull.signal();
    return x;
}
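
The four methods above cannot be run outside the JDK class they belong to. As a rough standalone sketch of the same mechanism (one lock, two conditions, a circular array), the logic can be condensed into a small class. TinyBoundedBuffer and its demo method are hypothetical names, and the class leaves out everything else the real ArrayBlockingQueue provides, such as null checks, iterators, and timed methods.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified stand-in for ArrayBlockingQueue; illustration only.
class TinyBoundedBuffer<E> {
    private final Object[] items;
    private int takeIndex, putIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    TinyBoundedBuffer(int capacity) {
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await();          // full: wait for a free slot
            }
            items[putIndex] = e;
            if (++putIndex == items.length) putIndex = 0;
            count++;
            notEmpty.signal();            // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();         // empty: wait for an element
            }
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length) takeIndex = 0;
            count--;
            notFull.signal();             // wake one waiting producer
            return x;
        } finally {
            lock.unlock();
        }
    }

    // Pushes 0..4 through a buffer of capacity 2 and returns the order received.
    static String demo() {
        TinyBoundedBuffer<Integer> buffer = new TinyBoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) buffer.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        StringBuilder order = new StringBuilder();
        try {
            for (int i = 0; i < 5; i++) order.append(buffer.take());
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Both wait loops use while rather than if, as in the JDK source, so a thread re-checks the condition after waking up before it touches the array.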

Summary

A few points to take away from the put and take source code:

  1. ArrayBlockingQueue implements the queue with an array; putIndex and takeIndex track the tail (next insert position) and the head (next element to take) of the queue.
  2. ReentrantLock is used internally for synchronization, and its Condition objects handle waiting and signaling.
  3. The figures below summarize this behavior.

Here's a graphical illustration of how ArrayBlockingQueue works.

First, element 1 is added to the queue, as shown in the figure below:

Since the queue holds only one element, takeIndex points at that element and putIndex has moved on to the next free slot. After 8 more elements are added, the contents are as follows:

putIndex now points at the last slot of the array. Once that slot is filled the queue is full, and any further put waits on notFull.

When the take method is called, an element is removed and notFull is signaled: hey, buddy, there is room again, you can add your stuff to the queue.

You can clearly see that putIndex has moved back to array index 0, which is caused by the following code:

if (++putIndex == items.length)
    putIndex = 0;

The same happens on the take side: when takeIndex reaches the end of the array, it is reset to 0 so that takes start over from the beginning.

if (++takeIndex == items.length)
    takeIndex = 0;

When the queue is empty, take waits on notEmpty until there is something in the queue; when put is called, it signals notEmpty so the waiting thread can fetch the element.
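
The two wrap-around checks can be replayed without any threads. The sketch below is a hypothetical helper (RingIndexDemo and replay are names invented for illustration) that applies only the index arithmetic from enqueue and dequeue to a sequence of operations, where 'p' stands for a put and 't' for a take.

```java
import java.util.Arrays;

// Hypothetical helper that replays only the index arithmetic of enqueue/dequeue.
class RingIndexDemo {

    // ops: 'p' = put, 't' = take. Returns {putIndex, takeIndex, count}.
    static int[] replay(String ops, int capacity) {
        int putIndex = 0, takeIndex = 0, count = 0;
        for (char op : ops.toCharArray()) {
            if (op == 'p') {
                if (count == capacity) {
                    throw new IllegalStateException("full: a real put would block here");
                }
                if (++putIndex == capacity) putIndex = 0;   // wrap to the start
                count++;
            } else if (op == 't') {
                if (count == 0) {
                    throw new IllegalStateException("empty: a real take would block here");
                }
                if (++takeIndex == capacity) takeIndex = 0; // wrap to the start
                count--;
            } else {
                throw new IllegalArgumentException("unknown op: " + op);
            }
        }
        return new int[] {putIndex, takeIndex, count};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(replay("ppp", 3)));    // [0, 0, 3]
        System.out.println(Arrays.toString(replay("ppptp", 3)));  // [1, 1, 3]
        System.out.println(Arrays.toString(replay("pppttt", 3))); // [0, 0, 0]
    }
}
```

Note that putIndex equals takeIndex both when the queue is full and when it is empty, which is why the implementation keeps a separate count field instead of comparing the two indexes.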
