consumers

package consumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;


public class Consumer implements Runnable {

	/ * * util. Concurrent. BlockingQueue communication bridge of producers and consumers * /
	BlockingQueue<String> queue;
	String id;
	@SuppressWarnings("unused")
	private volatile boolean      isRunning               = true;
	public Consumer(BlockingQueue<String> queue, String id) {
        this.queue = queue;
        this.id = id;
    }
	
    public void stop(a) {
        isRunning = false;
    }
    
	@Override
	public void run(a) {
		System.out.println("Thread: " + id + " Consumer thread is running...");
        boolean isRunning = true;
        try {
            while (isRunning) {
                System.out.println("Thread: " + id + " fetch data from linkedQueue..." + " queue size: " + queue.size());
                Poll returns null */ if there is nothing left to fetch after two seconds
                String data = queue.poll(2, TimeUnit.SECONDS);
                if (null! = data) { System.out.println("Thread: " + id + " has consumed one data from queue: " + data
                    		+ " Queue sise: " + queue.size());
                    // simulate data consumption
                    Thread.sleep(1000);
                } else {
                    isRunning = false;
                    // The consumer is ready to exit
                    System.out.println("Thread: " + id + " Consumer read queue timeout"); }}}catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("Thread: " + id + " consumer thread ends"); }}}Copy the code

producers

package consumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable {
 
	BlockingQueue<String> queue;
	String id;
    public Producer(BlockingQueue<String> queue, String id) {
        this.queue = queue;
        this.id = id;
    }
 
    @Override
    public void run(a) {
        String data = null;
        
        try {
            while (isRunning) {
                System.out.println("PRODUCER: " + id + " is running");
                Thread.sleep(100);
 
                data = "data:" + count.incrementAndGet();
                System.out.println("Thread: " + id + " procedued data into queue: " + data + "...");
                if(! queue.offer(data,2, TimeUnit.SECONDS)) {
                    System.out.println("failed to put data into queue: "+ data); }}}catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("Thread: " + id + " quit from producer thread"); }}public void stop(a) {
        isRunning = false;
    }
 
    private volatile boolean      isRunning               = true;
    private static AtomicInteger  count                   = new AtomicInteger();
 
}
Copy the code

The test code

package consumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class ConsumerProducerTest {
 
    public static void main(String[] args) throws InterruptedException {

        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(15);
 
        Producer producer1 = new Producer(queue, "PROD1");
        Producer producer2 = new Producer(queue, "PROD2");
        Producer producer3 = new Producer(queue, "PROD3");
        Consumer consumer1 = new Consumer(queue, "CONSUMER1");
        Consumer consumer2 = new Consumer(queue, "CONSUMER2");
 
        ExecutorService service = Executors.newCachedThreadPool();
        
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer1);
        service.execute(consumer2);
 
        Thread.sleep(3 * 1000);
        producer1.stop(); // The producer must be closed first
        producer2.stop();
        producer3.stop();
        consumer1.stop();
        consumer2.stop();
 
        Thread.sleep(2000); service.shutdown(); }}Copy the code