
Preface

During an interview last year, the interviewer asked me about blocking queues. I did not understand the topic well at the time, so I answered from my own rough understanding, and the interview did not go well. Today I reviewed the topic briefly and recorded my notes below. If you spot any mistakes, please point them out.

What is a blocking queue

In data structures, a queue follows the FIFO (first-in, first-out) principle. In Java, the Queue interface defines the basic behavior, and concrete classes implement it. Common implementations include ArrayDeque and LinkedList, neither of which is thread-safe. Java 1.5 added BlockingQueue. When the queue is empty, a thread that tries to take an element blocks until one becomes available; when a bounded queue is full, a thread that tries to put an element blocks until space becomes available.

Producer-consumer model

The producer adds elements to the queue; the consumer retrieves them from the queue and processes them. The queue decouples the producer from the consumer. When the producer's production rate differs from the consumer's consumption rate, the queue also acts as a buffer between them.
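As a minimal sketch of this model, assuming one producer and one consumer, the following passes numbers through a small bounded queue. ArrayBlockingQueue is one of the JDK implementations discussed later; the class name ProducerConsumerDemo, the run method, and the POISON_PILL sentinel are illustrative names, not part of the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumerDemo {
    // Hypothetical sentinel value that tells the consumer to stop.
    static final int POISON_PILL = -1;

    // Sends 0..items-1 through a bounded queue and returns what the consumer received.
    static List<Integer> run(int items, int capacity) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.put(i);             // blocks while the queue is full
                }
                queue.put(POISON_PILL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int value = queue.take(); // blocks while the queue is empty
                    if (value == POISON_PILL) {
                        break;
                    }
                    consumed.add(value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return consumed; // only read after join(), so no extra synchronization is needed
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5, 2)); // prints [0, 1, 2, 3, 4]
    }
}
```

The queue holds at most two elements here, so the producer is throttled to the consumer's pace without either side knowing about the other.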

Blocking queue usage scenarios

  1. The thread pool

    In a thread pool (ThreadPoolExecutor), once the number of worker threads is greater than or equal to corePoolSize, newly submitted tasks are added to the blocking queue, where they wait for a free worker thread.
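The queuing behavior can be observed with a small sketch. It assumes a pool with corePoolSize 1 and a work queue of capacity 2; the class name ThreadPoolQueueDemo and the method queuedTasks are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadPoolQueueDemo {
    // Submits three tasks to a pool with one core thread and reports how many are queued.
    static int queuedTasks() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
        try {
            Runnable task = () -> {
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            };
            pool.execute(task); // handed directly to the new core thread
            pool.execute(task); // core thread is busy, so the task is queued
            pool.execute(task); // queued as well (the queue holds two tasks)
            return pool.getQueue().size();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(queuedTasks()); // prints 2
    }
}
```

A fourth task submitted while the queue is full would make the pool start a second thread, because maximumPoolSize is 2 in this sketch.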

Existing blocking queues

In Java, the BlockingQueue interface defines the behavior of blocking queues. Common implementations are ArrayBlockingQueue and LinkedBlockingQueue.

BlockingQueue extends the Queue interface and inherits all of its methods. The methods are summarized in the Javadoc of BlockingQueue:

  • Insert elements:
    • add(e): Returns true if the element is added; throws IllegalStateException if the queue is full
    • offer(e): Returns true if the element is added, false if the queue is full
    • put(e): If the queue is full, blocks the calling thread until space becomes available, then adds the element
    • offer(e, time, unit): If the queue is full, waits up to the specified time for space. Returns true if the element is added within that time, false otherwise
  • Remove elements:
    • remove(o): Removes a single instance of the specified element. Returns true if the element was present, false otherwise
    • poll(): Removes and returns the head of the queue, or returns null if the queue is empty
    • take(): Removes and returns the head of the queue. If the queue is empty, the calling thread blocks until an element becomes available
    • poll(time, unit): If the queue is empty, waits up to the specified time for an element. Returns null if the time elapses first
  • Examine elements:
    • element(): Returns the head of the queue without removing it; throws NoSuchElementException if the queue is empty
    • peek(): Returns the head of the queue without removing it, or returns null if the queue is empty
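The non-blocking and timed variants are easy to compare side by side on a queue of capacity 1. The following is a small sketch; the class name QueueMethodsDemo and the describe method are illustrative, and the 50 ms timeouts are arbitrary.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueMethodsDemo {
    // Exercises each method family on a one-slot queue and records the outcomes.
    static String describe() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder log = new StringBuilder();

        log.append(queue.offer("a")).append(' ');                            // added
        log.append(queue.offer("b")).append(' ');                            // queue is full
        log.append(queue.offer("b", 50, TimeUnit.MILLISECONDS)).append(' '); // still full after waiting
        try {
            queue.add("b");
        } catch (IllegalStateException e) {
            log.append("ISE ");                                              // add() throws when full
        }

        log.append(queue.peek()).append(' ');                                // head, not removed
        log.append(queue.poll()).append(' ');                                // head, removed
        log.append(queue.poll()).append(' ');                                // queue is empty
        log.append(queue.poll(50, TimeUnit.MILLISECONDS)).append(' ');       // still empty after waiting
        try {
            queue.element();
        } catch (NoSuchElementException e) {
            log.append("NSEE");                                              // element() throws when empty
        }
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(describe()); // prints true false false ISE a a null null NSEE
    }
}
```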

ArrayBlockingQueue

A bounded blocking queue backed by an array. Its capacity must be specified when the queue is constructed and cannot be changed afterwards.

The constructor

    // The first: capacity, fairness, and an initial collection
    public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

    // The second: capacity and fairness
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    // The third: capacity only
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
  • capacity: the fixed capacity of the queue
  • fair: the fairness policy for threads blocked on insertion or removal. If true, waiting threads are served in FIFO order; if false, the order is unspecified. The default is false
  • c: a collection whose elements are added to the queue initially, in the traversal order of the collection's iterator
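The three constructors can be tried out as follows. This is only a sketch; the class name ArrayBlockingQueueConstructionDemo and its two helper methods are invented for the example.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;

class ArrayBlockingQueueConstructionDemo {
    // Builds one queue per constructor and reports the free slots in each.
    static int[] remainingCapacities() {
        ArrayBlockingQueue<Integer> plain = new ArrayBlockingQueue<>(3);
        ArrayBlockingQueue<Integer> fair = new ArrayBlockingQueue<>(3, true);
        ArrayBlockingQueue<Integer> seeded =
                new ArrayBlockingQueue<>(3, false, Arrays.asList(1, 2));
        return new int[] {
            plain.remainingCapacity(), fair.remainingCapacity(), seeded.remainingCapacity()
        };
    }

    // A collection larger than the capacity is rejected with IllegalArgumentException.
    static boolean rejectsOversizedCollection() {
        try {
            new ArrayBlockingQueue<Integer>(1, false, Arrays.asList(1, 2));
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(remainingCapacities())); // prints [3, 3, 1]
        System.out.println(rejectsOversizedCollection());           // prints true
    }
}
```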

put() method

    public void put(E e) throws InterruptedException {
        // Reject null elements
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // Acquire the lock (interruptibly)
        lock.lockInterruptibly();
        try {
            // While the queue is full, the producer blocks
            while (count == items.length)
                notFull.await();
            // There is room, so add the element
            enqueue(e);
        } finally {
            // Release the lock
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // The queue is now non-empty, so wake up a waiting consumer
        notEmpty.signal();
    }

take() method

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // Acquire the lock (interruptibly)
        lock.lockInterruptibly();
        try {
            // While the queue is empty, the consumer blocks
            while (count == 0)
                notEmpty.await();
            // The queue is not empty, so remove and return the head
            return dequeue();
        } finally {
            // Release the lock
            lock.unlock();
        }
    }

    private E dequeue() {
        final Object[] items = this.items;
        // Get the head element x
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // The queue is no longer full, so wake up a waiting producer
        notFull.signal();
        return x;
    }
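The hand-off between put() and take() can be watched from the outside. The sketch below fills a one-slot queue, starts a second put() in another thread, waits until that thread is parked, and then frees the slot with take(). The class name PutBlocksDemo is illustrative, and polling Thread.getState() is a demo technique only, not something to rely on in production code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PutBlocksDemo {
    static String run() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.put("first"); // the queue is now full

        Thread producer = new Thread(() -> {
            try {
                queue.put("second"); // waits until a slot is free
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // Wait until the producer thread is parked inside put().
        while (producer.getState() != Thread.State.WAITING) {
            Thread.sleep(10);
        }

        String head = queue.take(); // frees the slot, which lets the producer continue
        producer.join();            // the blocked put() has now completed
        return head + "," + queue.take();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints first,second
    }
}
```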

LinkedBlockingQueue

An optionally bounded blocking queue backed by a singly linked list. If no capacity is specified, the capacity defaults to Integer.MAX_VALUE; otherwise the specified capacity is used.
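A quick way to check both cases is remainingCapacity(). The class name LinkedCapacityDemo and its methods below are illustrative only.

```java
import java.util.concurrent.LinkedBlockingQueue;

class LinkedCapacityDemo {
    // Without an explicit capacity the queue reports Integer.MAX_VALUE free slots.
    static boolean defaultCapacityIsMaxInt() {
        return new LinkedBlockingQueue<String>().remainingCapacity() == Integer.MAX_VALUE;
    }

    // With capacity 1 the second offer is rejected.
    static boolean boundedQueueRejectsWhenFull() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        boolean first = queue.offer("a");
        boolean second = queue.offer("b");
        return first && !second;
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacityIsMaxInt());     // prints true
        System.out.println(boundedQueueRejectsWhenFull()); // prints true
    }
}
```

Because the default capacity is so large, a producer that outpaces its consumers can exhaust memory long before put() ever blocks, so specifying a capacity is usually the safer choice.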

The constructor

    // No capacity specified: defaults to Integer.MAX_VALUE
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    // Capacity specified
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    // Initialized with the elements of an existing collection
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

put() method

    public void put(E e) throws InterruptedException {
        // Reject null elements
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        // Counter holding the number of elements in the queue
        final AtomicInteger count = this.count;
        // Acquire the put lock
        putLock.lockInterruptibly();
        try {
            // While the queue is full, the producer blocks
            while (count.get() == capacity) {
                notFull.await();
            }
            // Append the new node to the tail of the list
            enqueue(node);
            // Increment the count; c holds the value before the increment
            c = count.getAndIncrement();
            // There is still room after this insert, so wake another waiting producer
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            // Release the put lock
            putLock.unlock();
        }
        // The queue was empty before this insert, so wake a waiting consumer
        if (c == 0)
            signalNotEmpty();
    }

take() method

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        // Counter holding the number of elements in the queue
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // Acquire the take lock
        takeLock.lockInterruptibly();
        try {
            // While the queue is empty, the consumer blocks
            while (count.get() == 0) {
                notEmpty.await();
            }
            // Remove the head element
            x = dequeue();
            // Decrement the count; c holds the value before the decrement
            c = count.getAndDecrement();
            // Elements remain after this removal, so wake another waiting consumer
            if (c > 1)
                notEmpty.signal();
        } finally {
            // Release the take lock
            takeLock.unlock();
        }
        // The queue was full before this removal, so wake a waiting producer
        if (c == capacity)
            signalNotFull();
        return x;
    }

Comparison

Similarities

  1. Both use Condition objects to signal producers and consumers when elements can be added or taken
  2. Both let you specify the capacity

Differences

  1. ArrayBlockingQueue is backed by an array; LinkedBlockingQueue is backed by a linked list
  2. ArrayBlockingQueue uses a single lock internally; LinkedBlockingQueue uses two locks (putLock and takeLock), so a put and a take can proceed at the same time


Implement a blocking queue yourself

From the source code analysis, we can see that a blocking queue relies on the Condition notification mechanism for communication between producers and consumers. The same behavior can also be implemented with wait(), notify(), and notifyAll() from the Object class. Here is a blocking queue I wrote myself:

    public class BlockQueue {
        // Object lock, one per queue instance
        private final Object lock = new Object();
        // Flag used to coordinate both sides: true means the single slot is full
        private boolean condition;

        public void put() throws InterruptedException {
            synchronized (lock) {
                while (condition) {
                    // Full: the producer blocks until a consumer takes
                    System.out.println("put: queue is full, start blocking");
                    lock.wait();
                }
                condition = true;
                System.out.println("put: set to true, wake up the consumer");
                lock.notifyAll();
            }
        }

        public void take() throws InterruptedException {
            synchronized (lock) {
                while (!condition) {
                    // Empty: the consumer blocks until a producer puts
                    System.out.println("take: queue is empty, start blocking");
                    lock.wait();
                }
                condition = false;
                System.out.println("take: set to false, wake up the producer");
                lock.notifyAll();
            }
        }
    }
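The same wait()/notifyAll() pattern extends to a queue that stores real elements. The following is a minimal sketch under a few assumptions: the class name SimpleBlockingQueue is made up, an ArrayDeque is used for storage, and the queue's own monitor serves as the lock. It is a teaching example, not a replacement for the JDK classes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class SimpleBlockingQueue<E> {
    private final Queue<E> items = new ArrayDeque<>();
    private final int capacity;

    SimpleBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Blocks while the queue is full, then appends the element.
    synchronized void put(E e) throws InterruptedException {
        while (items.size() == capacity) {
            wait();
        }
        items.add(e);
        notifyAll(); // wake consumers waiting for an element
    }

    // Blocks while the queue is empty, then removes and returns the head.
    synchronized E take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        E head = items.remove();
        notifyAll(); // wake producers waiting for space
        return head;
    }

    synchronized int size() {
        return items.size();
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleBlockingQueue<String> queue = new SimpleBlockingQueue<>(2);
        queue.put("a");
        queue.put("b");
        System.out.println(queue.take() + queue.take()); // prints ab
    }
}
```

The wait() calls sit inside while loops rather than if statements, because a woken thread must recheck the condition: notifyAll() wakes producers and consumers alike, and spurious wakeups are permitted.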

Reference articles:

Concurrent containers: BlockingQueue (juejin.cn)

BlockingQueue (Java Platform SE 8) (oracle.com)

