The RabbitMQ summary

The characteristics of

First, there are three benefits to message queues: RabbitMQ is an open source AMQP implementation written in Erlang and supports a wide range of clients. It does well in ease of use, scalability and high availability.

  • Reliability (Reliability)
  • Flexible Routing
  • Message Clustering
  • Highly Available Queues
  • Multi-Protocol
  • Multilanguage client (Many Clients)
  • Management UI
  • Tracing Mechanism
  • Plugin System

The conceptual model

The basic concept

  • Message

The body of the message is opaque. The header consists of a series of optional attributes, including routing-key, Priority, and delivery-mode

  • Publisher

Producer, the client application that publishes messages

  • Exchange

A queue that receives messages sent by producers and routes them to the server

  • Binding

Binding for association between message queues and exchanges

  • Queue

The queue that holds the message until it is sent to the consumer, the message container

  • Connection

The network connection

  • Channel

Channel, an independent two-way data flow pipeline in a multiplexing connection

  • Consumer

Consumers of information

  • Virtual Host

Virtual hosts, separate server domains that share the same authentication and encryption environment, are the basis of the AMQP concept and must be specified when connecting, with the default being /

  • Broker

Message queue server entity

Exchange type

Direct fanout Topic headers. Fanout and HEADERS do not process routing keys, and headers performance is much worse

  • direct

The routing key in the message is the same as the bindingKey in the Binding

  • fanout

Messages are distributed to all bound queues and broadcast by the subnet

  • topic

Pattern matching allocates messages, routing keys match patterns, identifying ‘#’ and ‘#’ to match zero or more words, and ” to match one word

  • headers

Sets the property in the message header to match the property information

Virtual host

Vhost can limit the maximum number of connections and queues, and an error will be reported when the limit is reached. You can also set the user resource permissions and Topic permissions under vhost. User resource permissions Refer to the resource operation and use permissions granted to RabbitMQ users when they run AMQP commands on the client. These permissions are divided into three parts: configure, write, and read. See www.rabbitmq.com/access-cont… Topic rights refer to www.rabbitmq.com/access-cont… In general, the MQTT protocol is used only when the MQTT protocol is used

Data is stored

There are two types of messages: persistent messages and nonpersistent messages, both of which are written to disk

  • Persistent messages are written to disk when they arrive in the queue, and a backup is kept in memory. Memory is tight and messages are erased from memory, which improves performance
  • Non-persistent messages are only stored in memory. When memory pressure is high, data is flushed to save memory space

The storage layer is divided into two parts: index file and Store file.

  • Queue index: rabbit_queue_index Indicates information about the drop messages in the maintenance queue, such as where they are stored, whether they have been received by consumers, and whether they have been ack. Indexes are stored in sequential segment files with the suffix.idx, file names are accumulated from 0, each segment file contains fixed segment_entry_count records, the default value is 16384, each index reads messages from disk, at least one segment file is maintained in memory, Use extreme caution when setting queue_index_embed_MSgs_below. The default value is 4K. When messages are less than 4K, they are stored in index files, which can be read into memory and cause memory explosion.

  • Message store: Rabbit_msg_store messages are stored as key-value pairs in a file. All queues on a virtual host use the same store, one per node, which can be persistent (MSG_store_persistent) or transient (MSg_store_TRANSIENT). Store uses file storage, suffix. RDQ, to append files. When the file size exceeds the specified limit (file_size_limit), the file will be closed and a new file will be created. RabbitMQ records information about the location mapping of messages and files in the ETC(Erlang Term Storage) table

    When the message is read, the corresponding file is found according to the message ID. The file exists and is not locked, the file is opened directly, and the message is read from the specified location. The file does not exist or is locked, and the store is sent for processing

    Delete message: Deletes information about a specific message from the ETS table, and updates the corresponding storage file and related information. When deleting a message, the message is not deleted immediately, but only marks junk data. If a file is filled with junk data, the file can be deleted. When it is detected that valid data in two files can be merged into one file and the ratio of the size of all garbage data to the size of all files exceeds the threshold garbage_fration(default value 0.5), garbage collection is triggered to merge the files. The two files performing the merge must be logically adjacent files, merge logic

    • Lock two files
    • Sort out the valid data of the previous files, and then sort out the valid data of the following files
    • Writes valid data from the following file to the preceding file
    • Update the record of messages in the ETS table
    • Delete subsequent files

The queue structure

Rabbit_amqqueue_process is responsible for protocol-related message processing. Rabbit_amqqueue_process is responsible for protocol-related message processing. Backing_queue is the concrete form and engine of the message store, and provides interfaces to rabbit_amqqueue_process to call

Rabbit_amqqueue_process defines four queue states: alpha: both the index and the contents are stored in memory. Beta: The index is stored in memory and the contents are stored in disk. Gama: 4. Delta message index and content are stored on disk

Advanced application

The rabbitMQ as RPC

www.rabbitmq.com/tutorials/t…

Cluster Connection Recovery

Refer to www.rabbitmq.com/api-guide.h… , by setting the factory setAutomaticRecoveryEnabled (true) can set the switch of automatic recovery, has been opened by default. By the factory. SetNetworkRecoveryInterval (10000) how long can be set up to try to restore time, the default 5 s: com.rabbitmq.client.ConnectionFactory.DEFAULT_NETWORK_RECOVERY_INTERVAL

  • When will automatic recovery be triggered
    • The connected IO loop threw an exception
    • Read Socket timeout
    • The server heartbeat cannot be detected
    • Throw any other exceptions in the connected IO loop
    • If the client fails to connect for the first time, the connection will not be automatically recovered. You need to retry the connection and record the failed attempts to achieve the maximum retry times
    • Connection.close is called in the program and the Connection will not be automatically restored
    • Channel-level exceptions do not automatically restore connections because they are application voice problems, such as consumption in a non-existent queue
  • You can set reconnected listeners on a Connection or Channel. After reconnection or reconnection is successful, listeners will be triggered to add or remove listeners. You need to force a Connection or Channel to be converted into an interface for Recoverable
    ((Recoverable)connection).addRecoveryListener()
    ((Recoverable)connection).removeRecoveryListener()
Copy the code

Message reliability

  • Client code exception capture

After catching exceptions, you can try again, spring by a spring. The rabbitmq. Template. Retry. Enable = true to try again

  • RabbitMQ transaction mechanism
  channel.select();
  chanenl.txCommit();
  channel.txRollback();
Copy the code
  • Sender confirmation mechanism
    spring:
      rabbitmq:
        publisher-confirm-type:correlated
Copy the code
  channel.confirmSelect();
  channel.waitForConfirmsOrDie(5000);
  channel.addConfirmListener();
Copy the code

ConfirmCallback Implement the ConfirmCallback method, which triggers the ConfirmCallback whenever a message is received by the RabbitMQ broker. Ack is true to indicate that the message was sent successfully, ack is false to indicate that the message failed

Return Return mode

spring:
  rabbitmq:
    publisher-returns: true
Copy the code
rabbitTemplate.setMandatory(true)
Copy the code

If a message is not routed to the Queue, the message is discarded (by default). Mandatory returns to the message sender ReturnCallBack(when this function is enabled) to implement the ReturnCallBack interface. If a message fails to be started, the callback interface is triggered. Another option is to set up an Alternate Exchange.

  • Message persistence mechanism
  AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build();
Copy the code

In addition to messages, exchanges and queues can also be persisted

  • Broker high availability cluster
  • Consumer confirmation mechanism
spring:
 rabbitmq:
   listener:
      simple:
        acknowledge-mode: manual
Copy the code
channel.basicAck(deliveryTag, false);
channel.basicNack(deliveryTag, false.true);
channel.rejectAck(deliveryTag, false);
Copy the code

There are three modes: NONE, AUTO, and MANUAL

spring.rabbitmq.listener.simple.acknowledge-mode=manual
Copy the code
  • Traffic limiting on the consumption end

    • Memory alarms and disk alarms

      The default set_VM_memory_high_watermark value is 0.4. When RabbitMQ uses more than 40% of the available memory, a memory alarm is generated and blocks all producers. Once the alarm is cleared, (the total available memory indicates the size allocated by the OS to each process, for example, 2GB for a 32-bit system, and the threshold is 820MB.) after a memory alarm is generated, all client connections are blocked as blocking and blocked

      • Blocking: there is no connection to send a message
      • Blocked: A connection that attempts to send a message

      Solution: If a memory alarm is generated and available memory is available, run a command to adjust the memory threshold to clear the alarm

        rabbitmqctl set_vm_memory_high_watermark 1
        rabbitmqctl set_vm_memory_high_watermark absolute 1GB
        #This setting will be restored after the RabbitMQ service restarts, permanently modifying the configuration file and restartingVim/etc/rabbitmq/rabbitmq. Conf vm_memory_high_watermark. Relative = 0.4 vm_memory_high_watermark. Absolute = 1 gbCopy the code

      When the available disk space is lower than the disk threshold, the RabbiMQ blocks the producer, preventing the service from crashing due to the disk space exhaustion caused by the continuous page-changing of non-persistent messages. By default, the disk threshold is 50MB. Setting this parameter can reduce but does not eliminate the possibility of disk crashes caused by disk exhaustion. For example, during two disk space checks, the disk space is used up from more than 50MB to 0MB. You can run commands to adjust disk thresholds, which take effect temporarily and restore the disk

        ## disk_limit specifies a fixed size in MB GB 
        rabbitctl set_disk_free_limit <disk_limit>
        ## The relative ratio is between 1.02.0
        rabbitctl set_disk_free_limit mem_relative <fraction>
      Copy the code

      Before the usage of Broker nodes reaches a memory threshold, an attempt is made to free up memory by storing queued messages to disk, an action called paging. Persistent and non-persistent messages is stored in the disk, the persistence of the message itself have a backup in the disk, at this point, the persistent messages will be removed from memory, by default, memory memory threshold of 50% will be in page, also is in the default memory threshold is 0, 4 cases, memory is more than 0.2 page will change, Adjust the memory paging threshold by modifying the configuration file

      Vm_memory_high_watermark_paing_ratio = 0.75Copy the code
    • RabbitMQ provides a credit flow-based flow control mechanism for each connection. Flow control is triggered when a single queue reaches its maximum flow rate, or when multiple queues reach their total flow rate.

    • A Qos mechanism that limits the number of unack messages that can be received on a Channel, beyond which RabbitMQ will not push messages to consumers. This prevents large numbers of messages from being sent from the Broker to consumers instantaneously. This is valid only for push mode, not pull mode. NONE Ack is not supported. Before channel.basicConsume, the quantity can be set before passing channel.basicQos. Message sending asynchronous, message acknowledgement asynchronous, and in the case of slow consumer consumption, prefetchCount is set to indicate that when the broker sends a message to the consumer, the prefechCount message stops without a prefechCount message being sent

  • Message idempotency

    • Database unique index
    • Check mechanism, use optimistic lock, pessimistic lock
    • The message sets a unique ID
Reliability assurance

Reliability can be At most once, At least once, Exactly once RabbitMQ supports a maximum of once and a minimum of once

At least once

  • The message producer starts a transaction or confirm mechanism
  • Mandatory parameter or backup switch
  • Both messages and queues need to be persisted
  • Set autoAck to false to verify that messages have been properly consumed by manual validation

At most one random send

Just once RabbitMQ is currently not guaranteed

  • After consuming a message, the consumer sends an ACK command to RabbitMQ. At this point, the network is disconnected or for some other reason, RabbitMQ does not receive the ack command, and the consumer reconnects and repurchases
  • The producer uses the confirm mechanism, and the network is disconnected after sending. To ensure the reliability of resend, the producer will repeat consumption

Tracking the plugin

Firehose function to achieve message tracking, record every sent or consumed record, debugging, troubleshooting. The idea is to send messages in a specified format to the default switch (AMq.Rabbitmq. trace), a topic switch with publish.{exchangename} and Deliver.{queuename} routing keys.

## open firehose
rabbitmqctl trace_on [-p vhost]
#Close # firehose
rabbitmqctl trace_off [-p vhost]
Copy the code

Firehose is off by default and is non-persistent. Restart rabbitmq_tracing will restore the default state. Rabbitmq_tracing will store message logs to trace files.

rabbitmq-plugins enable rabbitmq_tracing
Copy the code

TTL

Expiration time can be set into queue expiration time and message expiration time. If the queue expiration time is set, the message expiration time is also set, whichever is shorter. When the queue expires, all messages in the queue will be removed. When the message expires, only the message at the top of the queue will determine whether the message is expired (removed).

Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl".30000);
arguments.put("x-expires".10000);
channel.queueDeclare("queue1".true.false.false, arguments);
Copy the code
rabbitmqctl set_policy q.ttl ".*" '{"message-ttl":20000, "expires": 10}' --apply-to queues
Copy the code

Dead-letter queue

Three situations in which messages are called dead letters

  • The queue message length reached the limit
  • Consumer rejects consumer information, basicNack/basicReject, and does not put the message back to the original target queue, Requeue =false
  • Message expiration is set for the original queue, and the message expiration time is not consumed

Queue binding dead letter switch: Set the queue parameters: X-dead-letter-exchange and X-dead-letter-routing-key

Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl".30000);
arguments.put("x-expires".10000);
arguments.put("x-dead-letter-exchange"."exchange1");
arguments.put("x-dead-letter-routing-key"."key1");
Copy the code

Delays in the queue

Use rabbitmq_delayed_message_exchange, unlike TTL which is in a dead letter queue, x-delayed-message-exchange in a delayed switch

monitoring

There are three main types: Management UI, RabbitMQctl and Rest API, Prometheus + Grafana

Rabbitmqctl Common command

   #Start the service
   rabbitmq-server
   #stop
   rabbitmqctl stop
   #Add and remove check vhost
   rabbitmqctl add_vhost
   rabbitmqctl delete_vhost
   rabbitmqctl list_vhosts
   #Querying switches
   rabbitmqctl list_exchanges
   #Query queue
   rabbitmqctl list_queues
   #Querying consumer Information
   rabbitmqctl list_consumers
   #The user add and check
   rabbitmqctl add_user
   rabbitmqctl delete_user
   rabbitmqctl list_users
Copy the code

Prometheus reference www.rabbitmq.com/prometheus…. http://ip:15672/api/index.html http://ip:15692/metrics

Use cases

Check out gitee.com/ruichunjie/… However, use case scenarios are not complete and will be added later