The seven RabbitMQ modes and their application scenarios

Simple mode (Hello World)

The simplest setup: one producer and one consumer. RabbitMQ acts as a message broker that forwards messages from producer A to consumer B.

Application scenario: an outgoing e-mail is placed in a message queue; the mail service fetches the message from the queue and sends it to the recipient.

Work Queues

Tasks are distributed among multiple consumers (the competing consumers pattern): one producer, multiple consumers. This mode generally suits resource-intensive tasks that a single consumer cannot keep up with and that therefore need several consumers to process.

Application scenario: processing one order takes 10s. Multiple orders can be placed in the message queue at the same time and then processed by multiple consumers at the same time, so the orders are handled in parallel rather than serially by a single consumer.

Publish/Subscribe mode

A message is sent to many consumers at once: a message sent by one producer is received by many consumers, that is, the message is broadcast to all consumers.

Application scenario: multiple caches and multiple databases need to be notified after the commodity inventory is updated. The structure here should be:

  • A fanout exchange fans out to two message queues: a cache message queue and a database message queue
  • The cache message queue serves multiple cache consumers
  • The database message queue serves multiple database consumers

Routing Mode

Messages are received selectively. The producer sends a message to an exchange with a routing key specified, and the consumer specifies a routing key when binding its queue to the exchange. A queue only receives the messages whose routing key matches its binding.

Application scenario: if the iPhone 12 is added to the inventory, the iPhone 12 promotion consumer binds with an iPhone 12 routing key, so only this promotion receives the message. Other promotions do not care about this routing key and do not consume messages that carry it.
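
To make the routing rule concrete, here is a minimal in-memory sketch. It does not use the RabbitMQ client or talk to a broker; it only models the idea that a queue receives a message when its binding key equals the message's routing key. The class name DirectRoutingSketch, its methods, and the queue names are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model of routing by exact key (illustration only, no broker involved).
public class DirectRoutingSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Bind a queue to the exchange under one binding key.
    public void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Return the queues that would receive a message published with routingKey.
    public List<String> route(String routingKey) {
        List<String> queues = bindings.getOrDefault(routingKey, Collections.emptyList());
        return new ArrayList<>(queues);
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("iphone12_promotion_queue", "iphone12");
        exchange.bind("other_promotion_queue", "other");

        System.out.println(exchange.route("iphone12")); // [iphone12_promotion_queue]
        System.out.println(exchange.route("unknown"));  // []
    }
}
```

A message whose routing key has no binding is routed to no queue in this sketch, which is why the second call prints an empty list.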

Topics

Messages are received by topic: the routing key is matched against a pattern, so the queue is bound to the exchange with a pattern. In a pattern, words are separated by dots; # matches zero or more words and * matches exactly one word.

Application scenario: as above, an iPhone promotion can receive every message with an iPhone topic, such as iPhone 12, iPhone 13, and so on.
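
The wildcard rules can be sketched in plain Java. The code below is an illustration of the matching rule only, not the broker's actual implementation; the class name TopicMatcher and the routing keys such as iphone.12 are assumptions made for the example.

```java
// Illustrative matcher for topic patterns: '*' is exactly one word, '#' is zero or more words.
public class TopicMatcher {

    public static boolean matches(String bindingPattern, String routingKey) {
        String[] patternWords = bindingPattern.split("\\.");
        String[] keyWords = routingKey.split("\\.");
        return match(patternWords, 0, keyWords, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' absorbs zero words, or absorbs one word and is tried again.
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("iphone.*", "iphone.12"));     // true
        System.out.println(matches("iphone.*", "iphone.13.pro")); // false
        System.out.println(matches("iphone.#", "iphone.13.pro")); // true
        System.out.println(matches("iphone.#", "iphone"));        // true
    }
}
```

The last line shows the difference between the two wildcards: # also matches when no further word follows, while * always needs exactly one.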

Remote Procedure Call (RPC)

RPC can be used when we need to run a function on a remote computer and wait for the result.

Application scenario: you need to wait for an interface to return data, such as order payment.
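
The request/reply flow can be sketched without a broker. In RabbitMQ the request message normally carries a reply queue name and a correlation id so that the client can pair a reply with its request; the sketch below imitates that with two in-memory queues. It is a simplified illustration under stated assumptions, not RabbitMQ client code: the class RpcSketch, the Message type, and the "paid:" result format are all hypothetical.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory imitation of RPC over two queues (illustration only, no broker involved).
public class RpcSketch {

    // A message plus the correlation id that pairs a reply with its request.
    static final class Message {
        final String correlationId;
        final String body;

        Message(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    private final BlockingQueue<Message> requestQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<Message> replyQueue = new LinkedBlockingQueue<>();

    // Server side: take one request, compute a result, reply with the same correlation id.
    public void serveOne() {
        try {
            Message request = requestQueue.take();
            String result = "paid:" + request.body; // stand-in for the remote function
            replyQueue.put(new Message(request.correlationId, result));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("server interrupted", e);
        }
    }

    // Client side: send a request, then block until the matching reply arrives.
    public String call(String body) {
        String correlationId = UUID.randomUUID().toString();
        try {
            requestQueue.put(new Message(correlationId, body));
            while (true) {
                Message reply = replyQueue.take();
                if (reply.correlationId.equals(correlationId)) {
                    return reply.body;
                }
                // A reply to some other request: dropped in this sketch.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("client interrupted", e);
        }
    }

    public static void main(String[] args) {
        RpcSketch rpc = new RpcSketch();
        new Thread(rpc::serveOne).start();
        System.out.println(rpc.call("order-42")); // paid:order-42
    }
}
```

The caller blocks inside call until the reply is available, which is the "wait for the result" behaviour that distinguishes this mode from the fire-and-forget modes above.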

Publisher Confirms

Publisher confirms are a RabbitMQ extension that enables reliable publishing. When publisher confirms are enabled on a channel, RabbitMQ asynchronously acknowledges the messages published by the sender, which means that they have been processed on the server side.

Application scenario: cases with higher requirements for message reliability, such as wallet deductions.
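
Because confirms arrive asynchronously, the publisher has to remember which messages are still unconfirmed. One common way to do that is to keep them in a sorted map keyed by sequence number. The sketch below shows only this bookkeeping in plain Java, with no broker and no client library; ConfirmTrackerSketch and its method names are hypothetical, and the assumption that a single acknowledgement may cover every message up to a sequence number is modelled by the multiple flag.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Bookkeeping for messages that were published but not yet confirmed (illustration only).
public class ConfirmTrackerSketch {

    // sequence number -> message body, sorted by sequence number
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();
    private long nextSeqNo = 1; // assumes a single publishing thread

    // Record a message as published and unconfirmed; returns its sequence number.
    public long publish(String body) {
        long seqNo = nextSeqNo++;
        outstanding.put(seqNo, body);
        return seqNo;
    }

    // Handle an acknowledgement. If multiple is true, it covers every message up to seqNo.
    public void handleAck(long seqNo, boolean multiple) {
        if (multiple) {
            outstanding.headMap(seqNo, true).clear();
        } else {
            outstanding.remove(seqNo);
        }
    }

    public int unconfirmedCount() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        ConfirmTrackerSketch tracker = new ConfirmTrackerSketch();
        tracker.publish("deduct 10 from wallet A");
        tracker.publish("deduct 20 from wallet B");
        long third = tracker.publish("deduct 30 from wallet C");

        tracker.handleAck(2, true);                     // confirms messages 1 and 2
        System.out.println(tracker.unconfirmedCount()); // 1
        tracker.handleAck(third, false);                // confirms message 3
        System.out.println(tracker.unconfirmedCount()); // 0
    }
}
```

Anything still left in the map after a timeout is a candidate for republishing, which is what makes this mode suitable for operations such as wallet deductions.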

Code demo

The code below does not demonstrate the last two modes (RPC and Publisher Confirms); you can study them yourself if you are interested.

Simple mode

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Sender {

    // Must be the same queue name that the Receiver consumes from
    private final static String QUEUE_NAME = "simplest_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the queue. Arguments, in order:
        //   queue: the queue name
        //   durable: whether the queue survives a broker restart
        //   exclusive: whether the queue is restricted to this connection
        //   autoDelete: whether the queue is deleted once it is no longer in use
        //   arguments: other properties
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "simple mode message";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("[x] Sent '" + message + "'");
        // Finally, close the channel and the connection
        channel.close();
        connection.close();
    }
}

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver {

    private final static String QUEUE_NAME = "simplest_queue";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        // Get a connection
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Print every message delivered to this consumer
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

Work queue mode

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver1 {

    private final static String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // The server sends only one unacknowledged message at a time to this consumer
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            // Acknowledge the message so that the server can send the next one
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        // autoAck = false: the prefetch limit only applies with manual acknowledgements
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver2 {

    private final static String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // The server sends only one unacknowledged message at a time to this consumer
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            // Acknowledge the message so that the server can send the next one
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        // autoAck = false: the prefetch limit only applies with manual acknowledgements
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Sender {

    private final static String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Publish 100 messages, pausing a little longer after each one
        for (int i = 0; i < 100; i++) {
            String message = "work mode message" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("[x] Sent '" + message + "'");
            Thread.sleep(i * 10);
        }
        channel.close();
        connection.close();
    }
}

Publish/Subscribe mode

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Receive1 {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Let the server create a temporary queue and bind it to the fanout exchange
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Receive2 {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Let the server create a temporary queue and bind it to the fanout exchange
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received2 '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Sender {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String message = "publish subscribe message";
        // A fanout exchange ignores the routing key, so it is left empty
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }
}

Routing mode

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver1 {

    private final static String QUEUE_NAME = "queue_routing";
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange here too, so the receiver can start before the sender
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Receive messages with routing key "key" or "key2"
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver2 {

    private final static String QUEUE_NAME = "queue_routing2";
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange here too, so the receiver can start before the sender
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Receive only messages with routing key "key2"
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Sender {

    private final static String EXCHANGE_NAME = "exchange_direct";
    private final static String EXCHANGE_TYPE = "direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
        // Only queues bound with the same routing key receive the message
        String message = "routing mode message";
        channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes());
        System.out.println("[x] Sent '" + message + "'");
        // channel.basicPublish(EXCHANGE_NAME, "key", null, message.getBytes());
        // System.out.println("[x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }
}

Topics mode

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver1 {

    private final static String QUEUE_NAME = "queue_topic";
    private final static String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange here too, so the receiver can start before the sender
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // "key.*" matches key.1 but not key.1.2
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver2 {

    private final static String QUEUE_NAME = "queue_topic2";
    private final static String EXCHANGE_NAME = "exchange_topic";
    private final static String EXCHANGE_TYPE = "topic";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange here too, so the receiver can start before the sender
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // "*.*" can receive key.1
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
        // "*.#" can also receive key.1.2
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.#");
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Sender {

    private final static String EXCHANGE_NAME = "exchange_topic";
    private final static String EXCHANGE_TYPE = "topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
        // Two-word routing key: received by both receivers
        String message = "topics model message with key.1";
        channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());
        System.out.println("[x] Sent '" + message + "'");
        // Three-word routing key: received only through the "*.#" binding
        String message2 = "topics model message with key.1.2";
        channel.basicPublish(EXCHANGE_NAME, "key.1.2", null, message2.getBytes());
        System.out.println("[x] Sent '" + message2 + "'");
        channel.close();
        connection.close();
    }
}

The four exchange types

  • Direct Exchange: queues bind to this exchange with a routing_key. A message published to the exchange carries a routing_key and is sent to the queues bound with that same key
  • Fanout Exchange: broadcasts messages to all bound queues without any routing_key processing. It is the fastest type
  • Topic Exchange: adds pattern matching to the direct exchange, i.e., the routing_key is matched against a pattern, with * for exactly one word and # for zero or more words
  • Headers Exchange: ignores the routing_key and uses the message headers (a hash data structure) for matching. The advantage is that the matching rules are richer and more flexible
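
Header matching is the one exchange type the demos above do not cover, so here is a minimal sketch of the idea in plain Java. It assumes the usual two matching modes of a headers exchange, where a binding requires either all of its headers or any one of them to match (the x-match argument). The class HeadersMatchSketch, its method, and the header names are hypothetical, and no broker or client library is involved.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative header matching: compare a binding's headers with a message's headers.
public class HeadersMatchSketch {

    // matchAll = true models "all headers must match"; false models "any header may match".
    public static boolean matches(Map<String, String> bindingHeaders,
                                  Map<String, String> messageHeaders,
                                  boolean matchAll) {
        for (Map.Entry<String, String> entry : bindingHeaders.entrySet()) {
            boolean same = Objects.equals(entry.getValue(), messageHeaders.get(entry.getKey()));
            if (matchAll && !same) {
                return false; // one mismatch is enough to fail "all"
            }
            if (!matchAll && same) {
                return true;  // one match is enough to satisfy "any"
            }
        }
        // "all": every header matched; "any": no header matched.
        return matchAll;
    }

    public static void main(String[] args) {
        Map<String, String> binding = new HashMap<>();
        binding.put("category", "phone");
        binding.put("brand", "apple");

        Map<String, String> message = new HashMap<>();
        message.put("category", "phone");
        message.put("brand", "other");

        System.out.println(matches(binding, message, true));  // false
        System.out.println(matches(binding, message, false)); // true
    }
}
```

Because the rule works on arbitrary key/value pairs rather than on a single dotted string, it can express conditions that a routing_key pattern cannot.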

Conclusion

Each of these modes has its own application scenarios; use the example scenarios above to choose the one that fits your case.

Author: I think I’m blog.csdn.net/qq_32828253/article/details/110450249