Introduction to the

RabbitMQ is a highly available messaging middleware and it is essential to learn and use RabbitMQ.

  • Asynchronous messaging
  • Supports various development languages such as Java, Python and PHP
  • Pluggable authentication, authorization
  • Rabbitmq-manager can be used for administration and monitoring.

The installation

Docker is directly used here, which is very convenient for installation

Docker pull RabbitMQ :3.8.3-management-alpine

Run docker run –name run-rabbitmq -d -p 15672:15672 -p 5672:5672 rabbitmq

Port 15672 is the RabbitMQ Web management page, directly access:http://localhost:15672/, initial user password: guest

use

There are basically two scenarios when RabbitMQ is used as a producer and a consumer

  • One/more producers, more shared consumers
  • One/more producers, multiple independent consumers

Independent consumers do not share queues. Each consumer has its own queue and can define rules to pull data from the Exchange into its own queue

The various scenarios are implemented in code below

Basic concept

queue

Data queues, where data can be pushed to or consumed from a queue

Exchange switches

When data is pushed to the switch, the queue can be bound to the switch. Different types of switches support different binding rules

  • Fanout has no rules, all data in Exchange
  • Direct matches exactly, binding only data for the values specified in the RoutingKey
  • Topic more flexible rules, routingkey routingkey must be a by.Separate words,*The asterisk is used to indicate a word,#(hash sign) is used to indicate any number of words (zero or more)

Encapsulate common operations for RabbitMQ

<? php namespace RabbitMQ; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; Class RabbitMQ {private $host = '127.0.0.1'; private $port = 5672; private $user = 'guest'; private $password = 'guest'; protected $connection; protected $channel; /** * RabbitMQ constructor. */ public function __construct() { $this->connection = new AMQPStreamConnection($this->host,  $this->port, $this->user, $this->password); $this->channel = $this->connection->channel(); } /** * @param $exchangeName * @param $type * @param $pasive * @param $durable * @param $autoDelete */ public function createExchange($exchangeName, $type, $pasive = false, $durable = false, $autoDelete = false) { $this->channel->exchange_declare($exchangeName, $type, $pasive, $durable, $autoDelete); } /** * @param $queueName * @param $pasive * @param $durable * @param $exlusive * @param $autoDelete */ public function createQueue($queueName, $pasive = false, $durable = false, $exlusive = false, $autoDelete = false, $nowait = false, $arguments = []) { $this->channel->queue_declare($queueName, $pasive, $durable, $exlusive, $autoDelete, $nowait, $arguments); } @param $message */ public function sendMessage($message, $routeKey, $exchange = '', $properties = []) { $data = new AMQPMessage( $message, $properties ); $this->channel->basic_publish($data, $exchange, $routeKey); } /** * Consume message * @param $queueName * @param $callback * @throws \ErrorException */ public function consumeMessage($queueName, $callback, $tag = '', $noLocal = false, $noAck = false, $exclusive = false, $noWait = false) { $this->channel->basic_consume($queueName, $tag, $noLocal, $noAck, $exclusive, $noWait, $callback); while ($this->channel->is_consuming()) { $this->channel->wait(); } } /** * @throws \Exception */ public function __destruct() { $this->channel->close(); $this->connection->close(); }}Copy the code

Multiple shared consumers

Multiple consumers can increase consumption speed and provide system throughput

Two, let’s go to the code producer code

<? php require_once '.. /.. /vendor/autoload.php'; use RabbitMQ\RabbitMQ; use PhpAmqpLib\Message\AMQPMessage; $rabbit = new RabbitMQ(); $queueName = 'test-single-queue'; $rabbit->createQueue($queueName,false,true,false,false); for ($i = 0; $i < 10000; $i++) { $rabbit->sendMessage($i . "this is a test message.", $queueName, "",['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); } unset($rabbit); // Close the connectionCopy the code

Running producerphp ProducerYou can see the queue information on the Manager Web page

Consumer code

<? php require_once '.. /.. /vendor/autoload.php'; use RabbitMQ\RabbitMQ; $rabbit = new RabbitMQ(); $queueName = 'test-single-queue'; $callback = function ($message){ var_dump("Received Message : " . $message->body); //print message sleep(2); $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); //ack }; $rabbit->consumeMessage($queueName,$callback); unset($rabbit); // Close the connectionCopy the code

Run consumer twicephp Consumer.php You can see that two consumers are not consuming messages twice or you can see through the Manager Web that messages from this queue are being consumed

Multiple independent consumers

RabbitMQ producers push messages to exchanges, enabling multiple independent consumers by binding multiple queues to exchanges

Define a topic type switch with the consumption rule: test.ex. Add a word

<? php require_once '.. /.. /vendor/autoload.php'; use RabbitMQ\RabbitMQ; $rabbit = new RabbitMQ(); $exchangeName = 'test-ex-topic'; $queueName = 'test-consumer-ex-topic'; $routingKey = 'test.ex.*'; $rabbit->createQueue($queueName, false, true); $rabbit->bindQueue($queueName, $exchangeName, $routingKey); Function ($message) {var_dump("Received message: ". $message->body); //print message sleep(2); $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); //ack }; $rabbit->consumeMessage($queueName, $callback); unset($rabbit); // Close the connectionCopy the code

Launch the Consumer PHP consumer.php

Define producers that push messages to two different Routingkeys

<? php require_once '.. /.. /vendor/autoload.php'; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use RabbitMQ\RabbitMQ; $rabbit = new RabbitMQ(); $routingKey1 = 'test.ex.queue1'; $routingKey2 = 'test.ex.queue2'; $exchangeName = 'test-ex-topic'; $rabbit->createExchange($exchangeName, AMQPExchangeType::TOPIC, false, true, false); Routingkey = test-ex-queue1 for ($I = 0; $i < 10000; $i++) { $rabbit->sendMessage($i . "this is a queue1 message.", $routingKey1, $exchangeName, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT // Messages persist, restart RabbitMQ, messages will not be lost]); } // For ($I = 0; $i < 10000; $i++) { $rabbit->sendMessage($i . "this is a queue2 message.", $routingKey2, $exchangeName, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT // Messages persist, restart RabbitMQ, messages will not be lost]); } unset($rabbit); // Close the connectionCopy the code

Running producerphp Producer.php, you can see that the consumer has 20,000 messages to consume, containing data from two Routingkeys

Delay queue

concept

Rabbitmq queues can be set to a TTL, set expired messages to dead, and then pushed to delay_queue. Delay_queue can be consumed to implement the delay queue function

scenario

Assume a scenario: Xiao Ming places an order on the takeout platform. If the order is not paid within 10 minutes, the system will automatically cancel the order and push the “order cancelled” message to the user.

Development idea: OrderId is pushed to the order queue order_queue when the order is placed, and the validity period of the next message is set to 10 minutes. When this message expires after 10 minutes, the message will be dead letter pushed to exchange. Bind exchange and queue, open one or more consumer consumption queues, and check whether the message order in queue has been paid, if not, push notification, cancel the order.

The flow chart,Failure of message consumption is not considered

The core code

Simple encapsulation of RabbitMQ

<? php namespace RabbitMQ; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; Class RabbitMQ {private $host = '127.0.0.1'; private $port = 5672; private $user = 'guest'; private $password = 'guest'; protected $connection; protected $channel; /** * RabbitMQ constructor. */ public function __construct() { $this->connection = new AMQPStreamConnection($this->host,  $this->port, $this->user, $this->password); $this->channel = $this->connection->channel(); } @param $message */ public function sendMessage($message, $routeKey, $exchange = '', $properties = []) { $data = new AMQPMessage( $message, $properties ); $this->channel->basic_publish($data, $exchange, $routeKey); } /** * Consume message * @param $queueName * @param $callback * @throws \ErrorException */ public function consumeMessage($queueName,$callback) { $this->channel->basic_consume($queueName, '', false, false, false, false, $callback); while ($this->channel->is_consuming()) { $this->channel->wait(); } } /** * @throws \Exception */ public function __destruct() { $this->channel->close(); $this->connection->close(); }}Copy the code

Creating a delay queue

<? php namespace RabbitMQ; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; /** * Class DelayQueue * @package RabbitMQ */ Class DelayQueue extends RabbitMQ {/** * @param $ttl * @param $delayExName * @param $delayQueueName * @param $queueName */ public function createQueue($ttl, $delayExName, $delayQueueName, $queueName) { $args = new AMQPTable([ 'x-dead-letter-exchange' => $delayExName, 'x-message-ttl' => $TTL, 'x-dead-letter-routing-key' => $queueName]); 'x-message-ttl' => $TTL, 'x-dead-letter-routing-key' => $queueName]); $this->channel->queue_declare($queueName, false, true, false, false, false, $args); / / bind dead-letter queue $this - > channel - > exchange_declare ($delayExName, AMQPExchangeType: : DIRECT, false, true, false); $this->channel->queue_declare($delayQueueName, false, true, false, false); $this->channel->queue_bind($delayQueueName, $delayExName, $queueName, false); }}Copy the code

Producer, the code is very simple, look at the effect after running, order message more and more

<? php require_once '.. /vendor/autoload.php'; $delay = new \RabbitMQ\DelayQueue(); $ttl = 1000 * 100; $delayExName = 'delay-order-exchange'; Exchange $delayQueueName = 'delay-order-queue'; Queue $queueName = 'ttl-order-queue'; $delay->createQueue($TTL, $delayExName, $delayQueueName, $queueName); $I = 0; $I = 0; $i < 100; $i++) { $data = [ 'order_id' => $i + 1, 'remark' => 'this is a order test' ]; $delay->sendMessage(json_encode($data), $queueName); sleep(1); }Copy the code

The consumer, looking at the queue after the consumption, will later observe that there is an expiration message pushed to the delay_order_queueConsumers also consume messages!

<? php require_once '.. /vendor/autoload.php'; $delay = new \RabbitMQ\DelayQueue(); $delayQueueName = 'delay-order-queue'; $callback = function ($msg) { echo $msg->body . PHP_EOL; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); // Handle order timeout logic, push notifications to users, etc... sleep(10); }; $delay->consumeMessage($delayQueueName, $callback);Copy the code

code

The code is shown:Github.com/jiaoyang3/r…

🏆 nuggets technical essay | double festival special articles