In order to facilitate better communication, please pay attention to the public account:
The Java class representativeEvery day watch waiting for you!

3 Publish/Subscribe

In the previous section, we created a work queue. The goal is to distribute each task to only one worker. In this section, we’ll play it another way: we’ll send out a message and make it available to all consumers. This pattern is called Publish/Subscribe.

To demonstrate this pattern, we will build a logging system. It contains two applications — the first sends log messages, and the second receives and prints log messages.

In our logging system, each running receiver receives the cancellation (the same message is received by each consumer). In this way, we can ask a recipient to save a log to disk; The other prints the log on the screen.

In effect, published log messages will be broadcast to all receivers.

Switching (Exchanges)

In the previous tutorial, we sent and received messages directly through queues. Now it’s time to introduce the complete message model in RabbitMQ.

Here’s a quick review of what we’ve already covered:

  • A producer is an application that sends messages.
  • A queue is a message buffer.
  • A consumer is an application that receives messages.

The core idea of the RabbitMQ message model is that the producer never sends messages directly to the queue. In fact, in most cases, the producer does not even know to which queue the message will be distributed.

Instead, producers can only send messages to exchanges. Exchange is very simple; on the one hand it receives messages from the producer and on the other hand it pushes messages to the queue. The exchange must know exactly what to do with the messages it receives. Should I send a message to a queue? Or to multiple queues? Or throw away the message? The Exchange Type defines the specific rules of behavior.

There are several routing types: direct, topic, headers, and fanout. Let’s look at the last one first, fanout:

channel.exchangeDeclare("logs", "fanout");

Fanout type swaps are very simple. As the name suggests, it broadcasts incoming messages to all queues it knows. This is exactly the way our logging system needs to be.

List all swaps

To list all the exchanges on the server, use the rabbitmqctl command:

sudo rabbitmqctl list_exchanges

Some swaps with names such as amq.* and the default (unnamed) swap will appear in the list. These are created by default and do not need to be used at this time.

No name exchange

In the previous tutorial, we didn’t know about the exchange, but we could still send a message to a queue. This is because we used the default swap, identified by the null character (“”).

Think back to our previous post like:

channel.basicPublish("", "hello", null, message.getBytes());

The first parameter is the name of the exchange, and the null character indicates that the default exchange is used: if the message exists, it is routed to the queue via the specified routingKey.

Now we can send the exchange with the specified name:

channel.basicPublish( "logs", "", null, message.getBytes());

Temporary Queues

You may remember that earlier we used named queues (remember hello and task_queue?). . Naming the queue is important because we need the worker to listen on the queue. When you want to share a queue between producers and consumers, you must name the queue.

But this does not apply to our logging system. We need to listen for all log messages, not just some. And we only care about the current message being sent, not the history message. To do this, we need to do two things:

First, every time we connect to RabbitMQ, we need a completely new queue. We can do this by creating a randomly named queue at a time, or better yet, by having the server create a randomly named queue.

Second, once the queue has no consumer connection, it is automatically deleted.

In the Java client, when we call queueClare (), a non-persistent, dedicated, automatically deleted queue is created:

String queueName = channel.queueDeclare().getQueue();

To learn more about the exclusive flag and other attributes, see Guide on Queue.

At this point, the variable QUEUENAME is a randomly generated string of queue names. Its value might be: amq.gen-jzty20brgko-hjmujj0wlg.

Binding (Bindings)

Now that we have created a FanOut exchange, we need to tell the exchange which queue to send the message to. This relationship between the exchange and the queue is called a binding.

channel.queueBind(queueName, "logs", "");

The code above will send exchanged messages named “logs” to our queue.

Listing Bindings

Can you guess what tool is used to list binding relationships?

rabbitmqctl list_bindings

Code Integration: Putting It All Together

The producer program that publishes the log messages is not much different from the code in the previous tutorial. The biggest change is that we now send messages to exchanges called “logs”, instead of sending them to the default anonymous exchange. When you send a message, you need to provide a routingKey, but for FanOut type exchanges, it ignores the value of the routingKey. Here is the code that sends the logger, emitlog.java:

public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = argv.length < 1 ? "info: Hello World!" : String.join(" ", argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); }}}

(emitlog.java source file)

As you can see, after we establish the connection, the exchange is declared. This step is very necessary

If no queue is already bound to the exchange, the message will be lost, but this does not affect our current application scenario, and if there is no current consumer, we can safely discard the message.

ReceiveLogs.java:

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); }}

(ReceiveLogs.java source file)

Compile as before.

javac -cp $CP EmitLog.java ReceiveLogs.java

If you want to save the log to a file, open a terminal and type:

java -cp $CP ReceiveLogs > logs_from_rabbit.log

If you want to output logs on the screen, open a new terminal and run:

java -cp $CP ReceiveLogs

To issue the log type, type:

java -cp $CP EmitLog

Use rabbitmqctl list_bindings to verify that the bindings and queues created by your code are correct. After running both receiveLogs.java programs, you should see the following output:

sudo rabbitmqctl list_bindings # => Listing bindings ... # => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] # => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] # = >... done.

The explanation for this is also simple: messages exchanged by logs are sent to two queues whose names are generated by the server. This is exactly what we expected. To learn how to listen for a subset of many messages, refer to Tutorial 4.


RabbitMQ tutorial 1. “Hello World”

RabbitMQ tutorial 2. Work Queue

Freemarker Tutorial (1)- Template Development Manual

The downloaded attachment name is always garbled? It’s time you read the RFC documentation!

MySQL priority queue (order by limit problem)


Code word is not easy, welcome thumb up share.

Search: 【The Java class representative】, pay attention to the public number, timely access to more Java dry goods.