Java geek

Related reading:

Java Concurrent programming (1) Knowledge map Java concurrent programming (2) Atomic Java concurrent programming (3) Visibility Java concurrent programming (4) Sequential Java concurrent programming (5) Introduction to Creating threads Java concurrent programming (6) Synchronized usage Java Concurrency programming Introduction (7) Easy to understand wait and Notify and use scenarios Java concurrency programming Introduction (8) Thread lifecycle Java concurrency programming Introduction (9) Deadlock and deadlock bit Java concurrency programming Introduction (10) lock optimization Introduction to Concurrent Programming in Java (11) Flow limiting scenario and Spring flow limiter Introduction to Concurrent programming in Java (13) Read/write lock and cache template Introduction to Concurrent programming in Java (14) CountDownLatch Application Scenario (15) CyclicBarrier application scenario Introduction to Concurrent programming in Java (16) seconds to understand the difference between the thread pool Java Concurrent programming introduction (17) one picture master common classes and interfaces for threads Java concurrent programming introduction (18) Again on thread safety Java concurrent programming introduction (19) Asynchronous task scheduling tool CompleteFeature Java Concurrent programming introduction (20) Common locking scenarios and locking tools


I. Application scenarios

The producer and consumer mode is applied to asynchronous processing scenarios. The advantage of asynchronous processing is that producers and consumers are decoupled and not interdependent. Producers do not need to wait for consumers to finish processing, so they can continue to produce and consume content, which greatly improves the efficiency.

Second, code class structure

The producer and consumer code classes are structured as follows:


2. A Producer is a Producer, in this case an abstract class, and a subclass needs to implement the generateTask method.

3.Consumer is a Consumer, in this case an abstract class, and subclasses need to implement the exec method.

4. The Producer and Consumer are just abstract code templates with simple logic. Appropriate templates can be written according to actual needs.

C, Show me code

I, BlockedQueue. Java

import java.util.Vector;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/ * * *@ClassName BlockedQueue
 * @DescriptionBlock the task queue. If a task has reached its maximum capacity, it will block waiting *@AuthorSonorous leaf *@Date 2019/10/5 11:32
 * @Version1.0 * * * / javashizhan.com
public class BlockedQueue<T>{

    / / lock
    private final Lock lock = new ReentrantLock();

    // Condition variable: the queue is not satisfied
    private final Condition notFull = lock.newCondition();

    // Condition variable: the queue is not empty
    private final Condition notEmpty = lock.newCondition();

    // Task set
    private Vector<T> taskQueue = new Vector<T>();

    // Queue capacity
    private final int capacity;

    /** * constructor *@paramCapacity Queue capacity */
    public BlockedQueue(int capacity) {
        this.capacity = capacity;
    }

    /** * queue operation *@param t
     */
    public void enq(T t) {
        lock.lock();
        try {
            System.out.println("size: " + taskQueue.size() + " capacity: " + capacity);
            while (taskQueue.size() == this.capacity) {
                // Wait until the queue is full
                notFull.await();
            }

            System.out.println(Thread.currentThread().getName() + " add task: " + t.toString());
            taskQueue.add(t);

            // After joining the queue, the queue is not empty
            notEmpty.signal();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally{ lock.unlock(); }}/** * queue operation *@return* /
    public T deq(a){
        lock.lock();
        try {
            try {
                while (taskQueue.size() == 0) {
                    // Wait when queue is empty. Wait when queue is not emptynotEmpty.await(); }}catch (InterruptedException e) {
                e.printStackTrace();
            }

            T t = taskQueue.remove(0);

            // If the queue is not satisfied, you can continue to join the queue
            notFull.signal();

            return t;
        }finally{ lock.unlock(); }}}Copy the code

II, Producer. Java

/ * * *@ClassName Producer
 * @DescriptionProducer, this class is relatively simple, using inheritance does not save much code, can be inherited, can also be self-implemented. *@AuthorSonorous leaf *@DateMoreover * 2019/10/5,@Version1.0 * * * / javashizhan.com
public abstract class Producer<T> implements Runnable {

    private BlockedQueue<T> taskQueue;

    public Producer(BlockedQueue<T> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void run(a) {
        while(true) {
            T[] tasks = generateTask();
            if (null! = tasks && tasks.length >0) {
                for(T task: tasks) {
                    if (null! = task) {this.taskQueue.enq(task);
                    }
                }
            }
        }
    }

    /** * generates tasks using the "template method" design pattern, as long as subclasses implement this method. *@return* /
    public abstract T[] generateTask();
}
Copy the code

III, Consumer. Java

/ * * *@ClassName Consumer
 * @DescriptionConsumer, this class is relatively simple, using inheritance does not save much code, can be inherited, can also be self-implemented. *@AuthorSonorous leaf *@Date"2019/10/5 *@Version1.0 * * * / javashizhan.com
public abstract class Consumer<T> implements Runnable {

    private BlockedQueue<T> taskQueue;

    public Consumer(BlockedQueue<T> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void run(a) {
        while(true) { T task = taskQueue.deq(); exec(task); }}/** * performs tasks using the "template method" design pattern. Subclasses can implement this method only@param task
     */
    public abstract void exec(T task);
}
Copy the code

IV. Use code examples

import java.util.Vector;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/ * * *@ClassName BlockedQueue
 * @Description TODO
 * @AuthorSonorous leaf *@Date2019/10/5 do *@Version1.0 * * * / javashizhan.com

public class LockTest {
    public static void main(String[] args) {
        BlockedQueue<String> taskQueue = new BlockedQueue<String>(10);

        for (int i = 0; i < 3; i++) {
            String producerName = "Producder-" + i;
            Thread producer = new Thread(new Producer<String>(taskQueue) {
                @Override
                public String[] generateTask() {
                    String[] tasks = new String[20];
                    for (int i = 0; i < tasks.length; i++) {
                        long timestamp = System.currentTimeMillis();
                        tasks[i] = "Task_" + timestamp + "_" + i;
                    }
                    return tasks;
                }
            }, producerName);
            producer.start();
        }

        for (int i = 0; i < 5; i++) {
            String consumerName = "Consumer-" + i;
            Thread consumer = new Thread(new Consumer<String>(taskQueue) {
                @Override
                public void exec(String task) {
                    System.out.println(Thread.currentThread().getName() + " do task [" + task + "]");
                    // Sleep for a while to simulate the task execution time
                    sleep(2000);
                }

                private void sleep(long millis) {
                    try {
                        Thread.sleep(millis);
                    } catch(InterruptedException e) { e.printStackTrace(); } } }, consumerName); consumer.start(); }}}Copy the code

Output log:

size: 0 capacity: 10
Producder-1 add task: Task_1570250409102_0
size: 1 capacity: 10
Producder-1 add task: Task_1570250409103_1
size: 2 capacity: 10
Producder-1 add task: Task_1570250409103_2
size: 3 capacity: 10
Producder-1 add task: Task_1570250409103_3
size: 4 capacity: 10
Producder-1 add task: Task_1570250409103_4
size: 5 capacity: 10
Producder-1 add task: Task_1570250409103_5
size: 6 capacity: 10
Producder-1 add task: Task_1570250409103_6
size: 7 capacity: 10
Producder-1 add task: Task_1570250409103_7
size: 8 capacity: 10
Producder-1 add task: Task_1570250409103_8
size: 9 capacity: 10
Producder-1 add task: Task_1570250409103_9
size: 10 capacity: 10
size: 10 capacity: 10
size: 10 capacity: 10
Consumer-0 do task [Task_1570250409102_0]
Consumer-4 do task [Task_1570250409103_1]
Consumer-3 do task [Task_1570250409103_2]
Producder-1 add task: Task_1570250409103_10
Consumer-1 do task [Task_1570250409103_3]
Producder-0 add task: Task_1570250409102_0
size: 8 capacity: 10
Producder-0 add task: Task_1570250409103_1
size: 9 capacity: 10
Producder-0 add task: Task_1570250409103_2
size: 10 capacity: 10
size: 10 capacity: 10
Consumer-2 do task [Task_1570250409103_4]
Producder-0 add task: Task_1570250409103_3
size: 10 capacity: 10
Consumer-3 do task [Task_1570250409103_6]
Producder-2 add task: Task_1570250409103_0
Consumer-1 do task [Task_1570250409103_5]
size: 9 capacity: 10
Producder-2 add task: Task_1570250409103_1
size: 10 capacity: 10
Consumer-4 do task [Task_1570250409103_7]
Consumer-0 do task [Task_1570250409103_8]
Producder-1 add task: Task_1570250409103_11
size: 9 capacity: 10
Producder-1 add task: Task_1570250409103_12
size: 10 capacity: 10
Consumer-2 do task [Task_1570250409103_9]
Producder-1 add task: Task_1570250409103_13
size: 10 capacity: 10
Copy the code

Other instructions

1. Lock is used here to Lock, Lock is more flexible than synchronized keyword Lock, if there are special needs, easy to change.

Synchronized is an example of how to implement producer and consumer patterns using Java concurrent Programming (part 7). That code isn’t generic enough, so you can make it more generic.

3. In this example, synchronized locking is not that different from wait and Notify using Java concurrency programming.

4. Need to pay attention to when using a bounded blocking queue producer production task process is controllable, if it is a third party not controlled call, when the production speed is far greater than consumers processing tasks, may suspend for a long time, due to obstruction or hang, time is too long, cause the waiting thread is too much, or timeout failed. Blocking is not appropriate, and an exception should be thrown when the queue is full to tell the caller not to wait.

end.


<– Read the mark, left like!