Preface:

This article summarizes some RabbitMQ features that we commonly use in day-to-day project development.

1. The mandatory parameter

As we saw in the previous article, a producer publishes messages to a RabbitMQ exchange, which routes them to queues by matching the routing key against the binding keys. So where does a message go when no queue matches? RabbitMQ gives us two options: the mandatory parameter and the backup exchange.

The mandatory parameter is an argument of the channel.BasicPublish method. Its job is to return a message to the producer when the message cannot be routed to any queue. When mandatory is true and the exchange cannot find a queue that matches its type and the routing key, RabbitMQ returns the message to the producer, and the client raises the channel's BasicReturn event. When mandatory is false, the message is discarded. The example below uses the C# RabbitMQ.Client 3.6.9.

 

 
    using System.Text;
    using RabbitMQ.Client;

    ConnectionFactory factory = new ConnectionFactory();
    factory.UserName = "admin";
    factory.Password = "admin";
    factory.HostName = "192.168.121.205";
    IConnection conn = factory.CreateConnection();   // Connect to RabbitMQ
    IModel channel = conn.CreateModel();             // Create a channel
    channel.ExchangeDeclare("exchangeName", "direct", true);
    // Declare TestQueue: durable, non-exclusive, not auto-deleted
    string queueName = channel.QueueDeclare("TestQueue", true, false, false, null).QueueName;
    channel.QueueBind(queueName, "exchangeName", "routingKey");   // Bind the queue to the exchange

    // Producer-side callback, registered before publishing.
    // It is called when a mandatory message cannot be routed to any queue.
    channel.BasicReturn += (model, ea) =>
    {
        // do something...
    };

    // A message that can be routed to the queue (mandatory = true)
    var message = Encoding.UTF8.GetBytes("TestMsg");
    channel.BasicPublish("exchangeName", "routingKey", true, null, message);
    // A message that cannot be routed to any queue (mandatory = true)
    var message1 = Encoding.UTF8.GetBytes("TestMsg1");
    channel.BasicPublish("exchangeName", "routingKey1", true, null, message1);

    // Close the channel and the connection
    // (real code should first give the broker time to return unroutable messages)
    channel.Close();
    conn.Close();

2. Backup exchange

When a message cannot be routed to a queue, the mandatory parameter lets us return it to the producer for handling. The drawback is that the producer has to register a callback to handle unroutable messages, which adds processing logic to the producer. The backup exchange (Alternate Exchange) offers another way to handle them: unrouted messages are stored in RabbitMQ and processed when needed. The main implementation code is as follows:

 
    IDictionary<string, object> args = new Dictionary<string, object>();
    args.Add("alternate-exchange", "altExchange");
    // Declare the normal exchange with the backup exchange argument
    channel.ExchangeDeclare("normalExchange", "direct", true, false, args);
    // Declare the backup exchange as a fanout exchange
    channel.ExchangeDeclare("altExchange", "fanout", true, false, null);
    // Declare the normal queue and bind it to the normal exchange
    channel.QueueDeclare("normalQueue", true, false, false, null);
    channel.QueueBind("normalQueue", "normalExchange", "NormalRoutingKey1");
    // Declare the backup queue and bind it to the backup exchange
    channel.QueueDeclare("altQueue", true, false, false, null);
    channel.QueueBind("altQueue", "altExchange", "");
    // Publish a message that can be routed to normalQueue
    var msg1 = Encoding.UTF8.GetBytes("TestMsg");
    channel.BasicPublish("normalExchange", "NormalRoutingKey1", false, null, msg1);
    // Publish a message that cannot be routed; it ends up in altQueue
    var msg2 = Encoding.UTF8.GetBytes("TestMsg1");
    channel.BasicPublish("normalExchange", "NormalRoutingKey2", false, null, msg2);

 

In fact, a backup exchange is not much different from an ordinary exchange. For convenience, fanout is the recommended type, although direct or topic can also be used. Note that a message is republished to the backup exchange with the same routing key the producer used. Suppose the backup exchange is of type direct and has one queue bound to it with the binding key KEY1. A message forwarded to the backup exchange with routing key KEY2 matches no queue and is lost; a message with routing key KEY1 is stored in the queue.
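The routing-key behaviour described above can be sketched with a small in-memory model. This is only an illustration: ToyExchange and its members are hypothetical names, not part of RabbitMQ.Client, and the model covers only direct and fanout matching.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy in-memory model of exchange routing (hypothetical, not the RabbitMQ client API).
public class ToyExchange
{
    public string Type;               // "direct" or "fanout"
    public ToyExchange Alternate;     // backup exchange, may be null
    private readonly List<KeyValuePair<string, string>> bindings =
        new List<KeyValuePair<string, string>>();   // (bindingKey, queueName)

    public ToyExchange(string type, ToyExchange alternate = null)
    {
        Type = type;
        Alternate = alternate;
    }

    public void Bind(string queueName, string bindingKey)
    {
        bindings.Add(new KeyValuePair<string, string>(bindingKey, queueName));
    }

    // Returns the queues that receive the message; an empty list means it is lost.
    public List<string> Route(string routingKey)
    {
        List<string> matched = bindings
            .Where(b => Type == "fanout" || b.Key == routingKey)
            .Select(b => b.Value)
            .ToList();
        if (matched.Count == 0 && Alternate != null)
        {
            // The backup exchange sees the same routing key the producer used.
            return Alternate.Route(routingKey);
        }
        return matched;
    }
}

public static class AlternateExchangeDemo
{
    public static void Main()
    {
        var directAlt = new ToyExchange("direct");
        directAlt.Bind("altQueue", "KEY1");
        var normal = new ToyExchange("direct", directAlt);
        normal.Bind("normalQueue", "NormalRoutingKey1");

        Console.WriteLine(normal.Route("KEY1").Count);   // 1: stored in altQueue
        Console.WriteLine(normal.Route("KEY2").Count);   // 0: no match anywhere, lost
    }
}
```

Changing directAlt to a fanout exchange makes both calls reach altQueue, which is why fanout is the convenient choice for a backup exchange.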

There are several special cases for backup exchanges:

  • If the configured backup exchange does not exist, neither the client nor the RabbitMQ server reports an error, and the message is lost.
  • If the backup exchange is not bound to any queue, neither the client nor the RabbitMQ server reports an error, and the message is lost.
  • If the backup exchange has no matching queue, neither the client nor the RabbitMQ server reports an error, and the message is lost.
  • If a backup exchange is used together with the mandatory parameter, a message that the backup exchange routes to a queue counts as routed, so the mandatory parameter does not cause it to be returned to the producer.

3. Expiration Time (TTL)

3.1 Setting the TTL of messages

There are two ways to set the TTL of a message. The first is a queue attribute, which gives every message in the queue the same expiration time. The second is to set it on each message individually, so every message can have a different TTL. If both are used, the smaller of the two values applies. Once a message has stayed in the queue longer than its TTL, it becomes a dead letter (Dead Message) and consumers can no longer receive it. (Dead-letter queues are covered below.)

To set the message TTL through a queue attribute, add the x-message-ttl argument, in milliseconds, to channel.QueueDeclare. Sample code:

 
    IDictionary<string, object> args = new Dictionary<string, object>();
    args.Add("x-message-ttl", 6000);   // 6000 ms
    channel.QueueDeclare("ttlQueue", true, false, false, args);

If TTL is not set, the message will not expire. If the TTL is set to 0, it means that the message is immediately discarded (or processed by a dead-letter queue) unless it can be delivered directly to the consumer at this point.
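As a rough illustration of how the two TTL settings combine, the helper below returns the TTL that would apply to a message. TtlRules and EffectiveTtl are hypothetical names used only for this sketch; null stands for "not set".

```csharp
using System;

public static class TtlRules
{
    // Returns the TTL in milliseconds that applies to a message,
    // or null if neither the queue nor the message sets one (never expires).
    public static long? EffectiveTtl(long? queueTtlMs, long? messageTtlMs)
    {
        if (queueTtlMs == null) return messageTtlMs;
        if (messageTtlMs == null) return queueTtlMs;
        // Both are set: the smaller value wins.
        return Math.Min(queueTtlMs.Value, messageTtlMs.Value);
    }

    public static void Main()
    {
        Console.WriteLine(EffectiveTtl(6000, 20000));          // 6000
        Console.WriteLine(EffectiveTtl(null, 20000));          // 20000
        Console.WriteLine(EffectiveTtl(null, null) == null);   // True
    }
}
```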

To set the TTL per message, set the Expiration property, in milliseconds, on the message properties passed to channel.BasicPublish. The key code is as follows:

 
    IBasicProperties properties = channel.CreateBasicProperties();
    properties.Expiration = "20000";   // Set the TTL to 20000 ms
    var message = Encoding.UTF8.GetBytes("TestMsg");
    channel.BasicPublish("normalExchange", "NormalRoutingKey", true, properties, message);

Note: With the first method (a TTL set on the queue), a message is removed from the queue as soon as it expires. With the second method, an expired message is not removed immediately; whether it has expired is checked only just before it is delivered to a consumer. Why? With a queue-level TTL, expired messages are always at the head of the queue, so RabbitMQ only needs to scan periodically from the head. With per-message TTLs, every message can expire at a different time, so removing all expired messages would mean scanning the whole queue. It is cheaper to check a message when it is about to be consumed.
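The difference can be seen in a small simulation. This is a simplified model, not RabbitMQ's actual implementation: each queue entry is just the absolute time at which a message expires, and HeadExpiryDemo and DropExpiredFromHead are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

public static class HeadExpiryDemo
{
    // Each entry is the absolute expiry time (ms) of one queued message, in FIFO order.
    // Removes expired messages from the head only and returns how many were removed.
    public static int DropExpiredFromHead(Queue<long> expiryTimes, long nowMs)
    {
        int removed = 0;
        while (expiryTimes.Count > 0 && expiryTimes.Peek() <= nowMs)
        {
            expiryTimes.Dequeue();
            removed++;
        }
        return removed;
    }

    public static void Main()
    {
        // Queue-level TTL: expiry times grow with arrival order.
        var sameTtl = new Queue<long>(new long[] { 6000, 6100, 6200 });
        // Per-message TTLs: a long-lived message sits in front of a short-lived one.
        var mixedTtl = new Queue<long>(new long[] { 20000, 1100, 6200 });

        Console.WriteLine(DropExpiredFromHead(sameTtl, 6150));    // 2
        Console.WriteLine(DropExpiredFromHead(mixedTtl, 6150));   // 0: the expired message is stuck behind the head
    }
}
```

With a single queue-level TTL, a scan from the head finds every expired message. With mixed per-message TTLs, the same scan stops at the first unexpired message, so anything expired behind it stays in the queue until it reaches the head.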

3.2 Setting the TTL of the queue

Note that this is different from setting the TTL of messages through the queue: there, messages are deleted; here, the queue itself is deleted. The x-expires argument of channel.QueueDeclare controls how long a queue may remain unused before it is automatically deleted. Unused means that the queue has no consumers, has not been redeclared, and has not had channel.BasicGet called on it within the expiration period.

A queue TTL suits RPC-style reply queues, where many queues may be created and then never used (RPC with RabbitMQ is covered below). RabbitMQ guarantees that the queue is deleted after it expires, but not how promptly. After RabbitMQ restarts, the expiration period of a durable queue is counted again from the start. The x-expires argument is in milliseconds and follows the same constraints as x-message-ttl, except that it cannot be set to 0 (an error is reported).

Example code is as follows:

 
    IDictionary<string, object> args = new Dictionary<string, object>();
    args.Add("x-expires", 6000);   // Delete the queue after 6000 ms unused
    channel.QueueDeclare("ttlQueue", false, false, false, args);

4. Dead-letter queue

DLX stands for Dead Letter Exchange. When a message becomes a dead letter in a queue, it can be republished to another exchange, the DLX. A queue bound to the DLX is called a dead-letter queue.

There are several situations in which a message becomes a dead letter:

  • The message is rejected (BasicReject/BasicNack) with the requeue parameter set to false (consumer acknowledgements are covered in the next article);
  • The message expires;
  • The queue reaches its maximum length.

A DLX is an ordinary exchange, no different from any other. It can be specified on any queue; in practice it is set as an attribute of that queue. When a message in the queue becomes a dead letter, RabbitMQ automatically republishes it to the configured DLX, which routes it to another queue, the dead-letter queue. Consumers can listen on that queue to process the messages.

To add a DLX to a queue, set the x-dead-letter-exchange argument in channel.QueueDeclare. The sample code is as follows:

 
    channel.ExchangeDeclare("exchange.dlx", "direct", true);      // Declare the dead-letter exchange
    channel.ExchangeDeclare("exchange.normal", "direct", true);   // Declare the normal exchange
    IDictionary<string, object> args = new Dictionary<string, object>();
    args.Add("x-message-ttl", 10000);                      // Messages expire after 10000 ms
    args.Add("x-dead-letter-exchange", "exchange.dlx");    // Set the DLX for this queue
    args.Add("x-dead-letter-routing-key", "routingkey");   // Routing key used when dead-lettering
    channel.QueueDeclare("queue.normal", true, false, false, args);      // Declare the normal queue
    channel.QueueBind("queue.normal", "exchange.normal", "normalKey");   // Bind the normal queue to the normal exchange
    channel.QueueDeclare("queue.dlx", true, false, false, null);         // Declare the dead-letter queue
    // The binding key must equal x-dead-letter-routing-key (the match is case-sensitive)
    channel.QueueBind("queue.dlx", "exchange.dlx", "routingkey");
    var message = Encoding.UTF8.GetBytes("TestMsg");
    channel.BasicPublish("exchange.normal", "normalKey", null, message);

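The dead-letter flow is as follows: the producer publishes to the normal exchange, which routes the message to the normal queue; when the message becomes a dead letter there (for example, when its TTL runs out), RabbitMQ republishes it to the DLX, which routes it to the dead-letter queue.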

 

5. Delay queue

RabbitMQ does not provide delay queues out of the box. A delay queue is a logical concept that can be simulated with an expiration time plus a dead-letter queue. The logical architecture is roughly as follows:

 

The producer sends the message to a queue whose expiration time is n. That queue has no consumers. When the expiration time is reached, the message is forwarded through the dead-letter exchange to the dead-letter queue, and consumers consume from the dead-letter queue. The consumer therefore receives the message a time n after the producer published it, which achieves delayed consumption.
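A minimal sketch of the queue arguments such a delay queue needs is shown below. DelayQueue and BuildArguments are hypothetical helper names; the exchange name, routing key and queue name in the usage are illustrative, and the commented QueueDeclare call assumes a channel created as in the earlier examples.

```csharp
using System;
using System.Collections.Generic;

public static class DelayQueue
{
    // Builds the arguments for a "delay" queue: messages wait delayMs,
    // then are dead-lettered to deadLetterExchange with deadLetterRoutingKey.
    public static IDictionary<string, object> BuildArguments(
        int delayMs, string deadLetterExchange, string deadLetterRoutingKey)
    {
        if (delayMs < 0) throw new ArgumentOutOfRangeException(nameof(delayMs));
        return new Dictionary<string, object>
        {
            { "x-message-ttl", delayMs },
            { "x-dead-letter-exchange", deadLetterExchange },
            { "x-dead-letter-routing-key", deadLetterRoutingKey }
        };
    }

    public static void Main()
    {
        IDictionary<string, object> args = BuildArguments(5000, "exchange.dlx", "routingkey");
        foreach (KeyValuePair<string, object> pair in args)
        {
            Console.WriteLine(pair.Key + " = " + pair.Value);
        }
        // With a live channel (assumed, not created here) the queue would be declared as:
        // channel.QueueDeclare("queue.delay.5s", true, false, false, args);
        // Consumers subscribe only to the queue bound to exchange.dlx, never to the delay queue.
    }
}
```

One delay queue per delay value keeps every message in a queue on the same TTL, so expired messages are always at the head.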

Delay queues fit many scenarios in our projects, for example: automatically cancelling an order a set time after it is placed, automatic confirmation of receipt after seven days, automatic positive review after seven days, unlocking an account 24 hours after its password is frozen, and message compensation in distributed systems (retry after 1s, then after 10s, then after 5m, and so on).

 

6. Priority queue

Just as there are “special” people in everyday life, our business has some “special” messages that may need to be processed first. In everyday life we might open a VIP channel for such people; RabbitMQ has a VIP channel too (from version 3.5 onward): the priority queue. Messages in the queue carry priorities, and messages with higher priority are consumed first. We only need to do two things for these VIP messages:

  1. Declare the queue as a priority queue, that is, add the x-max-priority argument when creating the queue to specify the maximum priority (an integer from 1 to 255).
  2. Set a priority on each message that should be prioritized.

The sample code is as follows:

 
    channel.ExchangeDeclare("exchange.priority", "direct", true);   // Declare the exchange
    IDictionary<string, object> args = new Dictionary<string, object>();
    args.Add("x-max-priority", 10);                                 // Maximum priority is 10
    channel.QueueDeclare("queue.priority", true, false, false, args);          // Declare the priority queue
    channel.QueueBind("queue.priority", "exchange.priority", "priorityKey");   // Bind the queue to the exchange
    IBasicProperties properties = channel.CreateBasicProperties();
    properties.Priority = 8;                                        // Set the message priority to 8
    var message = Encoding.UTF8.GetBytes("TestMsg8");
    channel.BasicPublish("exchange.priority", "priorityKey", properties, message);

Note: A message published without a priority is treated as priority 0. A message whose priority exceeds the queue's maximum is treated as having the maximum priority. Messages of the same priority are delivered in the order they arrived. Priorities make little difference when consumers keep up with producers and no messages pile up in the broker: each message is consumed as soon as it is published, so the broker holds at most one message, and priority is meaningless for a single message.
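These rules can be sketched as two small functions. This models only the ordering of a backlog, not RabbitMQ's implementation; it assumes arrival order is kept within one priority level, and PriorityRules, EffectivePriority and DeliveryOrder are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PriorityRules
{
    // Priority the queue uses: missing -> 0, above the maximum -> the maximum.
    public static byte EffectivePriority(byte? messagePriority, byte maxPriority)
    {
        byte p = messagePriority ?? (byte)0;
        return p > maxPriority ? maxPriority : p;
    }

    // Order in which a backlog would be delivered: higher priority first,
    // arrival order kept within one priority (OrderByDescending is a stable sort).
    public static List<string> DeliveryOrder(
        IEnumerable<KeyValuePair<string, byte?>> backlog, byte maxPriority)
    {
        return backlog
            .OrderByDescending(m => EffectivePriority(m.Value, maxPriority))
            .Select(m => m.Key)
            .ToList();
    }

    public static void Main()
    {
        var backlog = new List<KeyValuePair<string, byte?>>
        {
            new KeyValuePair<string, byte?>("a", null),        // no priority: treated as 0
            new KeyValuePair<string, byte?>("b", (byte)8),
            new KeyValuePair<string, byte?>("c", (byte)200),   // above the maximum of 10: treated as 10
            new KeyValuePair<string, byte?>("d", (byte)8)
        };
        Console.WriteLine(string.Join(",", DeliveryOrder(backlog, 10)));   // c,b,d,a
    }
}
```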

Priority queues appear to violate the first-in, first-out (FIFO) principle of queues. How they are implemented is not discussed here; if you are interested, look into it yourself. There may be a follow-up article on how this works.

7. RPC implementation

RPC stands for Remote Procedure Call. It is a technique for requesting a service from a program on a remote computer over the network without having to understand the underlying network technology. Its main purpose is to make distributed applications easier to build, providing the power of remote calls without losing the semantic simplicity of local calls.

We will not say much about RPC in general; the focus here is on how to implement RPC with RabbitMQ, which can be done very simply. The client sends a request message and the server replies with a response message. To receive the response, the client sends the name of a callback queue with the request (the default queue can be used). The server-side implementation is as follows:

 
    static void Main(string[] args)
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.UserName = "admin";
        factory.Password = "admin";
        factory.HostName = "192.168.121.205";
        IConnection conn = factory.CreateConnection();
        IModel channel = conn.CreateModel();
        channel.QueueDeclare("RpcQueue", true, false, false, null);
        SimpleRpcServer rpc = new MySimpRpcServer(new Subscription(channel, "RpcQueue"));
        rpc.MainLoop();
    }

    public class MySimpRpcServer : SimpleRpcServer
    {
        public MySimpRpcServer(Subscription subscription) : base(subscription)
        {
        }

        /// <summary>
        /// Handles a request and returns the response body.
        /// </summary>
        public override byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)
        {
            replyProperties = null;
            return Encoding.UTF8.GetBytes("I got that!");
        }

        /// <summary>
        /// Pre-processes an incoming request.
        /// </summary>
        /// <param name="evt"></param>
        public override void ProcessRequest(BasicDeliverEventArgs evt)
        {
            // todo.....
            base.ProcessRequest(evt);
        }
    }

The client implementation code is as follows:

 
    ConnectionFactory factory = new ConnectionFactory();
    factory.UserName = "admin";
    factory.Password = "admin";
    factory.HostName = "192.168.121.205";
    IConnection conn = factory.CreateConnection();
    IModel channel = conn.CreateModel();
    SimpleRpcClient client = new SimpleRpcClient(channel, "RpcQueue");
    var message = Encoding.UTF8.GetBytes("TestMsg8");
    var result = client.Call(message);
    // do something...

This is the RPC client and server logic that the RabbitMQ client library provides. Of course, we can also implement it ourselves, mainly with the help of two properties of BasicProperties.

  • ReplyTo: Usually used to set up a callback queue.
  • CorrelationId: Used to correlate a request with its response after invoking RPC.

The processing flow is as follows:

 

  1. When the client starts, an anonymous callback queue is created.
  2. The client sets two properties on the RPC request: ReplyTo tells the RPC server which queue to send the reply to (the callback queue), and CorrelationId identifies the request.
  3. The request is sent to the RpcQueue queue.
  4. The RPC server listens for requests on RpcQueue. When a request arrives, the server processes it and sends a message containing the result back to the client. The destination queue is the callback queue named in ReplyTo.
  5. The client listens on the callback queue. When a message arrives, it checks the CorrelationId property; if it matches the request, the message is the result.
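The client-side bookkeeping in steps 2 and 5 can be sketched without a broker. PendingReplies is a hypothetical helper, not part of RabbitMQ.Client: in a real client, Register would be called before publishing the request, and TryComplete would be called from the consumer of the callback queue with the CorrelationId and body of each delivery.

```csharp
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;

// Client-side bookkeeping for RPC replies (hypothetical helper).
public class PendingReplies
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<byte[]>> pending =
        new ConcurrentDictionary<string, TaskCompletionSource<byte[]>>();

    // Called before publishing a request: returns the CorrelationId to put on the message
    // and a task that completes when the matching reply arrives.
    public string Register(out Task<byte[]> reply)
    {
        string correlationId = Guid.NewGuid().ToString();
        var tcs = new TaskCompletionSource<byte[]>();
        pending[correlationId] = tcs;
        reply = tcs.Task;
        return correlationId;
    }

    // Called for every message that arrives on the callback queue.
    // Returns false when the CorrelationId matches no outstanding request.
    public bool TryComplete(string correlationId, byte[] body)
    {
        TaskCompletionSource<byte[]> tcs;
        if (correlationId != null && pending.TryRemove(correlationId, out tcs))
        {
            tcs.SetResult(body);
            return true;
        }
        return false;   // unknown or duplicate reply: ignore it
    }
}

public static class RpcCorrelationDemo
{
    public static void Main()
    {
        var replies = new PendingReplies();
        Task<byte[]> reply;
        string id = replies.Register(out reply);

        // Simulate two deliveries on the callback queue.
        Console.WriteLine(replies.TryComplete("some-other-id", new byte[0]));               // False
        Console.WriteLine(replies.TryComplete(id, Encoding.UTF8.GetBytes("I got that!")));  // True
        Console.WriteLine(Encoding.UTF8.GetString(reply.Result));                           // I got that!
    }
}
```

Keying outstanding requests by CorrelationId lets one callback queue serve many concurrent requests, and a reply with an unknown id is simply dropped.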

Conclusion

This article gave a brief overview of some RabbitMQ features we use in our projects. These features help us make better use of RabbitMQ across different business scenarios. You can run the examples yourself and verify the results in the RabbitMQ web management interface (there is not much to learn about the management interface; a little exploration is enough).