Want to get more dry goods, pay attention to the public account: Javak class representative, one more day, waiting for you!

3 Publish/Subscribe


In the previous section, we created a work queue. The goal is to distribute each task to only one worker. In this section, we’re going to take a different approach: we’re going to deliver a message to all consumers. This pattern is called Publish/Subscribe.

To demonstrate this pattern, we will build a logging system. It consists of two applications — the first sends log messages, and the second receives and prints log messages.

In our logging system, each running receiver receives an elimination (the same message is received by each consumer). In this way, we can have a recipient save logs to hard disk; The other prints a log on the screen.

In fact, published log messages are broadcast to all receivers.

Switching (Exchanges)

In the previous tutorial, we sent and received messages directly through a queue. Now it’s time to introduce the complete messaging model in RabbitMQ.

A quick review of what we’ve already covered:

  • A producer is an application that sends messages.

  • A queue is a message buffer.

  • A consumer is an application that receives messages.

The core idea of the RabbitMQ message model is that producers never directly send messages to queues. In fact, in most cases, the producer does not even know which queue the message will be distributed to.

Instead, producers can only send messages to exchanges. The exchange is very simple, on the one hand it receives the message from the producer, on the other hand it pushes the message to the queue. The exchange must know exactly how to process incoming messages. Should I send messages to a queue? Or to multiple queues? Or throw away the message? The exchange type defines specific behavior rules.

There are the following routing types: Direct, Topic, headers, and FANout. Let’s look at the last one first, fanout:

channel.exchangeDeclare("logs", "fanout");
Copy the code

The fanout type of exchange is very simple. As its name suggests, it broadcasts incoming messages to all queues it knows. This is exactly how our logging system needs to be.

List all exchanges

To list all the exchanges on the server, use rabbitmqctl:

sudo rabbitmqctl list_exchanges
Copy the code

Some exchanges with names like AMq.* and the default (unnamed) exchange will appear in the list. These are created by default and are not needed at this time.

An exchange without a name

In the previous tutorial, we didn’t know that exchanges existed, but we could still send messages to queues. This is because we use the default exchange, identified by a null character (“”).

Think back to our previous announcements:

channel.basicPublish("", "hello", null, message.getBytes());
Copy the code

The first argument is the name of the exchange, and the null character indicates that the default exchange is used: if the message exists, it is routed to the queue via the specified routingKey.

Now we can send to the exchange with the specified name:

channel.basicPublish( "logs", "", null, message.getBytes());
Copy the code

Temporary queues

You may remember earlier that we used queues with names (remember hello and task_queue?). . Naming the queue is crucial because we need the worker to listen to the corresponding queue. When you want to share a queue between producers and consumers, you must name the queue.

But that doesn’t apply to our logging system. We need to listen for all log messages, not just some. And we only care about messages that are currently being sent, not historical messages. To do this, we need to do two things:

First, every time we connect to RabbitMQ, we need a new queue. We can do this by creating a randomly named queue each time, or better yet, having the server create a randomly named queue.

Second, once the queue has no consumer connections, it will be deleted automatically.

In the Java client, when we call queueDeclare() with no arguments, we create a nonpersistent, dedicated, automatically deleted queue:

String queueName = channel.queueDeclare().getQueue();
Copy the code

To learn more about the EXCLUSIVE flag and other properties, see Guide on Queue.

At this point, the variable queueName is a randomly generated queueName string. Its value might be: amq.gen-jZTY20BRGko-hjMUjJ0wLG.

Binding (Bindings)

We have created an exchange of type FANout, and now we need to tell the exchange which queue to send the message to. This relationship between exchanges and queues is called a binding.

channel.queueBind(queueName, "logs", "");
Copy the code

The code above will send messages for the exchange named “logs” to our queue.

List Bindings

Guess what tool can be used to list binding relationships?

rabbitmqctl list_bindings
Copy the code

Putting It All Together

The producer program that publishes the log messages is not much different from the code in the previous tutorial. The biggest change is that we now send messages to an exchange named “logs” instead of the default anonymous exchange. When sending a message, a routingKey needs to be provided, but for fanout type exchanges, it ignores the value of the routingKey. Here is the code for sending the logger, emitlog.java:

public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = argv.length < 1 ? "info: Hello World!" : String.join(" ", argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); }}}Copy the code

(Emitlog.java source file)

As you can see, after we establish the connection, we declare the exchange. This step is very necessary

If no queue is already bound to the exchange, the message will be lost, but this does not affect our current application scenario. If there are no current consumers, we can safely discard the message.

ReceiveLogs.java:

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); }}Copy the code

(Receivelogs. Java source file)

Compile as before.

javac -cp $CP EmitLog.java ReceiveLogs.java
Copy the code

To save logs to a file, open terminal and type:

java -cp $CP ReceiveLogs > logs_from_rabbit.log
Copy the code

To print logs on the screen, open a new terminal and run:

java -cp $CP ReceiveLogs
Copy the code

To emit a log type, enter:

java -cp $CP EmitLog
Copy the code

Use rabbitmqctl list_Bindings to verify that the bindings and queues created by the code are correct. After running the two Receivelogs.java programs, you should see the following output:

sudo rabbitmqctl list_bindings # => Listing bindings ... # => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] # => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] # = >... done.Copy the code

The explanation for this is also simple: logs exchanged messages are sent to two queues whose names are generated by the server. This is exactly what we expected.

To learn how to listen for a subset of many messages, refer to tutorial 4.


Past dry goods recommended

RabbitMQ tutorial 1. “Hello World”

RabbitMQ 2. Work Queue

Download attachment name total garbled? It’s time you read the RFC documentation!

MySQL priority queue (order by limit)

Freemarker Tutorial (I)- Template development Manual


Code word is not easy, welcome to praise attention and share. Search: [Java class representative], pay attention to the public account. Daily watch, get more Java dry goods in time.