public class ProducerThread extends Thread {

    private final static Random random = new Random(System.currentTimeMillis());
    private final static AtomicInteger counter = new AtomicInteger(0);
    private final MessageQueue messageQueue;

    public ProducerThread(MessageQueue messageQueue, int seq) {
        super("Producer-" + seq);
        this.messageQueue = messageQueue;
    }

    @Override
    public void run(a) {
        while (true) {
            try {
                Message message = new Message("Message-" + counter.getAndIncrement());
                messageQueue.put(message);
                System.out.println(Thread.currentThread().getName() + " put message " + message.getData());
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                break; }}}}Copy the code

public class ConsumerThread extends Thread {

    private final static Random random = new Random(System.currentTimeMillis());
    private final MessageQueue messageQueue;

    public ConsumerThread(MessageQueue messageQueue, int seq) {
        super("Consumer-" + seq);
        this.messageQueue = messageQueue;
    }

    @Override
    public void run(a) {
        while (true) {
            try {
                Message message = messageQueue.take();
                System.out.println(Thread.currentThread().getName() + " take a message " + message.getData());
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                break; }}}}Copy the code

public class Message {

    public Message(String data) {
        this.data = data;
    }

    private String data;

    public String getData(a) {
        return data;
    }

    public void setData(String data) {
        this.data = data; }}Copy the code

public class MessageQueue {
    private final static int DEFAULT_MAX_LIMIT = 100;
    private final LinkedList<Message> queue;
    private final int limit;


    public MessageQueue(a) {
        this(DEFAULT_MAX_LIMIT);
    }

    public MessageQueue(final int limit) {
        this.limit = limit;
        this.queue = new LinkedList<>();
    }

    public void put(final Message message) throws InterruptedException {
        synchronized (queue) {
            while(queue.size() > limit) { queue.wait(); } queue.addLast(message); queue.notifyAll(); }}public Message take(a) throws InterruptedException {
        synchronized (queue) {
            while (queue.isEmpty()) {
                queue.wait();
            }
            Message message = queue.removeFirst();
            queue.notifyAll();
            returnmessage; }}public int getMaxLimit(a) {
        return this.limit;
    }

    public int getMessageSize(a) {
        synchronized (queue) {
            returnqueue.size(); }}}Copy the code

public class ProducerAndConsumerClient {

    public static void main(String[] args) {
        final MessageQueue messageQueue = new MessageQueue();

        new ProducerThread(messageQueue, 1).start();
        new ProducerThread(messageQueue, 2).start();
        new ProducerThread(messageQueue, 3).start();
        new ConsumerThread(messageQueue, 1).start();
        new ConsumerThread(messageQueue, 2).start(); }}Copy the code

Producer-1 put message Message-0
Producer-3 put message Message-2
Producer-2 put message Message-1
Consumer-1 take a message Message-0
Consumer-2 take a message Message-1
Producer-2 put message Message-3
Consumer-1 take a message Message-2
Producer-2 put message Message-4
Consumer-2 take a message Message-3
Producer-3 put message Message-5
Producer-3 put message Message-6
Producer-3 put message Message-7
Consumer-1 take a message Message-4
Producer-2 put message Message-8
Consumer-2 take a message Message-5
Producer-3 put message Message-9
Producer-1 put message Message-10
Producer-1 put message Message-11
Producer-2 put message Message-12Omit...Copy the code