AQS source code exploration _05: the Condition queue (hand-writing an introductory BrokingQueue)

1. Condition queue introduction

  • ConditionObject is another very important internal class in AQS, which implements the Condition interface and is primarily used for conditional locking.
  • ConditionObject also maintains its own queue, which holds the threads waiting for the condition to be satisfied. When the condition is satisfied, another thread signals it; this moves the waiting node from the condition queue to the AQS queue, where it waits to reacquire the lock and be woken up.
  • Condition is typically used in BlockingQueue implementations. When the queue is empty, a thread trying to take an element blocks on the notEmpty Condition. Once an element is added to the queue, notEmpty is signalled, which moves a waiting node from its condition queue to the AQS queue, where it waits to be woken up.

2. Write a BrokingQueue by hand

2.1 Customizing the BrokingQueue Interface

/**
 * Minimal queue contract used by this tutorial: one operation to insert an
 * element and one to retrieve an element.
 *
 * <p>Both operations declare {@link InterruptedException}, so implementations
 * may block the calling thread while waiting for the operation to become
 * possible.
 *
 * @param <T> the type of the elements held in the queue
 * @author csp
 * @date 2021-05-03
 */
public interface BrokingQueue<T> {
    /**
     * Inserts an element into the queue.
     *
     * @param element the element to insert
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    void put(T element) throws InterruptedException;

    /**
     * Retrieves an element from the queue.
     *
     * @return the retrieved element
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    T take() throws InterruptedException;
}
Copy the code

2.2 Customizing the MiniArrayBrokingQueue class

/ * * *@author csp
 * @dateThe 2021-05-03 * /
public class MiniArrayBrokingQueue implements BrokingQueue {

    /** * ReentrantLock: for thread concurrency control */
    private Lock lock = new ReentrantLock();

    /** * If the current queues are full, make the current producer thread call notful.await () * and the notFull conditional queue hang, waiting for the consumer thread to consume data before waking up
    private Condition notFull = lock.newCondition();

    /** * When consuming data, the current queues must first check if there is data in the queues. If there is no data in the queues, the current queues must call notempty.await () * so that the notEmpty conditional queue is suspended, waiting for the producer to produce data before waking up! * /
    private Condition notEmpty = lock.newCondition();

    /** * an array queue that stores elements */
    private Object[] queues;

    /** * The length of the array queue */
    private int size;

    /** * count: number of bytes that can be consumed * putptr: number of bytes that can be consumed * putptr: number of bytes that can be consumed Record the next location of consumer consumption data. After each consumer has consumed a data, it will takeptr++ */
    private int count, putptr, takeptr;

    public MiniArrayBrokingQueue(int size) {
        this.size = size;
        this.queues = new Object[size];
    }

    /** * insert data **@param element
     */
    @Override
    public void put(Object element) throws InterruptedException {
        / / acquiring a lock
        lock.lock();
        try {
            // Determine whether the current Queues array is full
            if (count == this.size) {
                // If it is full, the notFull conditional queue is suspended, waiting for the consumer thread to consume a data to wake up ~
                notFull.await();
            }

            Queues are not sufficient. Queues can store data in queues
            this.queues[putptr] = element;
            putptr++;

            // Determine whether the maximum value of the array queue has been reached
            if (putptr == this.size) {
                // putptr returns to 0.
                // Because: in multithreaded execution, there is always a consumer thread consuming data, even if the current producer thread has moved putptr to size,
                The queues have been queued before the size of the queues. The queues have been queued before the size of the queues.
                // The purpose of restoring putptr to 0 is to keep finding empty bytes in the Queues array used to hold elements
                putptr = 0;
            }

            // The amount of data that can be consumed in the current queue ++
            count++;

            // What do I need to do after I successfully put an element into the queue?
            // notEmpty needs to be given a wake-up call to tell the consumer to spend
            notEmpty.signal();
        } finally {
            / / releases the locklock.unlock(); }}/** * get data **@return* /
    @Override
    public Object take(a) throws InterruptedException {
        / / acquiring a lock
        lock.lock();
        try {
            // Check whether the current Queues array is empty
            if (count == 0) {
                If queues are empty, queue notEmpty is suspended until the producer thread produces data
                notEmpty.await();
            }

            // There is data in the queue that can be consumed ~
            Object element = this.queues[takeptr];
            takeptr++;

            // After the new data is consumed, determine whether the maximum value of the array queue is reached
            if (takeptr == this.size) {
                // takeptr back to 0.
                // Because: in multithreaded execution, there is always a producer thread producing data, even if the current consumer thread has moved the takeptr position to size,
                // The queues have been queued before the size of the queues, so the queues are not empty.
                // The purpose of restoring takeptr to 0 is to keep finding data that can be consumed in the Queues array
                takeptr = 0;
            }

            // The amount of data that can be consumed in the current queue
            count--;

            // After successfully consuming an element from the queue, what do I need to do?
            NotFull needs a wake up signal to tell the producer to produce
            notFull.signal();

            // Return the consumed element
            return element;
        } finally {
            / / releases the locklock.unlock(); }}// Test procedure:
    public static void main(String[] args) {
        BrokingQueue<Integer> queue = new MiniArrayBrokingQueue(10);

        Thread producer = new Thread(() -> {
            int i = 0;
            while (true) {
                i++;
                if (i == 10) {
                    i = 0;
                }

                try {
                    System.out.println("Producer thread produces data:" + i);
                    queue.put(Integer.valueOf(i));
                    TimeUnit.MILLISECONDS.sleep(200);
                } catch (InterruptedException e) {
                }
            }
        });
        producer.start();

        Thread consumer = new Thread(() -> {
            while (true) {
                try {
                    Integer result = queue.take();
                    System.out.println("Consumer thread consumes data:" + result);
                    TimeUnit.MILLISECONDS.sleep(200);
                } catch(InterruptedException e) { e.printStackTrace(); }}}); consumer.start(); }}Copy the code