For more practical Java content, follow the public account “Java class representative”, which publishes a new post every day.

5 Topics


In the previous tutorial, we improved the logging system: instead of a fanout exchange, which can only broadcast blindly to every queue, we used a direct exchange, which made it possible to receive logs selectively.

Although the direct exchange improved the system, it still has a limitation: it cannot route messages based on multiple criteria.

In our logging system, we may want to subscribe to logs based not only on the log level but also on the source that emitted the log. You may know this concept from the Unix syslog tool, which routes logs based on both severity (the log level) and facility (the source device).

This gives a great deal of flexibility — we might want to listen for critical error logs from ‘cron’ and all logs from ‘kern’.

To implement this functionality in a logging system, we need to look at topic Exchanges, which are a little more complex.

Topic exchange

Messages sent to a topic exchange cannot use an arbitrary routing_key; it must be a list of words delimited by dots. The words can be anything, but they usually describe some feature of the message. These routing keys are all valid: “stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”. The routing key can contain as many words as you like, as long as its total length does not exceed 255 bytes.
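
The routing key rules above can be checked with a few lines of Java. This is only a minimal sketch: the class and method names are hypothetical, and it assumes the 255-byte limit is measured on the UTF-8 encoding of the key.

```java
import java.nio.charset.StandardCharsets;

public class RoutingKeyCheck {

    // Limit stated in the text; measuring it on UTF-8 bytes is an assumption of this sketch.
    static final int MAX_ROUTING_KEY_BYTES = 255;

    // Splits a routing key into its dot-delimited words.
    public static String[] words(String routingKey) {
        return routingKey.split("\\.");
    }

    // Returns true if the encoded key fits within the byte limit.
    public static boolean fitsLimit(String routingKey) {
        return routingKey.getBytes(StandardCharsets.UTF_8).length <= MAX_ROUTING_KEY_BYTES;
    }

    public static void main(String[] args) {
        String key = "quick.orange.rabbit";
        System.out.println(key + " has " + words(key).length + " words");
        System.out.println("fits limit: " + fitsLimit(key));
    }
}
```

Note that the limit applies to bytes, not words, so a key made of many short words is fine while a single very long word is not.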

The binding key must have the same form. The logic of a topic exchange is similar to that of a direct exchange: a message sent with a particular routing key is delivered to every queue bound with a matching binding key. Binding keys, however, have two important special cases:

  • * (asterisk) stands for exactly one word

  • # (hash) stands for zero or more words
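
To make the two wildcards concrete, here is a small word-by-word matcher. It is a sketch of the matching rule as described above, not the broker’s actual implementation; the class and method names are hypothetical, and treating an empty routing key as zero words is an assumption.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class TopicMatcher {

    // Returns true if routingKey matches bindingKey under the topic rules:
    // "*" stands for exactly one word, "#" for zero or more words.
    public static boolean matches(String bindingKey, String routingKey) {
        List<String> pattern = Arrays.asList(bindingKey.split("\\."));
        // Assumption of this sketch: an empty routing key has zero words.
        List<String> words = routingKey.isEmpty()
                ? Collections.<String>emptyList()
                : Arrays.asList(routingKey.split("\\."));
        return matches(pattern, 0, words, 0);
    }

    private static boolean matches(List<String> pattern, int p, List<String> words, int w) {
        if (p == pattern.size()) {
            // Pattern exhausted: it is a match only if every word was consumed too.
            return w == words.size();
        }
        String token = pattern.get(p);
        if (token.equals("#")) {
            // Let "#" absorb 0, 1, 2, ... of the remaining words.
            for (int next = w; next <= words.size(); next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.size()) {
            return false; // "*" or a literal word needs one word, but none is left
        }
        if (token.equals("*") || token.equals(words.get(w))) {
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.critical", "kern.critical"));
        System.out.println(matches("kern.#", "kern"));
        System.out.println(matches("kern.*", "kern"));
    }
}
```

The last two calls in `main` show the difference between the wildcards: “kern.#” accepts the bare key “kern” because “#” may stand for zero words, whereas “kern.*” requires exactly one more word.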

The following example can easily explain this:

In this example, we send messages that all describe animals. Each message uses a routing key made of three words (separated by two dots). The first word describes speed, the second colour and the third species: “<speed>.<colour>.<species>”.

We create three bindings: Q1 is bound with the binding key “*.orange.*”, and Q2 with “*.*.rabbit” and “lazy.#”.

These bindings can be summarized as follows:

  • Q1 is interested in all orange animals

  • Q2 wants to hear everything about rabbits and everything about lazy animals

A message with the routing key “quick.orange.rabbit” is delivered to both queues, and so is “lazy.orange.elephant”. By contrast, “quick.orange.fox” goes only to Q1 and “lazy.brown.fox” only to Q2. “lazy.pink.rabbit” is delivered to Q2 only once, even though it matches two of Q2’s bindings. “quick.brown.fox” matches no binding, so it is discarded.

What happens if we break the convention and send a message whose routing key has one or four words, such as “orange” or “quick.orange.male.rabbit”? These messages match no binding and are discarded.

However, “lazy.orange.male.rabbit”, even though it has four words, still matches the last binding (“lazy.#”) and is delivered to Q2.
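
The walkthrough above can be reproduced without a broker. The sketch below translates each binding key into a regular expression and reports which queues would receive each routing key. It is an illustration only: the regex translation is a convenience chosen for this example rather than how RabbitMQ matches keys internally, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class TopicRoutingDemo {

    // Translates a binding key into a regex over a routing key in which
    // every word is followed by a dot (for example "lazy.brown.fox.").
    public static Pattern toRegex(String bindingKey) {
        StringBuilder regex = new StringBuilder();
        for (String word : bindingKey.split("\\.")) {
            if (word.equals("*")) {
                regex.append("[^.]+\\.");        // exactly one word
            } else if (word.equals("#")) {
                regex.append("(?:[^.]+\\.)*");   // zero or more words
            } else {
                regex.append(Pattern.quote(word)).append("\\.");
            }
        }
        return Pattern.compile(regex.toString());
    }

    // Returns the queues that receive a message with the given routing key.
    public static List<String> route(Map<String, List<String>> bindings, String routingKey) {
        String normalized = routingKey.isEmpty() ? "" : routingKey + ".";
        List<String> queues = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : bindings.entrySet()) {
            for (String bindingKey : entry.getValue()) {
                if (toRegex(bindingKey).matcher(normalized).matches()) {
                    queues.add(entry.getKey());
                    break; // one copy per queue, even if several bindings match
                }
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        Map<String, List<String>> bindings = new LinkedHashMap<>();
        bindings.put("Q1", Arrays.asList("*.orange.*"));
        bindings.put("Q2", Arrays.asList("*.*.rabbit", "lazy.#"));

        String[] keys = {
            "quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
            "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox",
            "orange", "quick.orange.male.rabbit", "lazy.orange.male.rabbit"
        };
        for (String key : keys) {
            System.out.println(key + " -> " + route(bindings, key));
        }
    }
}
```

The `break` in `route` models the rule that a queue receives a message at most once, which is why “lazy.pink.rabbit” appears only once for Q2 although two of its bindings match.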

Topic exchange

The topic exchange is powerful enough to behave like the other exchanges.

When a queue is bound with the binding key “#”, it receives all messages regardless of the routing key, just like with a fanout exchange.

When the special characters “*” and “#” are not used in the bindings, the topic exchange behaves just like a direct exchange.

Putting It All Together

We will use a topic exchange in our logging system. Let’s assume that the routing keys of the logs consist of two words: “<device>.<level>”.

The code is almost the same as in the previous tutorial.

EmitLogTopic.java source code:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {

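        // Declare the topic exchange; this is a no-op if it already exists with the same type.
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // The routing key and the message text are derived from the command-line arguments.
        String routingKey = getRouting(argv);
        String message = getMessage(argv);

        // Publish to the exchange; the routing key decides which queues receive the message.
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");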
    }
  }
  //..
}
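
The listing above elides its two helper methods. One possible shape for them is sketched below, assuming the first command-line argument is the routing key and the remaining arguments form the message. The class name and the default values are assumptions made for illustration, not taken from the listing.

```java
import java.util.Arrays;

public class EmitLogArgs {

    // Hypothetical helper: the first argument is the routing key.
    // The fallback value is an assumed default.
    public static String getRouting(String[] args) {
        if (args.length < 1) {
            return "anonymous.info";
        }
        return args[0];
    }

    // Hypothetical helper: all remaining arguments, joined by spaces, form the message.
    // The fallback value is an assumed default.
    public static String getMessage(String[] args) {
        if (args.length < 2) {
            return "Hello World!";
        }
        return String.join(" ", Arrays.copyOfRange(args, 1, args.length));
    }

    public static void main(String[] args) {
        String[] sample = {"kern.critical", "A", "critical", "kernel", "error"};
        System.out.println(getRouting(sample) + " : " + getMessage(sample));
    }
}
```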

ReceiveLogsTopic.java source code:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    // Let the server create a non-durable, exclusive, auto-delete queue with a generated name.
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
        System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
        System.exit(1);
    }

    // Bind the queue once for every binding key given on the command line.
    for (String bindingKey : argv) {
        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
    }

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    // Print each delivery together with the routing key it was published with.
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" +
            delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
    };
    // Consume with automatic acknowledgement (autoAck = true); the cancel callback does nothing.
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

Compile and run the sample code, remembering to include the classpath as in Tutorial 1 (on Windows, use %CP% instead of $CP).

Compile:

javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java

To receive all log messages:

java -cp $CP ReceiveLogsTopic "#"

To receive all log messages from the device “kern”:

java -cp $CP ReceiveLogsTopic "kern.*"

Or, if you only care about “critical” logs:

java -cp $CP ReceiveLogsTopic "*.critical"

You can also create multiple bindings:

java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

To emit a log message with the routing key “kern.critical”:

java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"

Have fun playing with these programs. Note that the code does not hard-code any routing or binding keys; you supply them as command-line arguments when you run it.

(EmitLogTopic.java and ReceiveLogsTopic.java)

In the next article, Tutorial 6, you’ll learn how to implement a remote procedure call using round-trip messages.

