Preface

The end-of-year job-hopping season is here, and many friends have been out interviewing. Interviewers often ask about message queues, and many friends give answers that are far from perfect. Some of them know the answer in their hearts but cannot express it well. The root cause is still an incomplete understanding of the underlying knowledge. Today, let’s talk about this topic. Note: the article is a bit long. I won’t believe you if you say you can finish it in one sitting.

This article has been included in:

Github.com/sunshinelyz…

Gitee.com/binghe001/t…

What is a message queue?

A message queue is a container that holds messages while they are in transit, and it is also a way for applications to communicate with each other. The sender can return immediately after sending a message, and the messaging system takes care of delivering it reliably. The publisher only writes messages to the queue without caring who needs them, and the consumer does not need to know who published them; it simply fetches them from the queue. In this way production and consumption are decoupled.
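
To make the decoupling idea concrete, here is a minimal in-process sketch that uses a JDK BlockingQueue as a stand-in for a real message queue. The class name QueueDecouplingDemo and the message format are made up for illustration; a real broker such as RabbitMQ or Kafka sits between separate processes, but the shape is the same: the producer and the consumer share nothing except the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** In-process stand-in for a message queue: both sides only know the queue. */
class QueueDecouplingDemo {
    static List<String> run(int messageCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> consumed = new ArrayList<>();

        // The consumer does not know who produced the messages.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < messageCount; i++) {
                    consumed.add(queue.take()); // blocks until a message arrives
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // The producer returns right after each add; it never waits for the consumer.
        for (int i = 0; i < messageCount; i++) {
            queue.add("msg-" + i); // unbounded queue, so add does not block
        }

        try {
            consumer.join(); // wait only so the demo can return the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }
}
```

Calling QueueDecouplingDemo.run(3) returns the three messages in the order they were sent, because a single consumer reads a single FIFO queue.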

Why use message queues?

Advantages:

  • Asynchronous processing: SMS notifications, terminal status pushes, app pushes, user registration
  • Data synchronization: pushing business data to other systems
  • Retry compensation: retrying after a bookkeeping failure
  • System decoupling: communication between upstream and downstream systems, terminal exception monitoring, distributed event center
  • Peak shaving: handling orders in flash-sale (seckill) scenarios
  • Publish and subscribe: HSF service status change notifications, distributed event center
  • High-concurrency buffering: log services and monitoring reports

The core uses of a message queue are decoupling, asynchronous processing, and peak shaving.

Disadvantages:

  • Reduced system availability: the more external dependencies a system introduces, the more likely it is to fail. How can the message queue itself be made highly available?
  • Increased system complexity: how do you ensure that messages are not consumed repeatedly? How do you handle message loss? How do you guarantee message ordering?
  • Consistency problems: system A finishes its own processing and returns success, so everyone assumes the request succeeded. But what if, of the three downstream systems B, C, and D, B and D write to their databases successfully while C fails? Then your data is inconsistent.

RabbitMQ and Kafka are the two main message queues discussed below.

How can message queues be highly available?

High availability of RabbitMQ

RabbitMQ achieves high availability through a primary/secondary (non-distributed) design. It has three modes: single-machine mode (demo level only), normal cluster mode (no high availability), and mirrored cluster mode (high availability).

  • Normal cluster mode

    In normal cluster mode, you start multiple RabbitMQ instances on multiple machines, one per machine. A queue you create lives on only one RabbitMQ instance, but every instance synchronizes the queue’s metadata (metadata can be thought of as the queue’s configuration information, which is enough to locate the instance that holds the queue). When you consume, if you are connected to a different instance, that instance pulls the data from the instance that holds the queue.

    This approach is cumbersome and not very good. It is not distributed; it is just an ordinary cluster. Consumers either connect to a random instance each time and let that instance pull the data, which adds pull overhead, or they always connect to the instance that holds the queue, which turns that single instance into a performance bottleneck.

    And if the instance that holds the queue goes down, the other instances can no longer pull from it. If you have enabled message persistence so that RabbitMQ stores messages on disk, the messages are not necessarily lost, but you have to wait until that instance recovers before you can continue pulling data from the queue.

    So this mode is rather awkward. It offers no high availability at all. It mainly improves throughput, by letting multiple nodes in the cluster serve reads and writes for a queue.

  • Mirrored cluster mode

    This is RabbitMQ’s so-called high availability mode. Unlike normal cluster mode, in mirrored cluster mode a queue you create, both its metadata and its messages, exists on multiple instances. That is, each RabbitMQ node holds a full mirror of the queue, containing all of the queue’s data. Every time you write a message to the queue, the message is automatically synchronized to the queue on multiple instances.

    So how do you enable mirrored cluster mode? RabbitMQ has a nice management console. In it you add a policy, the mirrored cluster mode policy, which can be told to synchronize data to all nodes or to a specified number of nodes. When you create a queue afterwards, apply this policy, and its data is automatically synchronized to the other nodes.

    The advantage is that if any one machine goes down, it does not matter: the other machines (nodes) still hold the complete data of the queue, and consumers can consume from those nodes instead. The downsides are, first, that the performance overhead is high, because every message has to be synchronized to all machines, which puts heavy pressure on network bandwidth. Second, it is not distributed and therefore not scalable. If a queue is heavily loaded and you add a machine, the new machine also holds all of that queue’s data, so there is no way to scale the queue linearly. And what would you do if the queue’s data grew so large that a single machine could no longer hold it?

Kafka’s high availability

A basic understanding of Kafka’s architecture: it consists of multiple brokers, each of which is a node. You create a topic, which can be divided into multiple partitions; each partition can reside on a different broker, and each partition holds a portion of the data.

This makes Kafka a naturally distributed message queue: the data of one topic is spread across multiple machines, with each machine holding a portion of it.

Something like RabbitMQ, by contrast, is not really a distributed message queue. It is a traditional message queue that offers some clustering and High Availability (HA) mechanisms. No matter how you configure it, the data of a RabbitMQ queue is stored on a single node, and in mirrored cluster mode every node holds the complete data of the queue.

Prior to Kafka 0.8, there was no HA mechanism. When any broker went down, the partitions on that broker became unavailable: they could be neither written nor read, so there was no high availability to speak of.

For example, say we create a topic with three partitions, one on each of three machines. If the second machine goes down, one third of the topic’s data becomes unavailable, so the topic is not highly available.

Starting with Kafka 0.8, an HA mechanism is provided: the replica mechanism. The data of each partition is synchronized to other machines, forming multiple replicas. The replicas of a partition elect a leader; producers and consumers deal only with the leader, and the other replicas are followers. On writes, the leader is responsible for getting the data synchronized to all followers; on reads, data is read directly from the leader. Why can you only read from and write to the leader? Simple: if you could read and write any follower at will, you would have to deal with data consistency, the system would become far more complex, and problems would arise easily. Kafka distributes the replicas of a partition evenly across different machines to improve fault tolerance.

This gives what is called high availability: if a broker goes down, it does not matter, because the partitions on that broker have replicas on other machines. If the failed broker hosted the leader of a partition, a new leader is elected from the followers.

When data is written, the producer writes to the leader, the leader writes the data to its local disk, and the followers actively pull the data from the leader. Once a follower has synchronized the data, it sends an ack to the leader, and after receiving acks from all followers the leader returns a write-success response to the producer. (Of course, this is just one mode; you can tune this behavior.)

When consuming, messages are read only from the leader, and a message becomes visible to consumers only after all followers have successfully synchronized it and returned an ack.

How can messages be guaranteed not to be consumed repeatedly (idempotency)?

First of all, every message queue has the problem of repeated consumption, because avoiding it is not something MQ guarantees; it has to be guaranteed by our own development. Let’s use Kafka as an example to discuss how to do this.

Kafka has the concept of an offset. Each message is assigned an offset when it is written, which represents the message’s sequence number. After a consumer consumes data, by default it commits the offset of the messages it has consumed at regular intervals, which means “I have consumed up to here; if I restart, let me continue consuming from the last committed offset.”

However, there are always accidents. For example, something we often ran into in production: sometimes you have to restart the system, and it depends on how you restart. If something is urgent, you just kill the process and restart it. This can leave the consumer with some messages already processed but their offsets not yet committed, which is awkward. After the restart, a few messages are consumed again.

Repeated consumption in itself is not scary. What is scary is not having thought about how to guarantee idempotency when it happens.

Let me give you an example. Suppose you have a system that inserts one row into the database for every message it consumes. If the same message arrives twice, you insert two rows, and the data is wrong. But if, on the second delivery, you check whether the message has already been consumed and simply discard it if so, only one row is kept, and the data stays correct. If a message arrives twice but the database ends up with only one row, the system is idempotent. Idempotency, in plain English, means that the same piece of data or the same request can be given to you over and over again, and you make sure the resulting data does not change and nothing goes wrong.

So the second question is, how can message queue consumption be idempotent?

In practice you have to think about this in terms of your business. Here are a few ideas:

  • If you are writing data to a database, first look it up by primary key. If the row already exists, do not insert it again; just update it.
  • If you are writing to Redis, there is no problem: every write is a set anyway, which is naturally idempotent.
  • If neither of the above applies, it is a little more complicated. Have the producer attach a globally unique ID, such as an order ID, to every message it sends. When you consume a message, first look up this ID in, say, Redis. If it has not been consumed, process it and then write the ID to Redis. If it has already been consumed, do not process it again. This ensures you never process the same message twice.
  • Rely on a unique key in the database to ensure that duplicate data is not inserted more than once. Because of the unique key constraint, a duplicate insert only produces an error and does not leave dirty data in the database.

Of course, how to make MQ consumption idempotent depends on the specific business.
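
The globally unique ID idea can be sketched in a few lines. This is only an illustration under stated assumptions: IdempotentConsumer and its method names are hypothetical, an in-memory set stands in for Redis or a unique-key table, and an in-memory map stands in for the business table.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical consumer that skips messages whose ID it has already seen. */
class IdempotentConsumer {
    // Stand-in for Redis or a table with a unique key on the message ID.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    // Stand-in for the business table: orderId -> accumulated amount.
    private final Map<String, Integer> orders = new ConcurrentHashMap<>();

    /** Returns true if the message was applied, false if it was a duplicate. */
    boolean handle(String messageId, String orderId, int amount) {
        // add() is atomic: only the first caller for a given ID gets true.
        if (!processedIds.add(messageId)) {
            return false; // already consumed, drop it
        }
        orders.merge(orderId, amount, Integer::sum);
        return true;
    }

    int total(String orderId) {
        return orders.getOrDefault(orderId, 0);
    }
}
```

Delivering the same message ID twice changes the order total only once. In a real system the ID record and the business write would need to succeed or fail together (for example in one database transaction); this sketch does not cover that.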

How to ensure the reliable transmission of messages (not lost)?

This goes without saying. A basic principle of using MQ is that data must be neither duplicated nor lost. “No duplicates” is the repeated consumption and idempotency problem discussed above. “Nothing lost” means the data must not go missing; messages such as billing or fee deduction records certainly cannot be lost.

Data loss can happen at the producer, in MQ itself, or at the consumer. Let’s analyze RabbitMQ and Kafka separately.

How does RabbitMQ ensure that messages are reliable

Producer data loss

When the producer sends data to RabbitMQ, the data may be lost in transit because of network problems and the like.

To handle this, you can use RabbitMQ transactions: the producer calls channel.txSelect to start a transaction before sending a message. If RabbitMQ does not receive the message successfully, the producer gets an exception; at that point you can roll back the transaction with channel.txRollback and retry sending the message. If the message is received, you can commit the transaction with channel.txCommit.

// Start the transaction
channel.txSelect();
try {
    // Send the message here

    // Commit the transaction once the message has been sent
    channel.txCommit();
} catch (Exception e) {
    // Roll back the transaction
    channel.txRollback();

    // Resend the message here
}

The problem, however, is that once you use RabbitMQ transactions, throughput basically drops, because transactions are synchronous and cost too much performance.

So in general, if you want to make sure that messages written to RabbitMQ are not lost, you can enable confirm mode. With confirm mode enabled on the producer, every message you write is assigned a unique ID. If the message is written to RabbitMQ successfully, RabbitMQ sends you back an ack saying the message is OK. If RabbitMQ fails to process the message, it calls one of your nack callbacks to tell you the message failed, and you can retry. On top of this mechanism, you can keep the status of each message ID in memory yourself, and if you have not received a callback for a message after a certain amount of time, you can resend it.

The biggest difference between transactions and confirm mode is that transactions are synchronous: you commit a transaction and block there. Confirm mode is asynchronous: after sending one message you can go straight on to sending the next, and once RabbitMQ has received a message it asynchronously calls one of your callbacks to tell you so.

Therefore, the confirm mechanism is generally used to avoid data loss on the producer side.
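
The in-memory bookkeeping mentioned above (tracking each message ID until its callback arrives) might look roughly like the sketch below. It is pure JDK code and does not talk to RabbitMQ; the class PendingConfirms and its method names are hypothetical. With the RabbitMQ Java client, confirm mode is switched on with channel.confirmSelect(), the sequence number comes from channel.getNextPublishSeqNo(), and the ack and nack callbacks are registered with channel.addConfirmListener(...); this tracker would be driven from those calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical in-memory tracker for messages published in confirm mode. */
class PendingConfirms {
    private static final class Pending {
        final String body;
        final long sentAtMillis;
        Pending(String body, long sentAtMillis) {
            this.body = body;
            this.sentAtMillis = sentAtMillis;
        }
    }

    // Sequence number -> message still waiting for an ack or nack.
    private final ConcurrentNavigableMap<Long, Pending> outstanding =
            new ConcurrentSkipListMap<>();

    /** Call right after publishing, with the sequence number of the message. */
    void published(long seqNo, String body, long nowMillis) {
        outstanding.put(seqNo, new Pending(body, nowMillis));
    }

    /** Ack: forget this message, or everything up to it when multiple is true. */
    void acked(long seqNo, boolean multiple) {
        if (multiple) {
            outstanding.headMap(seqNo, true).clear();
        } else {
            outstanding.remove(seqNo);
        }
    }

    /** Nack: hand the body back so the caller can publish it again. */
    String nacked(long seqNo) {
        Pending p = outstanding.remove(seqNo);
        return p == null ? null : p.body;
    }

    /** Bodies that have waited longer than timeoutMillis without any callback. */
    List<String> timedOut(long nowMillis, long timeoutMillis) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<Long, Pending> e : outstanding.entrySet()) {
            if (nowMillis - e.getValue().sentAtMillis > timeoutMillis) {
                result.add(e.getValue().body);
            }
        }
        return result;
    }
}
```

A periodic task could call timedOut(...) and republish whatever it returns. Resending can create duplicates, so this only makes sense together with an idempotent consumer.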

RabbitMQ data loss

To guard against RabbitMQ itself losing data, you must enable RabbitMQ persistence: messages are persisted to disk after being written, so even if RabbitMQ goes down, it automatically reads the stored data after recovery, and in general nothing is lost. Only in the very rare case where RabbitMQ dies before it has persisted a message can a small amount of data be lost, and the probability is low.

There are two steps to setting up persistence:

  • The first is to make the queue durable when you create it. This ensures RabbitMQ persists the queue’s metadata, but it does not persist the data in the queue.
  • The second is to set the message’s deliveryMode to 2 when sending it, which marks the message as persistent, so RabbitMQ persists it to disk.

Both must be set together. Then even if RabbitMQ goes down and restarts, it restores the queue, and the data in it, from disk.

Note that even with persistence enabled, there is still a possibility that a message has been written to RabbitMQ but not yet persisted to disk when RabbitMQ goes down, which loses the small amount of data still in memory.

Therefore, persistence can be combined with the producer’s confirm mechanism: the producer is sent an ack only after the message has been persisted to disk. Then even if RabbitMQ dies before persisting and the data is lost, the producer never receives the ack and can resend the message itself.

Consumer data loss

On the consumer side, RabbitMQ data is lost mainly when you have just received a message and have not processed it yet, and the process dies (for example, during a restart). RabbitMQ assumes you have already consumed the message, and the data is lost.

For this you use RabbitMQ’s ack mechanism. Put simply, you must turn off RabbitMQ’s automatic ack (this can be done through an API call) and then ack manually in your own code each time you are sure processing has finished. That way, if you have not finished processing, there is no ack, so RabbitMQ assumes you have not finished and hands the message to another consumer, and the message is not lost.

How does Kafka keep its messages reliable

  • Consumer data loss

    The only way a consumer can lose data is this: you receive a message, and the consumer automatically commits the offset, so Kafka thinks you have already consumed it. In fact you were only about to process the message, and you die before processing it, so the message is lost.

    This is similar to RabbitMQ. Kafka commits offsets automatically by default, so simply turn off automatic commits and commit the offset manually after processing, and data will not be lost. Repeated consumption is still possible, though: if you die after processing but before committing the offset, that message will certainly be consumed once more. Just make sure your processing is idempotent.

    One problem we ran into in production: our Kafka consumer wrote the data it consumed into an in-memory queue as a buffer. As a result, sometimes a message had only just been written to the in-memory queue when the consumer automatically committed the offset. If we then restarted the system, the data still sitting in the in-memory queue was lost before it could be processed.

  • Kafka data loss

    A common case is that a Kafka broker goes down and the leaders of its partitions are re-elected. Suppose some followers had not finished synchronizing certain data when the leader died. Once one of those followers is elected as the new leader, that data is missing, so some data is lost.

    We ran into this in production too: the machine hosting a Kafka leader went down, and after a follower was switched to leader, we found that this data was lost.

    Therefore, you generally need to set at least the following four parameters:

    • Set the replication.factor parameter on the topic: the value must be greater than 1, so that each partition has at least 2 replicas.
    • Set the min.insync.replicas parameter on the Kafka server: the value must be greater than 1. This requires the leader to see at least one follower still in sync with it, so that a follower is still available after the leader dies.
    • Set acks=all on the producer side: every message must be written to all in-sync replicas before the write is considered successful.
    • Set retries=MAX on the producer side (a very, very large value, meaning effectively unlimited retries): if a write fails, the producer retries without limit and blocks there.

    Our production environment is configured this way, so that, at least on the Kafka broker side, no data is lost when the leader’s broker fails and the leader is switched.

  • Producer data loss

    If you set acks=all as described above, writes will not be lost. The requirement is that the leader has received the message and all followers have synchronized it before the write counts as successful. If this condition is not met, the producer automatically keeps retrying, an unlimited number of times.
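
As a rough sketch of the producer-side half of these settings, the snippet below builds a plain java.util.Properties object with acks and retries. The class name ReliableProducerConfig and the bootstrapServers argument are made up for illustration, and the topic-level and broker-level settings appear only as comments because they are configured outside the producer.

```java
import java.util.Properties;

/** Hypothetical helper that collects the producer-side reliability settings. */
class ReliableProducerConfig {
    // Configured outside the producer (assumed values, adjust to your cluster):
    //   topic:  replication factor of at least 2
    //   broker: min.insync.replicas=2
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A write counts as successful only after all in-sync replicas have it.
        props.setProperty("acks", "all");
        // Retry practically forever instead of giving up on the message.
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        return props;
    }
}
```

These properties would then be handed to the Kafka producer client together with the key and value serializer settings, which are omitted here.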

How do I ensure that messages are sequential?

Let me give an example. We once built a MySQL binlog synchronization system under very heavy load, synchronizing hundreds of millions of records per day, in which data from one MySQL database was synchronized unchanged to another MySQL database (mysql -> mysql). A common use case is a big data team that needs a synchronized copy of a MySQL database in order to run all kinds of complex operations on the data of the company’s business systems.

If you insert, update, and then delete a row in MySQL, that produces three binlog entries, one each for the insert, update, and delete. These three entries are then sent to MQ and consumed and executed in turn, so you have to keep them in order. The original order was insert, update, delete; if you execute them as delete, update, insert, isn’t everything wrong?

With correct synchronization, the row should end up deleted. Get the order wrong, and the row is left behind, so the synchronization has gone wrong.

Let’s look at two scenarios where the order gets mixed up:

  • RabbitMQ: one queue, multiple consumers. For example, the producer sends three messages to RabbitMQ in the order data1, data2, data3, and they land in one RabbitMQ in-memory queue. Three consumers each take one of the three messages from MQ. Consumer 2 happens to finish first and stores data2 in the database, followed by data1 and data3. Isn’t that obviously out of order?

  • Kafka: say we create a topic with three partitions. The producer can specify a key when writing. For example, if we use an order ID as the key, all data related to that order is sent to the same partition, and the data within a partition is ordered. When a consumer reads from the partition, the data is also in order. Up to this point the order is fine. But then we may use multiple threads inside the consumer to process messages concurrently, because if the consumer processes in a single thread and each message takes tens of milliseconds, it can only handle a few dozen messages per second, which is far too little throughput. And once multiple threads run concurrently, the order can get mixed up.

RabbitMQ solution

Split the data across multiple queues with one consumer per queue. That means a few more queues, which is admittedly more trouble. Alternatively, keep one queue with a single consumer, and have that consumer put messages into in-memory queues and then hand them to different workers underneath for processing.

Kafka solution

  • One topic, one partition, one consumer, consuming in a single thread internally. Single-thread throughput is too low, so this is generally not used.
  • Create N in-memory queues and send all data with the same key to the same in-memory queue. Then run N threads, each consuming one in-memory queue, which preserves order.
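
The “N in-memory queues, one thread each” approach can be sketched with single-threaded executors, since each one already owns a FIFO task queue. The class KeyOrderedDispatcher and its method names are hypothetical, and routing by key.hashCode() is just one simple way to keep a key on the same worker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Routes messages with the same key to the same single-threaded worker. */
class KeyOrderedDispatcher {
    private final List<ExecutorService> workers = new ArrayList<>();

    KeyOrderedDispatcher(int workerCount) {
        for (int i = 0; i < workerCount; i++) {
            // Each single-thread executor owns one in-memory FIFO queue.
            workers.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Same key gives the same worker index, so per-key order is preserved. */
    int workerIndex(String key) {
        return Math.floorMod(key.hashCode(), workers.size());
    }

    void dispatch(String key, Runnable task) {
        workers.get(workerIndex(key)).submit(task);
    }

    /** Stops accepting work and waits for the queued tasks to finish. */
    void shutdownAndWait() {
        for (ExecutorService w : workers) {
            w.shutdown();
        }
        try {
            for (ExecutorService w : workers) {
                w.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Messages for different keys can run in parallel on different workers, while the insert, update, and delete for one order ID always run on the same thread in the order they were dispatched.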

How do I handle a message backlog?

A large number of messages have been backlogged in MQ for hours

If one consumer handles 1,000 messages per second, three consumers handle 3,000 per second, which is 180,000 per minute. So if you have a backlog of millions to tens of millions of messages, then even after the consumers recover, it takes about an hour to work through it.
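
To make the arithmetic concrete, here is a tiny helper that computes how long a backlog takes to drain. The class and method names are hypothetical; the rates are the ones from the paragraph above, and the backlog of 10 million is just an example value in the “millions to tens of millions” range.

```java
/** Hypothetical helper for estimating how long a backlog takes to drain. */
class BacklogEstimate {
    /** Minutes needed to consume the backlog at the given rates, rounded up. */
    static long minutesToDrain(long backlog, int consumers, long perConsumerPerSecond) {
        long perMinute = consumers * perConsumerPerSecond * 60;
        return (backlog + perMinute - 1) / perMinute;
    }
}
```

With 3 consumers at 1,000 messages per second each, minutesToDrain(10_000_000, 3, 1000) gives 56 minutes, which is the “about an hour” above. With ten times as many consumers, minutesToDrain(10_000_000, 30, 1000) gives 6 minutes.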

In this situation, the only option is a temporary emergency scale-out. The procedure and reasoning are as follows:

  • First fix the consumer’s problem so that it can consume at its normal speed again, then stop all existing consumers.
  • Create a new topic with 10 times as many partitions as before, or temporarily create 10 times as many queues.
  • Write a temporary consumer program whose only job is to redistribute data. Deploy it to consume the backlog. It does no time-consuming processing after consuming a message; it simply writes each message, round-robin, into the 10 times as many temporary queues.
  • Then temporarily requisition 10 times as many machines to deploy consumers, with each batch of consumers consuming one temporary queue. This is equivalent to temporarily scaling queue and consumer resources by 10 times, consuming data at 10 times the normal speed.
  • Once the backlog has been consumed, restore the original deployment architecture and go back to consuming messages with the original consumer machines.
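
The temporary redistribution program from the steps above does nothing but forward messages round-robin. A minimal in-memory sketch, assuming plain lists in place of real queues or partitions and a hypothetical class name BacklogRedistributor:

```java
import java.util.ArrayList;
import java.util.List;

/** Spreads a backlog evenly over temporary queues without any business processing. */
class BacklogRedistributor {
    static List<List<String>> redistribute(List<String> backlog, int tempQueueCount) {
        List<List<String>> tempQueues = new ArrayList<>();
        for (int i = 0; i < tempQueueCount; i++) {
            tempQueues.add(new ArrayList<>());
        }
        int next = 0;
        for (String message : backlog) {
            tempQueues.get(next).add(message);   // forward only, no slow work here
            next = (next + 1) % tempQueueCount;  // round-robin over the temporary queues
        }
        return tempQueues;
    }
}
```

Because this step does no slow work, it can keep up with the backlog, and the expensive processing happens in parallel in the consumers of the temporary queues. Note that round-robin forwarding does not preserve per-key ordering; if order matters, the forwarding step would have to route by key instead.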

Messages in MQ have expired

Suppose you are using RabbitMQ. RabbitMQ lets you set an expiration time, i.e. a TTL. If messages stay backlogged in a queue for longer than that, RabbitMQ clears them out and the data is gone. That is the second pitfall. In this case the data does not pile up in MQ; a large amount of it is simply lost.

In this case there is no point in adding consumers to work through a backlog, because there is no backlog; the problem is that a large number of messages were lost. One solution we can adopt is batch re-import, which we have handled in similar production scenarios before. When there was a huge backlog, we simply discarded the data, and then waited until after the peak period, for example until after midnight, when the users had gone to bed and we were staying up on coffee. Then we wrote a temporary program to find the lost batch of data bit by bit and pour it back into MQ, making up for what was lost during the day. That is the only way.

Suppose 10,000 orders were backlogged in MQ without being processed, and 1,000 of them were lost. You have to write a program by hand to find those 1,000 orders and send them to MQ again to make up for it.

MQ is almost full

What if messages stay backlogged in MQ and you fail to deal with them for a long time, so that MQ is almost full? Is there another way out? No; who told you to be so slow in carrying out the first plan? Write a temporary program that connects and consumes the data, discarding each message as it is consumed and not keeping any of them, so that all messages are consumed quickly. Then go to the second plan and make up the data late at night.

