I have learned the producer/consumer model for so long that I finally realized it today. Before WRITING, I will review the producer/consumer model.

The producer/consumer pattern is a classic process synchronization problem in operating systems that can be solved using a “semaphore” mechanism, noted below.

Analyze what needs to be done before writing:

  1. Define a cache queue and select a collection to cache. The cache only does two things: store data and fetch data
  2. Define a producer thread to produce data and store it in the cache
  3. Define a consumer to cache access data

Methods a

Use LinkedHashMap as cache and synchronized as lock

Define a cache queue

public class PublicQueue<T> {

    // The index of data insertion
    private int putIndex = 0;
    // Maximum capacity
    private int maxCount = 50;

    / / the buffer
    private LinkedHashMap<Integer, T> linkedHashMap = new LinkedHashMap<>();


    /** * Add data to the blocking queue *@param msg
     */
    public synchronized void put(T msg) {
        // If the cached data reaches maxCount, block
        if (linkedHashMap.size() == maxCount) {
            try {
                wait();
            } catch(InterruptedException e) { e.printStackTrace(); }}else {
            // If it is not full, wake up all other threads
            notifyAll();
        }

        // Store data in the cache
        linkedHashMap.put(putIndex, msg);
        System.out.println("Produce a product with the current commodity subscript:"+putIndex+"=== the text is:"+msg+"=== Cache length is:"+linkedHashMap.size());
        / / update the putIndex
        putIndex = (putIndex + 1 >= maxCount) ? (putIndex + 1) % maxCount : putIndex + 1;
    }


    /*** * fetch data from the blocking queue *@return* /
    public synchronized T get(a) {
        // If the blocking queue is empty, the consumer is blocked
        if (linkedHashMap.size() == 0) {
            try {
                wait();
            } catch(InterruptedException e) { e.printStackTrace(); }}else {
            notifyAll();
        }

        // Fetch data from Iterator to ensure that the retrieved data is ordered
        Iterator iterator = linkedHashMap.entrySet().iterator();
        T t = null;
        if (iterator.hasNext()) {
            Map.Entry<Integer, T> entry = (Map.Entry<Integer, T>) iterator.next();
            t = entry.getValue();
            int index = entry.getKey();
            linkedHashMap.remove(index);
            System.out.println("Consume a product with the current commodity subscript:"+index+"=== the text is:"+ t +"=== Cache length is:"+linkedHashMap.size());
        }
        returnt; }}Copy the code

Define a producer thread

public class Provider extends Thread{

    private PublicQueue publicQueue;

    public Provider(PublicQueue publicQueue) {
        this.publicQueue = publicQueue;
    }

    @Override
    public void run(a) {
        for (int i = 0; i < 60; i++) { publicQueue.put(String.valueOf(i)); }}}Copy the code

Define a consumer thread

public class Consumer extends Thread{

    private PublicQueue publicQueue;

    public Consumer(PublicQueue publicQueue) {
        this.publicQueue = publicQueue;
    }

    @Override
    public void run(a) {
        for(; ;) { publicQueue.get(); }}}Copy the code

test

public class ProviderConsumerTest {
    public static void main(String[] args) {
        PublicQueue publicQueue = new PublicQueue();
        Provider provider = new Provider(publicQueue);
        Consumer consumer = newConsumer(publicQueue); provider.start(); consumer.start(); }}Copy the code

Way 2

This approach is also relatively simple, using the Blocking queue provided by Java

public class PublicQueue<T> {
    / / the buffer
    private BlockingDeque<T> blockingDeque = new LinkedBlockingDeque<>(50);

    public void add(T msg){

        try {
            blockingDeque.put(msg);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Produce a product with the current commodity subscript:"+"=== the text is:"+msg);
    }

    public T get(a){

        T t = null;
        try {
            t = blockingDeque.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Consume a product with the current commodity subscript:"+"=== the text is:"+t);
        returnt; }}Copy the code