Application scenarios

Use RabbitMQ to send different messages to the users of different companies. For now this uses a direct exchange, with a different routing key per company.

Installing the extensions

1. yum install librabbitmq -y  2. pecl install amqp  3. composer require php-amqplib/php-amqplib

Handling code

<?php
// RabbitMQService.php (listing label from the article; the class defined here is RabbitMQServiceImpl)

namespace common\services\rabbitmq\impl;

use common\services\rabbitmq\RabbitMQService;
use common\services\rabbitmq\vo\RabbitMQConfigVo;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * php-amqplib backed implementation of RabbitMQService.
 *
 * Opens a connection and a channel on construction, is configured through the
 * fluent setters (exchange, type, queue, routing key) and is then used either
 * as a producer (setProducerChannel() + publish()) or as a blocking consumer
 * (setConsumerChannel()).
 */
class RabbitMQServiceImpl implements RabbitMQService
{
    /**
     * Connection settings read from \Yii::$app->params['rabbitmq_config'];
     * the keys used are host, port, user and password.
     *
     * @var array
     */
    private $rabbitConfig;

    /**
     * Connection opened by the constructor.
     *
     * NOTE(review): this is static and is reassigned by every new instance, so
     * a second instance replaces the connection the first one will later close().
     *
     * @var AMQPStreamConnection
     */
    public static $connection;

    /**
     * @var AMQPChannel
     */
    public $channel;

    /**
     * @var string
     */
    public $queue;

    /**
     * @var string
     */
    public $type;

    /**
     * @var string
     */
    public $exchange;

    /**
     * @var string company-{$company_id}
     */
    public $routingKey;

    /**
     * Connects to the broker described by the Yii params and opens a channel.
     */
    public function __construct()
    {
        $this->rabbitConfig = \Yii::$app->params['rabbitmq_config'];
        self::$connection = new AMQPStreamConnection(
            $this->rabbitConfig['host'],
            $this->rabbitConfig['port'],
            $this->rabbitConfig['user'],
            $this->rabbitConfig['password']
        );
        $this->channel = self::$connection->channel();
    }

    /**
     * Declares the exchange, binds a fresh server-named queue to it with the
     * configured routing key and then blocks, dispatching deliveries to
     * $callBack for as long as the channel has an active consumer.
     *
     * The queue set through setQueue() is not used here: each consumer gets its
     * own server-named queue.
     *
     * @param callable|array $callBack invoked with each delivered message; it is
     *                                 expected to ack, because no_ack is false
     * @return AMQPChannel
     * @throws \ErrorException
     */
    public function setConsumerChannel($callBack = []): AMQPChannel
    {
        $this->channel->exchange_declare($this->exchange, $this->type, false, false, false);

        // An empty name makes the broker generate one; it is the first element of the reply.
        [$queueName] = $this->channel->queue_declare('', false, false, false);
        $this->channel->queue_bind($queueName, $this->exchange, $this->routingKey ?? '');

        // Deliver at most one unacknowledged message at a time to this consumer.
        $this->channel->basic_qos(null, 1, null);

        // The empty-array default is not a callable; hand the library null instead.
        $this->channel->basic_consume($queueName, '', false, false, false, false, $callBack ?: null);

        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }

        return $this->channel;
    }

    /**
     * Declares the exchange and the durable queue set through setQueue(), and
     * binds the two with the configured routing key.
     *
     * @return AMQPChannel
     */
    public function setProducerChannel(): AMQPChannel
    {
        $this->channel->exchange_declare($this->exchange, $this->type, false, false, false);
        $this->channel->queue_declare($this->queue, false, true, false, false);

        // The routing key is only set by setRoutingKey(); fall back to the empty key when it was not called.
        $this->channel->queue_bind($this->queue, $this->exchange, $this->routingKey ?? '');

        return $this->channel;
    }

    /**
     * @param string $type exchange type passed to exchange_declare()
     * @return $this
     */
    public function setType(string $type): self
    {
        $this->type = $type;

        return $this;
    }

    /**
     * @param string $exchange exchange name used for declaring, binding and publishing
     * @return $this
     */
    public function setExchange(string $exchange): self
    {
        $this->exchange = $exchange;

        return $this;
    }

    /**
     * @param string $queue queue name declared by setProducerChannel()
     * @return $this
     */
    public function setQueue(string $queue): self
    {
        $this->queue = $queue;

        return $this;
    }

    /**
     * Sets the binding key to "company-{$companyId}".
     *
     * @param int $companyId
     * @return $this
     */
    public function setRoutingKey(int $companyId): self
    {
        $this->routingKey = 'company-' . $companyId;

        return $this;
    }

    /**
     * Closes the channel and then the shared connection.
     *
     * @return mixed|void
     * @throws \Exception
     */
    public function close()
    {
        $this->channel->close();
        self::$connection->close();
    }

    /**
     * Publishes $data to the configured exchange under $routingKey.
     *
     * @param string $data       message body
     * @param string $routingKey routing key for this message (independent of setRoutingKey())
     * @param array  $options    AMQPMessage properties; when empty, a persistent text/plain message is sent
     */
    public function publish(string $data, string $routingKey, array $options = []): void
    {
        if (empty($options)) {
            $options = [
                'content_type' => 'text/plain',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ];
        }

        $message = new AMQPMessage($data, $options);
        $this->channel->basic_publish($message, $this->exchange, $routingKey);
    }
}
// producer.php
// Declares the SMS exchange and queue, then publishes 100000 JSON test messages,
// each under a routing key picked at random from $routes.
$rabbitMqService = \Yii::createObject(RabbitMQService::class);
$rabbitMqService->setQueue(RabbitMQConstCode::SMS_QUEUE)
    ->setType(AMQPExchangeType::DIRECT)
    ->setExchange(RabbitMQConstCode::SMS_EXCHANGE)
    ->setProducerChannel();

// NOTE(review): the consumer listing binds with "company-{id}" keys, so a consumer
// set up that way would not receive messages published under these keys — confirm
// which routing keys are intended.
$routes = ['dota', 'csgo', 'lol'];

for ($i = 0; $i < 100000; $i++) {
    $key = array_rand($routes);
    $arr = ['match_id' => $i, 'status' => rand(0, 3)];
    $data = json_encode($arr, JSON_UNESCAPED_UNICODE);

    $rabbitMqService->publish($data, $routes[$key]);
    echo 'send ' . $routes[$key] . ' message: ' . $data . PHP_EOL;
}

// Release the channel and connection once everything has been published.
$rabbitMqService->close();
// consumer.php
// Binds a consumer to the SMS exchange for one company and processes each
// delivery: log it, print it, run the SMS handler, then acknowledge it.
$rabbitMqService = \Yii::createObject(RabbitMQService::class);

// Uses $this, so this snippet is expected to live inside a method of the class
// that provides handleSms().
$callback = function ($msg) {
    Log::info('send sms info ' . $msg->body);
    echo 'receives the message:', $msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL;

    $this->handleSms();

    // Acknowledge only after handling; the consumer is registered with no_ack = false.
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

// $company_id must be defined by the surrounding code.
// NOTE(review): the queue name was lost in the original listing; SMS_QUEUE is
// assumed here to match the producer — confirm.
$rabbitMqService->setExchange(RabbitMQConstCode::SMS_EXCHANGE)
    ->setQueue(RabbitMQConstCode::SMS_QUEUE)
    ->setType(AMQPExchangeType::DIRECT)
    ->setRoutingKey($company_id)
    ->setConsumerChannel($callback);

Function Parameter Description

queue_declare parameters:
name: $queue // should be unique in a fanout exchange
passive: false // don't check whether a queue with the same name exists
durable: false // the queue will not survive server restarts
exclusive: false // the queue might be accessed by other channels
auto_delete: true // the queue will be deleted once the channel is closed

exchange_declare parameters:
name: $exchange
type: direct // exchange (routing) type
passive: false
durable: true // whether the exchange is durable
auto_delete: false // the exchange won't be deleted once the channel is closed