1. Foreword

Message queue middleware (message middleware for short) uses an efficient and reliable messaging mechanism for platform-independent data communication, and builds distributed system integration on top of that communication.

By providing message-passing and message-queuing models, it offers application decoupling, elastic scaling, redundant storage, traffic peak shaving, asynchronous communication, data synchronization, and other capabilities in a distributed environment. It is a pivotal component of distributed system architecture.

The mainstream open source message-oriented middleware today includes ActiveMQ, RabbitMQ, Kafka, RocketMQ, Pulsar, and so on. Introducing message-oriented middleware into an architecture requires weighing many factors, such as cost versus benefit: how do you get the best value for money? Each product has its own focus, so choosing the one that fits your scenario and playing to its strengths is undoubtedly the best approach.

2. Introduction

The following is a brief comparison of the mainstream message middleware (this article focuses on Kafka and RabbitMQ in detail).

Message-oriented middleware model

  1. Point-to-point model

    The point-to-point model is used for point-to-point communication between message producers and message consumers. Message producers send messages to specific consumers identified by a name. This name actually refers to a Queue in the consuming service where the message is stored before it is delivered to the consumer. Queued messages can be stored in memory or persisted to ensure that messages can still be delivered in the event of a message service failure.

    Each message has only one consumer and cannot be re-consumed (once consumed, the message is no longer in the message queue)

  2. Publish/subscribe model

    This is similar to a WeChat official account: after a user follows it, the account (publisher) publishes messages and the user's WeChat client receives the push. Here we also need to distinguish between pull and push.

    • Pull: the initiative lies with the consumer. The advantages are on-demand consumption (like a buffet: take only as much as you can handle) and that message accumulation on the server side is simple to deal with (the server does not track consumption state; the state lives in the consumer). The downside is message latency (the consumer does not know when to pull for updates); pull can of course be combined with push to improve real-time delivery where needed.

    • Push: the initiative lies with the server. The advantage is low latency, and the server can balance load through unified management, but slow consumers are easily overwhelmed. The disadvantage is that the state of sent messages is managed centrally, which is a burden (the server must distribute messages, record their state, and keep backups).

Basic concepts

Kafka

Broker

A Broker is a Kafka server. When multiple brokers form a cluster, they rely on a ZooKeeper cluster to coordinate.

The producer sends a message to the Kafka server. The consumer reads the message from the Kafka server.

Topic and Partition

Topic represents a class of messages, which can also be thought of as a place to which messages are sent. Usually we can use topics to distinguish between actual businesses, such as business A using one topic and business B using another.

Topics in Kafka are usually subscribed to by multiple consumers, so for performance reasons Kafka does not use a two-tier topic-message structure but a three-tier topic-partition-message structure to spread the load. In essence, every Kafka topic consists of several partitions.

A topic is composed of multiple partitions, and a Kafka partition is an immutable, ordered sequence of messages, i.e. an ordered message log. Each partition has its own partition number, usually starting from 0. The only write a user can perform on a partition is to append a message to the end of the sequence, and every message in a partition is assigned a unique sequence number.

This sequence number is called the offset, an integer that increases from 0. An offset uniquely locates a message within its partition.

Why did Kafka design partitions?

To address scalability. What if a topic accumulates more data than a single broker machine can hold? A natural thought is: can the data be split into multiple pieces and stored on different brokers? That is why Kafka designed partitions.

Consumer groups

A Consumer Group is a group of consumer instances that share a common ID, called the Group ID. All consumers within the group coordinate to consume all partitions of the subscribed topics, and each partition can only be consumed by one consumer instance within the same group.

The Consumer Group has three features.

  1. A Consumer Group can have one or more Consumer instances.
  2. The Group ID is a string that identifies a unique Consumer Group in a Kafka cluster.
  3. A single partition of a topic subscribed to by a Consumer Group is consumed by only one Consumer instance within that Group; the same partition can still be consumed by other groups.

Remember the two messaging models mentioned above?

Kafka uses this single mechanism, the Consumer Group, to implement both models of traditional messaging systems: if all instances belong to the same Group, it behaves as the point-to-point model; if every instance belongs to a different Group, it behaves as the publish/subscribe model.
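A minimal sketch of how the two models fall out of the group.id setting (the topic name "demo-topic" and the group name "order-service" are made up for illustration): give every instance the same group.id and the partitions are divided among them (queue semantics); give each consuming application its own group.id and every group receives the full message stream (publish/subscribe semantics).

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupSemanticsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.244.130:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Same group.id on every instance -> the instances split the partitions (point-to-point model).
        // A different group.id per application -> each group receives every message (publish/subscribe model).
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("demo-topic"));
    }
}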

In a real-world usage scenario, how many Consumer instances should there be under a Group?

Ideally, the number of Consumer instances should equal the total number of partitions that the Group subscribes to.

For a simple example, if a Consumer Group subscribes to three topics A, B, and C with 1, 2, and 3 partitions respectively, then setting 1+2+3=6 Consumer instances for the Group is generally ideal, because it maximizes scalability.

Consumption order problem

The design above may lead to out-of-order consumption, as described below.

Out of order scenario 1

Because a topic can have multiple partitions, Kafka only ensures that partitions are internally ordered

When the number of partitions equals the number of consumers in the same consumer group, messages that must stay in sequence may be distributed to different partitions and therefore be processed out of order.

The solution

  1. Topic can be set to have only one partition

  2. Based on business requirements, route messages that must stay in sequence to the same partition, for example by sending them with the same key (see the sketch below)
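A minimal sketch of the second approach (the topic name, key, and payloads are made up; `producer` is a KafkaProducer like the one in the demo code later in this article). A ProducerRecord constructed with a key is hashed to a fixed partition, so messages sharing that key keep their relative order.

// Messages with the same key (here an order id) land in the same partition and stay ordered.
producer.send(new ProducerRecord<>("order-events", "order-1001", "order-1001 created"));
producer.send(new ProducerRecord<>("order-events", "order-1001", "order-1001 paid"));
producer.send(new ProducerRecord<>("order-events", "order-1001", "order-1001 shipped"));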

Out-of-order scenario 2

When messages for the same business go to the same consumer group and the consumer processes them with multiple threads, the messages can still be processed out of order.

The solution

The consumer creates one in-memory queue per worker thread. Data that must be processed in order is routed to the same in-memory queue based on its key (or some business field), and each thread takes work only from its own queue.
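A minimal sketch of that idea (class and method names other than the Kafka client types are made up, and `handle` stands for the business logic): each record is routed by key hash to a fixed in-memory queue, and each queue is drained by a single worker thread, so records with the same key are processed in order.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class OrderedWorkerPool {
    private final List<BlockingQueue<ConsumerRecord<String, String>>> queues = new ArrayList<>();

    public OrderedWorkerPool(int workers) {
        for (int i = 0; i < workers; i++) {
            BlockingQueue<ConsumerRecord<String, String>> queue = new LinkedBlockingQueue<>();
            queues.add(queue);
            Thread worker = new Thread(() -> {
                while (true) {
                    try {
                        handle(queue.take());          // blocks until a record is available
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
            worker.setDaemon(true);
            worker.start();
        }
    }

    // Called from the poll loop: same key -> same queue -> same worker thread.
    public void dispatch(ConsumerRecords<String, String> records) {
        records.forEach(record ->
                queues.get(Math.floorMod(record.key().hashCode(), queues.size())).offer(record));
    }

    private void handle(ConsumerRecord<String, String> record) {
        // business processing goes here
        System.out.println("processing " + record.key() + " -> " + record.value());
    }
}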

Rebalance

Rebalance is essentially a protocol that dictates how all consumers in a Consumer Group agree to allocate each partition of a subscription Topic. For example, a Group with 20 Consumer instances subscribes to a Topic with 100 partitions. Normally, Kafka allocates an average of five partitions per Consumer. This allocation process is called Rebalance.

When does the Consumer Group make Rebalance? There are three conditions that trigger Rebalance.

  1. **The number of group members changes.** For example, a new Consumer instance joins the group, an instance leaves, or an instance crashes and is "kicked" out of the group.
  2. **The number of subscribed topics changes.** Consumer groups can subscribe to topics with a regular expression, for example consumer.subscribe(Pattern.compile("t.*c")) subscribes to all topics whose names start with t and end with c. If a new topic matching the pattern is created while the Consumer Group is running, the Group rebalances.
  3. **The number of partitions of a subscribed topic changes.** Kafka currently only allows increasing the number of partitions of a topic. When the partition count increases, every group that subscribes to the topic rebalances.

Rebalance has a huge impact on a Consumer Group. It is similar to a stop-the-world (STW) pause: during STW all application threads stop working and the whole application freezes. Likewise, during a Rebalance all Consumer instances stop consuming and wait for the Rebalance to complete. This is one of the worst aspects of Rebalance.

Storage principle

Broker storage model analysis

How can a Topic be stored among multiple brokers? From the figure above, we can see that Topic 1 is divided into three partitions, namely P1, P2 and P3.

In addition, two copies are set for each Partition.

In the figure above, the P1 with a red background is the leader replica of Partition 1, and the P1 with a blue background is a follower replica. Consumers read data only from the leader, which avoids the data consistency issues that come with master-slave replication.

Replica analysis

Since a partition is an ordered message log, we obviously cannot keep only a single copy of that log; otherwise, if the Kafka server holding the partition went down, all the messages stored on it would be lost. A distributed system must achieve high reliability, mainly through redundancy: backing the log up in multiple copies. These backups are called replicas in Kafka, and their sole purpose is to prevent data loss.

  • Replica distribution:

  1. The replication factor cannot be greater than the number of brokers.

  2. The first replica of the first partition (the partition numbered 0) is placed on a broker chosen at random from the broker set.

  3. The first replica of each subsequent partition is shifted backward relative to the previous one. For example, with three brokers and four partitions, if the first replica of the first partition is placed on the first broker, then the first replica of the second partition goes on the second broker, the first replica of the third partition on the third broker, the first replica of the fourth partition back on the first broker, and so on.

  4. For each partition, the placement of the remaining replicas relative to the first replica is determined by nextReplicaShift, which is also randomly generated.

  • Replica roles:

    Replicas are divided into leader replicas and follower replicas.

    A follower replica does not serve clients: it responds to neither message writes nor consume requests. It passively fetches data from the leader replica. If the broker hosting the leader replica goes down, Kafka elects a new leader from the remaining replicas.

    Leader and Follower

    As mentioned above, replicas in Kafka play two roles: leader and follower. Kafka ensures that multiple replicas of the same partition are never placed on the same broker; if they were, the backup would provide no real redundancy.

  • Replica election:

    At startup, every broker tries to create the ephemeral node /controller in ZooKeeper, and only one succeeds (first come, first served). If the Controller dies or has network problems, its ephemeral node in ZooKeeper disappears; the other brokers learn that the Controller is offline through a watch and campaign to become the new Controller in the same way as before: whoever writes the /controller node to ZooKeeper first becomes the new Controller.

    What does Controller do?

    The broker that holds the Controller role elects the leaders of partition replicas. A few concepts first:

    • Assigned Replicas (AR): all replicas of a partition.
    • In-Sync Replicas (ISR): the replicas that stay sufficiently synchronized with the leader's data.
    • Out-of-Sync Replicas (OSR): the replicas that lag too far behind the leader.

    AR = ISR + OSR. Under normal circumstances the OSR is empty and everyone synchronizes normally, so AR = ISR. If a replica's synchronization lag exceeds 30 seconds, it is kicked out of the ISR and moved into the OSR; once it catches up, it rejoins the ISR.

    How are leader replicas elected?

    The election is hosted by the Controller, using Microsoft's PacificA algorithm. By default, the first replica in the ISR becomes the leader; for example, if the ISR is [1, 5, 9], replica 1 is elected leader.

  • Replica synchronization mechanism:

    After the leader replica is elected, data needs to be synchronized to the backup replicas. How do the followers synchronize data from the leader?

    First, a few concepts, as shown below:

  • High Watermark (HW): the offset of the last committed message in the partition.
  • LEO (Log End Offset): the offset of the latest message on the leader.
  • Committed message: a message that has been replicated by all ISRs and committed.
  • Lagging message: a message that has not yet been replicated by all ISRs.

What does data synchronization look like?

  1. The follower sends a fetch request to the leader; after the leader sends the data to the follower, it updates its record of the follower's LEO.
  2. After receiving the response, the follower writes the messages and updates its own LEO.
  3. The leader updates the HW (the smallest LEO in the ISR).

Note that consumers can only consume messages below the HW. Kafka's ISR replication design provides high throughput while maintaining data consistency.

How to troubleshoot the fault?

  • Follower failure

    If a follower fails, the following happens:

    1. The follower is kicked out of the ISR.
    2. After the follower recovers, it deletes the data above its previously recorded HW.
    3. It then synchronizes data from the leader until it catches up, and rejoins the ISR.
  • Leader failure

    If the leader fails, the following happens:

    1. The leader is kicked out of the ISR, and the Controller elects a new leader.
    2. The other followers delete messages above the HW and then synchronize data from the new leader.

    **Note:** this mechanism only guarantees data consistency between replicas; it does not guarantee that data is neither lost nor duplicated.

Segment analysis

A partition is divided into multiple segments to prevent the log file from growing without bound as messages are appended, which would make message retrieval inefficient. (MySQL also has the logical concept of a segment.)

  1. Segment File structure analysis

Each partition contains a set of files as shown in the following figure:

  • The .log file stores the messages.
  • The .index file is the offset index.
  • The .timeindex file is the timestamp index.

As data is continuously appended to the .log file, the log is rolled: when certain conditions are met, a new set of segment files is created. The conditions for rolling the log are as follows:

The first case: the .log file reaches the size controlled by the following parameter. The default value is 1073741824 bytes (1 GB).

log.segment.bytes=1073741824

The second case: the difference between the maximum message timestamp in the segment and the current system timestamp exceeds a threshold, controlled by the following parameter. The default is 168 hours (one week).

log.roll.hours=168
# it can also be controlled in milliseconds via log.roll.ms

The third case: the offset index file or timestamp index file reaches a certain size, 10485760 bytes (10 MB) by default. If you want to reduce log rolling, you can increase this value.

log.index.size.max.bytes=10485760
  2. Offset index file

    The offset index file records the mapping between an offset and the physical position of the message in the log file. The file content is binary; you can view it with the following command:

    ./kafka-dump-log.sh --files /data/kafka-logs/cluster-test4java-2/00000000000000000000.index | head -n 10

Note that Kafka does not index every message; it builds a sparse index. The sparse index structure is shown in the figure below.

How to determine the sparsity of a sparse index?

The sparsity of the offset index is determined by how many bytes of messages are written between index entries, 4 KB by default, controlled by the following parameter:

log.index.interval.bytes=4096

Whenever more than 4 KB of messages have been written since the last entry, a new record is added to the offset index.

  3. Timestamp index file

    There are two kinds of timestamps that can be indexed: the time the message was created (CreateTime) and the time the message was appended to the broker's log (LogAppendTime). Which one is used is controlled by a single parameter:

    log.message.timestamp.type=CreateTime   # or LogAppendTime

    To view the earliest 10 timestamp indexes, run the following command:

    ./kafka-dump-log.sh --files /data/kafka-logs/cluster-test4java-2/00000000000000000000.timeindex | head -n 10

    The result is as follows, where the mapping between time and offset is recorded

How do I quickly retrieve messages?

Let’s say I want to retrieve a message with an offset of 10002673.

  1. The partition is known at consume time, so the first step is to find which segment the offset lives in. Segment files are named after their base offset, so a binary search quickly finds the last segment whose base offset is not greater than 10002673 (see the sketch below).
  2. Each segment has a corresponding set of index files, so next we look up the closest indexed position in the .index file based on the offset.
  3. With that position, we scan the .log file from there, comparing each message's offset until the target message is found.
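An illustration of step 1 only (the base offsets below are made up): java.util.Arrays.binarySearch over the sorted base offsets taken from the segment file names finds the last segment whose base offset does not exceed the target.

import java.util.Arrays;

public class SegmentLookupSketch {
    public static void main(String[] args) {
        long[] baseOffsets = {0L, 5_000_000L, 10_000_000L, 15_000_000L}; // taken from segment file names
        long target = 10_002_673L;

        int idx = Arrays.binarySearch(baseOffsets, target);
        if (idx < 0) {
            idx = -idx - 2; // (insertion point - 1) = last segment whose base offset <= target
        }
        // Kafka names segment files as the 20-digit, zero-padded base offset.
        System.out.printf("offset %d lives in segment %020d.log%n", target, baseOffsets[idx]);
    }
}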

Conclusion:

What makes Kafka so fast?

  1. Sequential disk writes (writing data)
  2. Zero copy (reading data)
  3. File indexes (the segment .index and .timeindex files)
  4. Batched reads and writes plus message compression, reducing network I/O overhead

RabbitMQ

ConnectionFactory, Connection, Channel


ConnectionFactory, Connection, and Channel are all basic objects in RabbitMQ’s API.

  • ConnectionFactory: Manufacturing factory of the Connection.
  • Connection: socket Connection to RabbitMQ, which encapsulates some of the socket protocol logic.
  • Channel: a Channel is a virtual connection built over a "real" TCP connection, and there is essentially no limit to how many channels can be created over one TCP connection (think of them as fibres inside a cable). It is the interface we use most when working with RabbitMQ: most business operations are done on a Channel, including declaring queues, declaring exchanges, binding queues to exchanges, and publishing messages.

Queue

A Queue is the internal object of RabbitMQ to store messages.

Messages in RabbitMQ can only be stored in queues. The producer (P in the figure below) produces the message and eventually delivers it to the Queue, and the consumer (C in the figure below) can fetch the message from the Queue and consume it. The consumer can be one or more.

Message Acknowledgment (ack):

In practice, messages could be lost if a consumer receives a message from the Queue but dies (or otherwise fails) before processing it. To avoid this, we can require the consumer to send a receipt (ack) back to RabbitMQ after processing the message; RabbitMQ removes the message from the Queue only after receiving that ack. If RabbitMQ does not receive the ack and detects that the consumer's connection has dropped, it redelivers the message to another consumer (if there is more than one). There is no timeout here: no matter how long a consumer takes to process a message, it will not be redelivered to another consumer unless the consumer's RabbitMQ connection is broken.

Another problem: if a developer forgets to send the ack after the business logic completes, this causes serious bugs. Messages pile up in the Queue, and after the consumer restarts the same messages are consumed again and the business logic runs repeatedly.

In addition, publishing a message has no ack of its own (see the confirm mode below).
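A minimal consumer-side sketch of manual acknowledgement (the queue name and the `process` method are made up; `channel` is a Channel created as in the demo code later in this article): automatic ack is turned off and basicAck is sent only after the business logic succeeds, so an unacknowledged message is redelivered if this consumer's connection drops.

boolean autoAck = false;
channel.basicConsume("task_queue", autoAck, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            process(new String(body, "UTF-8"));                          // business logic (assumed to exist)
            channel.basicAck(envelope.getDeliveryTag(), false);          // ack this single message
        } catch (Exception e) {
            channel.basicNack(envelope.getDeliveryTag(), false, true);   // requeue on failure
        }
    }
});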

Message Durability (Message Persistence)

If we want messages to survive even a restart of the RabbitMQ service, we can make both the queue and the messages durable, which guarantees in most cases that RabbitMQ will not lose them. This still does not cover every loss scenario (for example, the RabbitMQ server loses power after receiving a producer's message but before persisting it); if we need to handle that, we need transactions.
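A minimal sketch (the queue name is made up; `channel` as in the demo code later): durable=true on queueDeclare plus PERSISTENT_TEXT_PLAIN on the message make both the queue and the message durable. As noted above, this still leaves the window before the broker has flushed the message to disk.

channel.queueDeclare("task_queue", true /* durable */, false, false, null);
channel.basicPublish("", "task_queue",
        MessageProperties.PERSISTENT_TEXT_PLAIN,        // delivery mode 2 = persistent
        "hello".getBytes("UTF-8"));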

A little bit about transactions

RabbitMQ provides two ways to do this:

  1. The AMQP transaction mechanism, which is the solution provided at the AMQP protocol level;
  2. Putting the channel into confirm mode.
  • AMQP transaction mechanism implementation

    There are three transaction-related methods in RabbitMQ: txSelect(), txCommit(), and txRollback(). txSelect puts the current channel into transaction mode, txCommit commits the transaction, and txRollback rolls it back. After starting a transaction with txSelect, we can publish messages to the broker. If txCommit succeeds, the messages are guaranteed to have reached the broker; if the broker crashes or throws an exception for some other reason before txCommit executes, we can catch the exception and roll back the transaction with txRollback.

    Implementation steps:

    1. The client sends Tx.Select
    2. The broker replies Tx.Select-Ok (the client then publishes)
    3. The client sends Tx.Commit
    4. The broker replies Tx.Commit-Ok

    Transactions do solve the problem of confirming messages between producer and broker: a transaction commits successfully only if the broker accepted the message; otherwise we can catch the exception, roll back the transaction, and resend. However, transactions are synchronous, and the second approach (confirm mode) greatly reduces the performance penalty.
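    A minimal sketch of the transactional publish (the queue name is made up; `channel` as in the demo code later in this article):

    channel.txSelect();                      // put the channel into transaction mode
    try {
        channel.basicPublish("", "tx_queue", null, "hello".getBytes("UTF-8"));
        channel.txCommit();                  // commit: the message is guaranteed to have reached the broker
    } catch (Exception e) {
        channel.txRollback();                // roll back, then decide whether to resend
    }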

  • Confirm mode implementation

    The problem here is that the producer does not know whether a message has actually reached the broker. The AMQP protocol's transaction mechanism solves this, but transactions reduce RabbitMQ's message throughput. Is there a more efficient solution? The answer is confirm mode.

    The greatest benefit of confirm mode is that it is asynchronous. Once a message is published, the producer can continue sending the next message while waiting for the channel to return the acknowledgement; when the message is finally confirmed, the producer handles it in a callback method. If RabbitMQ loses the message due to an internal error, it sends a nack instead, which the producer can also handle in the callback.

    After a channel is put into confirm mode, every subsequently published message will be confirmed (ack) or nacked exactly once. There is no guarantee of how quickly a message is confirmed, but the same message will never be both acked and nacked.
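    A minimal sketch of asynchronous confirms (the queue name is made up; `channel` as in the demo code later): confirmSelect() switches the channel into confirm mode, and the listener receives an ack or a nack for each published delivery tag.

    channel.confirmSelect();
    channel.addConfirmListener(new ConfirmListener() {
        @Override
        public void handleAck(long deliveryTag, boolean multiple) {
            System.out.println("broker confirmed up to tag " + deliveryTag);
        }

        @Override
        public void handleNack(long deliveryTag, boolean multiple) {
            System.out.println("broker lost message " + deliveryTag + ", resend it");
        }
    });
    channel.basicPublish("", "confirm_queue", null, "hello".getBytes("UTF-8"));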

Prefetch Count (the maximum number of unacknowledged messages delivered to each consumer)

As mentioned earlier, if multiple consumers subscribe to the same Queue, its messages are spread across them. If messages take different amounts of time to process, some consumers may stay busy all the time while others finish quickly and sit idle. We can set prefetchCount to limit how many messages the Queue sends to each consumer at a time. For example, with prefetchCount=1 the Queue sends one message to a consumer and sends the next one only after the consumer has acknowledged the previous message.
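A one-line sketch (assuming a `channel` as in the demo code later): basicQos sets the prefetch count, so RabbitMQ will not push a new message to this consumer until the previous one has been acknowledged.

channel.basicQos(1);   // prefetchCount = 1: at most one unacknowledged message per consumer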

Exchange

Producers send messages to an Exchange (X in the figure below), which routes them to one or more queues (or discards them) according to certain rules.

Routing key

When sending a message to an Exchange, the producer specifies a routing key, which determines how the message is routed. The Exchange decides which Queue(s) to route the message to based on the Exchange Type and the match between the routing key and the binding keys. RabbitMQ limits the routing key to 255 bytes.

Binding

RabbitMQ uses a Binding to associate exchanges with queues.

Binding key

When an Exchange and Queue are bound, a Binding key is specified.

Exchange Types

The common Exchange types for RabbitMQ are Fanout, Direct, Topic, and Headers.

  • An Exchange Routing rule of the Fanout type is very simple. It routes all messages destined for the Exchange to all queues bound to the Exchange, with no Routing key.

  • Exchange routing rules of the Direct type are also simple, routing messages to queues whose binding keys exactly match routing keys.

In this setup, two queues, Q1 and Q2, are bound directly to exchange X: Q1 is bound with the binding key orange, and Q2 has two bindings, one with the binding key black and the other with green.

Messages published to the exchange with the routing key orange are routed to Q1; messages with the routing key black or green go to Q2. All other messages are discarded.

In the example above, after a message with routingKey="error" is sent to the Exchange, the Exchange routes it to both Queue1 (amqp.gen-S9b..., a queue name automatically generated by RabbitMQ) and Queue2 (amqp.gen-Agl...); a message with routingKey="info" or routingKey="warning" is routed only to Queue2. All other messages are discarded.

  • The routing rules of a Topic Exchange support fuzzy matching between binding keys and routing keys, routing a message to every Queue whose binding key matches. A binding key may contain two special characters: * matches exactly one word and # matches zero or more words, with "." as the word delimiter.

For example, a message with routingKey="quick.orange.rabbit" is routed to both Q1 and Q2, and so is a message with routingKey="lazy.orange.fox". A message with routingKey="lazy.brown.fox" is routed to Q2, and routingKey="lazy.pink.rabbit" also goes to Q2 (it is delivered only once, even though it matches both of Q2's binding keys). Messages with routingKey="quick.brown.fox", routingKey="orange", or routingKey="quick.orange.male.rabbit" are discarded because they match no binding key.

  • The Headers Exchange does not rely on the binding key and routing key matching rules to route messages. Instead, it matches the Headers attribute in the content of the sent message.
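A sketch of the topic-exchange example above (the exchange and queue names are made up; the binding keys are chosen to be consistent with the routing examples in the text; `channel` as in the demo code later):

channel.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
channel.queueDeclare("Q1", false, false, false, null);
channel.queueDeclare("Q2", false, false, false, null);
channel.queueBind("Q1", "topic_logs", "*.orange.*");
channel.queueBind("Q2", "topic_logs", "*.*.rabbit");
channel.queueBind("Q2", "topic_logs", "lazy.#");

channel.basicPublish("topic_logs", "quick.orange.rabbit", null, "m1".getBytes()); // -> Q1 and Q2
channel.basicPublish("topic_logs", "lazy.brown.fox", null, "m2".getBytes());      // -> Q2 only
channel.basicPublish("topic_logs", "quick.brown.fox", null, "m3".getBytes());     // matches nothing, discarded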

RPC

MQ itself is based on asynchronous message processing: in the previous examples the producer (P) never knows whether the consumer (C) processed a message successfully or failed, or even whether any consumer processed it at all. In real scenarios, however, we may need synchronous processing and have to wait for the server to finish handling our message before taking the next step. This is essentially a Remote Procedure Call (RPC), and RabbitMQ supports RPC as well.

RabbitMQ implements RPC as follows:

  1. When the client sends a request (message), it sets two properties: replyTo (a Queue name, telling the server to send the response to this Queue when processing is complete) and correlationId (the id of the request; the server returns this property with the response so the client knows which request succeeded or failed).
  2. The server receives the message and processes it.
  3. When the server finishes processing, it produces a response message and sends it to the replyTo Queue, carrying the correlationId property. The client has already subscribed to the replyTo Queue; when it receives the response, it uses the correlationId to work out which request the response belongs to, and continues its business logic based on the result.
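A minimal client-side sketch of this flow (the request queue name is made up; `channel` as in the demo code later): the request carries replyTo and correlationId, and the reply is matched back by correlationId.

String correlationId = UUID.randomUUID().toString();
String replyQueue = channel.queueDeclare().getQueue();        // server-named, exclusive reply queue

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .correlationId(correlationId)
        .replyTo(replyQueue)
        .build();
channel.basicPublish("", "rpc_queue", props, "request".getBytes("UTF-8"));

channel.basicConsume(replyQueue, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) {
        if (correlationId.equals(properties.getCorrelationId())) {
            System.out.println("reply for our request: " + new String(body));
        }
    }
});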

3. Application scenarios

  1. System decoupling (publish/subscribe)

    System A needs to send data to systems B, C, and D, and does so through interface calls.

    If system E now also needs the data while system C no longer does, the people maintaining system A go crazy: business requirements change too fast.

If a message system is used, system A only needs to send the data to MQ, and any system that needs the data subscribes to the corresponding topic in MQ.

2. Asynchronous messaging

After receiving a request, system A needs to write to its local database and also have systems B, C, and D write to theirs. The local write takes 20 ms, system B's takes 300 ms, C's 450 ms, and D's 200 ms, so the total latency is 20+300+450+200 = 970 ms: the user waits about 1 s for the final reply, which is too slow (an Internet-facing request should take well under 200 ms).

With a message system, A only needs to drop the messages into the MQ queues for B, C, and D. After finishing its local write, A can reply to the user in 20 ms; adding roughly 5 ms to send the messages to MQ, the user gets the reply in about 25 ms.

3. Traffic peak shaving

From midnight to noon every day, system A is calm with about 50 concurrent requests/s, but between 12:00 and 13:00 concurrency spikes to 5k+ requests/s. The system is backed by MySQL, and the flood of requests pours straight into MySQL.

MySQL can generally handle at most about 2k requests/s, so MQ is needed as a buffer. System A pulls requests from MQ at 2k/s and writes them to MySQL, which can withstand that pressure; the messages in MQ back up more and more during the peak, but that is acceptable. After the peak passes, system A keeps processing at 2k/s (its maximum capacity), no new backlog builds up, and the accumulated messages in MQ are drained quickly. This is peak shaving with MQ.

4. Performance comparison

Simple comparison

Applications

  • RabbitMQ is message middleware based on the AMQP protocol and implemented in Erlang. It originated in financial systems and is used to store and forward messages in distributed systems. RabbitMQ has become more and more popular thanks to its strong reliability, availability, scalability, and functionality.
  • Kafka was originally developed at LinkedIn in Scala as a distributed, multi-partition, multi-replica messaging system coordinated by ZooKeeper, and was later donated to the Apache Foundation. It is a high-throughput distributed publish/subscribe messaging system, widely used for its horizontal scalability and high throughput. More and more open source distributed processing systems, such as Cloudera, Apache Storm, Spark, and Flink, support Kafka integration.

Architectural model aspect

  • RabbitMQ follows the AMQP protocol. A RabbitMQ broker consists of Exchanges, Bindings, and Queues, where Exchanges and Bindings form the routing rules. The producer communicates with the server over a Channel on a connection, and the consumer receives messages from a Queue (over a long-lived connection, messages from the Queue are pushed to the consumer, which reads data from the input stream in a loop). RabbitMQ is broker-centric and has a message acknowledgement mechanism.
  • Kafka follows a general MQ structure with producer, broker, and consumer, and is centered on the consumer: the consumer keeps the consumption position on the client side and pulls data in batches from the broker based on that position. There is no message acknowledgement mechanism.

Throughput

  • RabbitMQ is not as good as Kafka in terms of throughput; the two start from different design goals. RabbitMQ supports reliable message delivery and transactions rather than batch operations, and depending on reliability requirements, storage can be in memory or on disk.
  • Kafka has high throughput: it batches messages internally, uses a zero-copy mechanism, and stores and fetches data with sequential batch operations on local disk, with O(1) complexity, so message handling is extremely efficient.

A detailed comparison

RabbitMQ and Kafka can be compared in terms of reliability and throughput.

Reliability

There are three places a message can be lost:

  • The producer loses the data
  • MQ itself loses the data
  • The consumer loses the data when consuming

RabbitMQ

  • The producer loses data

    • Scenario: the producer sends data to RabbitMQ, but the send fails due to network problems.

    • Handling: you can use RabbitMQ's transaction mechanism. The producer starts a transaction before sending (channel.txSelect); if RabbitMQ does not receive the message, the producer rolls back (channel.txRollback) and retries, and if it does, the producer commits (channel.txCommit). However, transactions reduce throughput and hurt performance. Alternatively, in confirm mode, every message the producer sends is assigned a unique id; if the write to MQ succeeds, MQ sends an ack back to the producer, and if MQ fails to handle the message it sends back a nack, telling the producer the write failed so it can retry.

      The difference between confirm mode and transaction mode is that confirm mode is asynchronous, while transactions are synchronous.

  • MQ itself loses data

    • Scenario: unlikely, but if MQ is not configured for persistence and crashes, a small amount of data may be lost.

    • Handling: combine the persistence mechanism with the confirm mechanism, so that MQ writes the ack back to the producer only after persistence succeeds. If persistence fails, the producer never receives the ack and can send again.

  • The consumer loses data when consuming

    • Scenario: the application gets the data, dies before processing it, yet MQ thinks the message was consumed.

    • Handling: MQ provides an ack mechanism; turn off automatic ack and call the API to send the ack only after the program has finished processing, so the message is not considered consumed until processing is done.

Kafka

  • The producer loses data

    • Scenario: if acks is not set to all, data may be lost.
    • Solution: set acks=all, so a send is considered successful only after the leader has received the message and all in-sync followers have synchronized it.
  • MQ itself loses data

    • Scenario: a broker in Kafka goes down and a partition leader is re-elected while some followers are still unsynchronized; the old leader dying at that point causes a small amount of data loss.

    • Solution: set replication.factor for the topic to a value greater than 1, so that each partition has at least two replicas.

      Set min.insync.replicas on the Kafka server to a value greater than 1, to ensure at least one follower stays in sync with the leader so data can be synchronized when the leader dies.

      The producer sets acks=all.

      The producer sets retries=MAX, retrying indefinitely if a write fails.

  • The consumer loses data when consuming

    • Scenario: the application gets the data, dies before processing it, but the consumer has already auto-committed the offset, so MQ thinks the message was consumed.
    • Handling: similar to RabbitMQ, turn off automatic offset commits and commit the offset manually after the program finishes processing (a configuration summary is sketched below).
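Putting the Kafka points above together, a hedged summary of the relevant settings (the values are examples, not recommendations for every workload):

# Topic / broker side
# replication factor > 1 (set with --replication-factor when creating the topic,
# or via default.replication.factor on the broker)
min.insync.replicas=2        # at least one follower must stay in sync with the leader

# Producer side
acks=all                     # wait for all in-sync replicas before treating a send as successful
retries=2147483647           # retry (practically) indefinitely on transient failures

# Consumer side
enable.auto.commit=false     # commit offsets manually, after processing succeeds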

Throughput

Server environment: 1 core + 2 GB RAM, single-node Kafka and single-node RabbitMQ.

Kafka ships with its own perf-test tools.
  1. Producer pressure test

    Kafka message write test

    Number of messages | Target writes per second | Record size (bytes)
    10W                | 2000                     | 1000
    100W               | 5000                     | 2000
    1000W              | 100000                   | 100
    1000W              | 1000000                  | 100

    Write test information

    Test size                    | Test command
    10W                          | ./kafka-producer-perf-test.sh --topic test_perf --num-records 100000 --record-size 1000 --throughput 2000 --producer-props bootstrap.servers=192.168.244.130:9092
    100W                         | ./kafka-producer-perf-test.sh --topic test_perf --num-records 1000000 --record-size 2000 --throughput 5000 --producer-props bootstrap.servers=192.168.244.130:9092
    1000W (--throughput 100000)  | ./kafka-producer-perf-test.sh --topic test_perf --num-records 10000000 --record-size 100 --throughput 100000 --producer-props bootstrap.servers=192.168.244.130:9092
    1000W (--throughput 1000000) | ./kafka-producer-perf-test.sh --topic test_perf --num-records 10000000 --record-size 100 --throughput 1000000 --producer-props bootstrap.servers=192.168.244.130:9092

10W (target --throughput 2000): actual write rate 1991 records/s, 95% message latency 11 ms

100W (target --throughput 5000): actual write rate 4995 records/s, 95% message latency 4 ms

1000W (target --throughput 1000000): actual write rate 221057 records/s, 95% message latency 9455 ms

1000W (target --throughput 1000000): actual write rate 238027 records/s, 95% message latency 1862 ms

  2. Consumer pressure test

    Parameter description: --bootstrap-server specifies the Kafka address, --topic the topic name, --fetch-size the amount of data fetched per request, and --messages the total number of messages to consume.

    10W total consumption, command:
    ./kafka-consumer-perf-test.sh --bootstrap-server 192.168.244.130:9092 --topic test_perf --fetch-size 1000 --messages 100000 --threads 1

    100W total consumption, command:
    ./kafka-consumer-perf-test.sh --bootstrap-server 192.168.244.130:9092 --topic test_perf --fetch-size 1048576 --messages 1000000 --threads 1

    1000W total consumption, command:
    ./kafka-consumer-perf-test.sh --bootstrap-server 192.168.244.130:9092 --topic test_perf --fetch-size 1048576 --messages 10000000 --threads 1

    Each run prints a result line with the columns start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec; the key figures are summarized in the conclusion below.

    Conclusion:

    Total messages consumed (W) | Total data consumed (MB) | Data consumed per second (MB/s) | Messages consumed per second
    10W                         | 95.6879                  | 36.1359                         | 100168
    100W                        | 1812.8414                | 79.8890                         | 1000451
    1000W                       | 2288.8184                | 59.8963                         | 1400000
RabbitMQ also ships with a perf-test tool.

One producer (-x 1), one consumer (-y 1), producer rate limited to 2000 (--rate 2000):
./runjava com.rabbitmq.perf.PerfTest -x 1 -y 1 -u "perf_test1" -a --id "hello1" --rate 2000
The 95% latency is about 1 ms.

One producer (-x 1), one consumer (-y 1), producer rate limited to 5000 (--rate 5000):
./runjava com.rabbitmq.perf.PerfTest -x 1 -y 1 -u "perf_test1" -a --id "hello1" --rate 5000
The 95% latency is about 1.6 ms, with occasional large spikes of 7 ms, 285 ms, and so on.

One producer (-x 1), one consumer (-y 1), no rate limit:
./runjava com.rabbitmq.perf.PerfTest -x 1 -y 1 -u "perf_test1" -a --id "hello1"
Latency is large, as shown in the figure; the peak publish rate reaches about 29000 msg/s.

Use JavaDemo for testing

The kafka code is as follows

Introducing Maven dependencies

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- use the client version matching your broker, e.g. 2.8.0 as used later in this article -->
    <version>2.8.0</version>
</dependency>

Producer code

package com.epoint.demo.kafka;

import com.epoint.demo.Constants;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Description:
 *
 * @author james
 * @date 2021/6/21 13:54
 */
public class Producer
{
    private final KafkaProducer<String, String> producer;

    public Producer() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.244.130:9092");
        /*
         * acks=0: the producer does not wait for any broker acknowledgement before sending the next
         *         message. Highest performance, but messages are most easily lost.
         * acks=1: wait until the leader has successfully written the data to its local log, but not
         *         until all followers have. If the followers have not replicated the data and the
         *         leader dies, the message is lost.
         * acks=-1 or all: the leader waits until all backups (the number configured by
         *         min.insync.replicas) have successfully written the log. As long as one backup
         *         survives, data is not lost. This is the strongest guarantee, typically used only
         *         for financial-grade scenarios.
         */
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        // Retries: the default retry interval is 100 ms. Retrying improves delivery reliability but may
        // cause duplicate sends (e.g. on network jitter), so the receiver must handle messages idempotently
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Retry interval Settings
        properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
        // Send buffer: messages are first placed in this local buffer, which improves send performance.
        // The default is 33554432 bytes (32 MB)
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // The kafka thread takes data from the buffer and sends it to the broker in batches.
        // Set the size of batch messages to be sent. The default value is 16384 (16KB)
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // The default value is 0, which means the message must be sent immediately, but this affects performance
        // After send() is called, the message first goes into a local batch; if the batch reaches 16 KB
        // within 100 ms, the message is sent together with that batch
        // If the batch is not full within 100 milliseconds, the message must also be sent, and the delay for sending the message cannot be too long
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        // Serialize the sent key from a string to a byte array
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Serialize the sent message value from a string to a byte array
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        producer = new KafkaProducer<>(properties);
    }

    public void producer(a) {
        long start = System.currentTimeMillis();

        for (int i = 0; i < Constants.COUNT_300000; i++) {
            String data = Constants.MESSAGE_1 + i;
            producer.send(new ProducerRecord<>(Constants.QUEUE, data),
                    (metadata, exception) -> System.out.println("Sent successfully, MSG :" + metadata.offset()));

        }

        producer.close();
        long end = System.currentTimeMillis();
        System.out.println((end - start) + "ms");
    }

    public static void main(String[] args) {
        new Producer().producer();
    }
}

Consumer code

package com.epoint.demo.kafka;

import com.epoint.demo.Constants;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Description:
 *
 * @author james
 * @date 2021/6/21
 */
public class Consumer
{
    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.244.130:9092");
        // Consumption group name
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
        // Whether to automatically submit offset
        // props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // The interval between automatically submitting offset
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        /* Heartbeat interval: the broker uses heartbeats to detect failed consumers; when one fails,
         * its partitions are reassigned to the other consumers through a rebalance. This can be set fairly short */
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
        // How long does the server broker perceive a consumer heartbeat as faulty? Default is 10 seconds
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
        /* * If the poll interval is longer than this, the broker will consider the consumer to be too weak, kick it out of the consumer group, and assign the partition to another consumer */
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Consumption theme
        consumer.subscribe(Collections.singletonList(Constants.QUEUE));
        // Consume the specified partition
        // consumer.assign(Arrays.asList(new TopicPartition(Constants.QUEUE, 0)));

        // Message backtracking consumption
        // consumer.assign(Arrays.asList(new TopicPartition(Constants.QUEUE, 0)));
        // consumer.seekToBeginning(Arrays.asList(new TopicPartition(Constants.QUEUE,
        / / 0)));
        // Specify offset consumption
        // consumer.seek(new TopicPartition(topicName, 0), 10);

        while (true) {
            // Start pulling data from the server
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
                System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); }); }}}Copy the code

The rabbitMQ code is as follows

Introducing Maven dependencies

<dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>5.12.0</version>
</dependency>

Producer code

package com.epoint.demo.rabbitmq;

import com.epoint.demo.Constants;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Description:
 *
 * @author james
 * @date 2021/6/22 17:13
 */
public class Producer
{

    public static void main(String[] args) throws Exception {
        long start = 0L;
        long end = 0L;
        try {
            /** Create a connection to RabbitMQ */
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(Constants.RABBITMQ_SERVER);
            factory.setUsername(Constants.RABBITMQ_USERNAME);
            factory.setPassword(Constants.RABBITMQ_PASSWORD);
            factory.setPort(AMQP.PROTOCOL.PORT);
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // Specify a queue
            channel.queueDeclare(Constants.QUEUE, false, false, false, null);

            start = System.currentTimeMillis();
            for (int i = 0; i < Constants.COUNT_3000000; i++) {
                String msg = Constants.MESSAGE_1 + i;
                channel.basicPublish("", Constants.QUEUE, null, msg.getBytes());
                System.out.println("publish " + msg);
            }
            // Close channels and connections
            end = System.currentTimeMillis();
            channel.close();
            connection.close();
        }
        finally {
            System.out.println((end - start) + "ms");
        }
    }
}

Consumer code

package com.epoint.demo.rabbitmq;

import com.epoint.demo.Constants;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Description:
 *
 * @author james
 * @date 2021/6/22
 */
public class Consumer
{

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.RABBITMQ_SERVER);
        factory.setUsername(Constants.RABBITMQ_USERNAME);
        factory.setPassword(Constants.RABBITMQ_PASSWORD);
        factory.setPort(AMQP.PROTOCOL.PORT);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the queue to be concerned about
        channel.queueDeclare(Constants.QUEUE, false, false, false, null);
        System.out.println("Customer Waiting Received messages");

        // Tell the server that we need a message for that channel. If there is a message in the channel, the callback handleDelivery will be executed
        DefaultConsumer consumer = new DefaultConsumer(channel)
        {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Customer Received '" + message + "'"); }};// Automatic reply queue reply - message acknowledgement mechanism in RabbitMQ
        channel.basicConsume(Constants.QUEUE, true, consumer); }}Copy the code

30W messages, each of which is “HelloWorld! —–i”

Kafka producer (5 consecutive tests): 1791 ms / 1631 ms / 1664 ms / 1575 ms / 1672 ms, an average of about 1.7 s

Kafka producer (3 consecutive tests, with simultaneous consumption): 1853 ms / 1801 ms / 1892 ms, an average of about 1.8 s

RabbitMQ producer (5 consecutive tests): 25407 ms / 25776 ms / 23758 ms / 27000 ms / 25834 ms, an average of about 25 s

RabbitMQ producer (3 consecutive tests, with simultaneous consumption): 16930 ms / 16652 ms / 16252 ms, an average of about 16 s

Kafka can publish messages significantly faster than RabbitMQ when consumption piles up

Kafka and RabbitMQ publish messages at about the same speed without a consumer backlog

300W messages, each of which is “helloworld! —–i”

Kafka producer: 12537 ms; consumer: about 46 s

Kafka producer (with simultaneous consumption): 17205 ms

RabbitMQ producer: 35775 ms; consumer: about 203 s

RabbitMQ producer (with simultaneous consumption): 233558 ms

Kafka sends messages much faster than RabbitMQ, regardless of whether consumption piles up

From the tests above we can conclude:

  • With a consumption backlog: Kafka publishes messages much faster than RabbitMQ, regardless of the message volume

  • Without a backlog: the larger the message volume, the slower RabbitMQ publishes, while Kafka slows down roughly in proportion

    When the message volume grows 10 times, RabbitMQ's publishing time grows about 15 times and Kafka's about 8 times

Conclusion

Because the JavaDemo tests and the pressure-test tools run in different environments (the JavaDemo results also depend on the client code and on the network overhead between the host and the VM), the two test methods cannot be compared with each other directly.

Javademo tests show:

Kafka messages can be published much faster than RabbitMQ, especially in the case of heavy consumption

Pressure tool tests showed that:

The peak Kafka write rate reached 238027 records/s, with a 95% message latency of 2440 ms

The peak RabbitMQ write rate was about 29000 records/s, with a 95% message latency of 5970 ms

In summary, in the same environment Kafka achieves more writes per second and higher message throughput than RabbitMQ. These tests were all performed on a single server, so they are mainly useful as a single-node baseline when evaluating whether a clustered deployment can meet the requirements of an actual application.

5. Integration

Cluster deployment

The preceding tests were performed in a single-node environment. A real production environment will run in cluster mode, so this section uses a cluster as an example.

Three local servers (1 core + 2 GB RAM each)

Kafka

The environment requires JDK 1.8. The Kafka package can be downloaded from www.apache.org/dyn/closer….

I installed a VM on my local PC and cloned three servers with the IP addresses 192.168.244.130, 192.168.244.131, and 192.168.244.132

  1. Zookeeper cluster

    Take machine 192.168.244.130 as an example

    1. ZooKeeper is bundled with the Kafka installation package, so you only need to upload the Kafka package to the /opt directory

    2. Extract it: tar -zxvf kafka_2.12-2.8.0.tgz

    3. Go to the Kafka config directory: cd /opt/kafka_2.12-2.8.0/config

    4. Edit zookeeper.properties: vi zookeeper.properties (a sample configuration is sketched after these steps)

      • dataDir is ZooKeeper's data directory
      • Port 2888 is used for communication between the ZooKeeper nodes, and port 3888 is used for leader election

    5. Create the myid file: go to /data/zookeeper (the dataDir directory) and run echo 0 > myid to write "0" (this 0 corresponds to the server.0 entry in zookeeper.properties from the previous step; make sure the three machines use different ids)
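    A sample zookeeper.properties consistent with the steps above (the IPs are the three machines in this article; tickTime/initLimit/syncLimit are typical values and may differ in your environment):

    dataDir=/data/zookeeper
    clientPort=2181
    tickTime=2000
    initLimit=10
    syncLimit=5
    # one line per node; 2888 is used between the ZooKeeper nodes, 3888 for leader election
    server.0=192.168.244.130:2888:3888
    server.1=192.168.244.131:2888:3888
    server.2=192.168.244.132:2888:3888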

    The ZooKeeper configuration for 192.168.244.130 is now complete. Configure the other two servers in the same way

    To start the ZooKeeper cluster, run the following command on all three hosts (from /opt/kafka_2.12-2.8.0/bin):

    nohup ./zookeeper-server-start.sh ../config/zookeeper.properties &>> /data/zookeeper/zookeeper.log &

    Until all three machines have started, the logs will show errors about the other nodes being unreachable

    Once all three machines are up and no errors appear in the logs, the cluster has started successfully

  2. Kafka cluster

    1. Go to the Kafka config directory: cd /opt/kafka_2.12-2.8.0/config

    2. Edit server.properties: vi server.properties and set the following items (a sample is sketched after this list)

      • broker.id: the unique id of this node
      • advertised.listeners: set to the local address
      • log.dirs: the log (data) directory
      • num.partitions: the default number of partitions (optional)
      • zookeeper.connect: the ZooKeeper addresses
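    A sample server.properties for the 192.168.244.130 node, consistent with the items above (the values are assumptions based on this article; adjust broker.id and the addresses on each machine):

    broker.id=0                                        # 0 / 1 / 2, unique per node
    listeners=PLAINTEXT://192.168.244.130:9092
    advertised.listeners=PLAINTEXT://192.168.244.130:9092
    log.dirs=/data/kafka-logs
    num.partitions=3                                   # optional default partition count
    zookeeper.connect=192.168.244.130:2181,192.168.244.131:2181,192.168.244.132:2181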

The 192.168.244.130 machine Kafka configuration is complete. Similarly, configure the other two machines

Start the Kafka cluster by running the following on all three machines (from /opt/kafka_2.12-2.8.0/bin):

nohup ./kafka-server-start.sh ../config/server.properties &>> /data/kafka-logs/kafka.log &

**Once all three machines have started and no errors appear in the logs, the cluster has started successfully**
  3. Test cluster

    The following operations can be done on any node

    1. Create a topic (three partitions, two copies)

      ./kafka-topics.sh --create --zookeeper 192.168.244.130:2181,192.168.244.131:2181,192.168.244.132:2181 --replication-factor 2 --partitions 3 --topic cluster-test3

    2. List the created topics

      ./kafka-topics.sh --list --zookeeper 192.168.244.130:2181

    3. Simulate a client sending messages

      ./kafka-console-producer.sh --broker-list 192.168.244.130:9092 --topic cluster-test3

    4. Simulate a client receiving messages

      ./kafka-console-consumer.sh --bootstrap-server 192.168.244.130:9092 --topic cluster-test3 --from-beginning

RabbitMQ

For details, see juejin.cn/post/693087…

Install RabbitMQ on all three machines; the cluster is then assembled with rabbitmqctl

Start with machine 192.168.244.130

  1. Modify the hostname: hostnamectl set-hostname node1

  2. Modify the hosts file (vi /etc/hosts) and add the three lines shown below (the modification takes effect only after you restart the terminal)
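
    Presumably the three lines map the cluster IPs to the node hostnames used later (node1 is given above; node2 and node3 are inferred from the later steps):

      192.168.244.130 node1
      192.168.244.131 node2
      192.168.244.132 node3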

  3. Repeat the preceding steps on machines 192.168.244.131 (node2) and 192.168.244.132 (node3)

  4. Run service rabbitmq-server start on each machine

  5. Go to node2 (192.168.244.131) and run the commands below

    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmqctl join_cluster rabbit@node1
    rabbitmqctl start_app

    If all goes well, something like the following appears

6. Switch to node3 (192.168.244.132) and repeat Step 5

  7. At this point, the cluster setup is complete. You can open http://192.168.244.130:15672/ to check the management UI; if a page like the following appears, the cluster has been set up successfully

The current cluster is the default normal (non-mirrored) cluster: nodes share exchange, routing key, and queue metadata, but the messages of a queue are stored only on the node where that queue was first declared. A consumer connected to any node can still consume messages from queues on other nodes. For example, a consumer connected to node1 (the node1 IP is used when establishing the Connection in the code) can consume messages from queue2 on node2; node2 transfers the messages of queue2 to node1, which then delivers them to the consumer. Because a queue's messages live only on the node where the queue was first declared, there is a problem: if that node fails, you have to wait for it to come back before the messages in its queues can be processed again (and if persistence is not enabled, those messages are lost). In the following figure, when node3 fails, queue3 goes down and cannot be accessed.

In response to this problem, a natural thought is: what if RabbitMQ nodes behaved like nodes in a Redis cluster, with every node holding all messages, so that node1 stores not only the messages of queue1 but also those of queue2 and queue3 from node2 and node3? Then a single node going down would no longer matter. RabbitMQ provides exactly this capability: mirrored queues. A mirrored queue consists of a master and several slaves, and messages are synchronized between the mirror nodes automatically instead of being pulled on demand when a consumer fetches data.

rabbitmqctl set_policy ha-all "^queue" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
# ^queue: matches queues (or exchanges) whose names start with "queue"; for example, ^abc would match names starting with abc
# ha-mode: synchronization mode, three options:
#   ① all     - all nodes synchronize the messages
#   ② exactly - a specified number of nodes (ha-params is an int, for example 2)
#   ③ nodes   - specific nodes (ha-params is an array such as ["rabbit@node1","rabbit@node2"]; messages are synchronized on those nodes)

When node3 hangs, queue3's master node becomes node1, as shown in the figure below

When node3 hangs, node1 automatically becomes the master of queue3, so queue3 does not go down and messages can still be added, deleted, and fetched normally, which eliminates the outage problem of the normal cluster. However, because every node has to synchronize the messages, mirrored queues consume more resources, so they are mainly used in scenarios that require high reliability.
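
To check that the mirror policy has actually taken effect, the configured policies can be listed on any node (a quick sanity check, assuming the default vhost):

rabbitmqctl list_policies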

API Integration (SpringBoot)

pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
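
The RabbitMQ examples later in this section also need the AMQP starter, and the sample classes use Lombok; assuming Spring Boot dependency management, the additional entries would look roughly like this:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>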

kafka

Yml configuration

  kafka:
    bootstrap-servers: 192.168.244.130:9092,192.168.244.131:9092,192.168.244.132:9092
    producer:
      # Number of times to resend a message after an error occurs
      retries: 0
      # When multiple messages need to be sent to the same partition, the producer puts them in the same batch.
      # This parameter specifies the amount of memory (in bytes) a batch can use.
      batch-size: 16384
      # Size of the producer memory buffer
      buffer-memory: 33554432
      # Key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0: the producer does not wait for any response from the server before considering the write successful
      # acks=1: as soon as the cluster leader receives the message, the producer gets a success response from the server
      # acks=all: the producer gets a success response only after all participating nodes have received the message
      acks: 1
    consumer:
      # Auto-commit interval. In Spring Boot 2.x the value type is Duration and must use formats such as 1S, 1M, 2H, 5D
      auto-commit-interval: 1S
      # What the consumer should do when it reads a partition without a committed offset, or the offset is invalid:
      # latest (default): start reading from the latest records (produced after the consumer started)
      # earliest: start reading the partition's records from the beginning
      auto-offset-reset: earliest
      # Whether offsets are committed automatically (default true).
      # To avoid duplicate data and data loss, set it to false and commit the offset manually.
      enable-auto-commit: false
      # Key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # Number of threads running in the listener container
      concurrency: 5
      # Commit mode: manual, immediate
      ack-mode: manual_immediate
      missing-topics-fatal: false

producers

package com.epoint.test.kafka;

import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * Description:
 *
 * @author james
 * @date 2021/7/13
 */
@Slf4j
@Component
public class KafkaProducer
{
    public static final String TOPIC_TEST1 = "topic_test1";
    public static final String GROUP_TEST1 = "group_test1";

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public String sendMsg1(String message) {
        // Send a message
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST1, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>()
        {
            @Override
            public void onFailure(@NotNull Throwable throwable) {
                // Send failed processing
                log.info(TOPIC_TEST1 + "- Producer failed to send message:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                // Successful processing
                log.info(TOPIC_TEST1 + " - Producer succeeded in sending message:" + stringObjectSendResult.toString());
            }
        });
        return "success";
    }
}

Consumer (the YML config above uses manual acknowledgement, so ack.acknowledge() must be called; if you configure automatic acknowledgement instead, ack.acknowledge() is not needed)

package com.epoint.test.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Description:
 *
 * @author james
 * @date 2021/7/13 11:02
 */
@Slf4j
@Component
public class KafkaConsumer
{
    @KafkaListener(topics = KafkaProducer.TOPIC_TEST1, groupId = KafkaProducer.GROUP_TEST1)
    public void listener1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("Topic:{}, Message:{}, offset:{}, partition:{}", topic, msg, record.offset(), record.partition());
            // Manually commit the offset
            ack.acknowledge();
        }
    }
}

The test class

    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/kafka/{message}")
    public String kafka(@PathVariable("message") String message) {
        log.info("kafka ===> message:{}", message);
        return kafkaProducer.sendMsg1(message);
    }

Start Spring Boot (default port 8080). Visiting http://localhost:8080/kafka/message1111111111 sends a message; the consumption details can be checked in the background log

RabbitMQ

Yml configuration

  rabbitmq:
    addresses: 192.168.244.130:5672,192.168.244.131:5672,192.168.244.132:5672
    username: epoint
    password: epoint
    # Manual confirmation
    # Confirmation that messages reached the exchange
    #publisher-confirm-type: correlated
    # Confirmation that messages were routed to queues
    #publisher-returns: false
  • Direct exchange configuration

    package com.epoint.test.rabbitmq.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Description: The direct exchange forwards a message to the corresponding queue according to the routing key carried by the message
     *
     * @author james
     * @date 2021/7/13
     */
    @Configuration
    public class DirectRabbitMQConfig
    {
        public static final String QUEUE = "directQueue";
        public static final String EXCHANGE = "directExchange";
        public static final String ROUTING = "direct.routing";
    
        /** * Exchange */
        @Bean
        public DirectExchange myDirectExchange() {
            // Parameter meaning:
            // name: exchange name
            // durable: whether the exchange is durable (persisted)
            // autoDelete: whether to delete the exchange automatically
            return new DirectExchange(EXCHANGE, true, false);
        }
    
        /** * queue */
        @Bean
        public Queue myDirectQueue() {
            return new Queue(QUEUE, true);
        }
    
        /** * bind */
        @Bean
        public Binding bindingDirect() {
            return BindingBuilder
                    .bind(myDirectQueue())
                    .to(myDirectExchange())
                    .with(ROUTING);
        }
    }
  • Fanout exchange configuration

    package com.epoint.test.rabbitmq.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Description: A fanout exchange forwards messages to all bound queues after receiving them, similar to publish/broadcast mode
     *
     * @author james
     * @date 2021/7/13 14:26
     */
    @Configuration
    public class FanoutRabbitMQConfig
    {
        public static final String QUEUE_A = "fanoutQueueA";
        public static final String QUEUE_B = "fanoutQueueB";
        public static final String QUEUE_C = "fanoutQueueC";
        public static final String EXCHANGE = "fanoutExchange";
    
        // ----- Exchange -----
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange(EXCHANGE, true, false);
        }
    
        // ----- queue -----
        @Bean
        public Queue fanoutQueueA() {
            return new Queue(QUEUE_A, true);
        }
    
        @Bean
        public Queue fanoutQueueB() {
            return new Queue(QUEUE_B, true);
        }
    
        @Bean
        public Queue fanoutQueueC() {
            return new Queue(QUEUE_C, true);
        }
    
        // ----- bind to -----
        @Bean
        public Binding bindingFanoutA() {
            return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
        }
    
        @Bean
        public Binding bindingFanoutB() {
            return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
        }
    
        @Bean
        public Binding bindingFanoutC() {
            return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());
        }
    }
  • Topic exchange configuration

    package com.epoint.test.rabbitmq.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Description: A topic exchange supports fuzzy matching between binding keys and routing keys, and routes messages to the queues that match
     *
     * @author james
     * @date 2021/7/13
     */
    @Configuration
    public class TopicRabbitMQConfig
    {
        public static final String QUEUE_1 = "topicQueue1";
        public static final String QUEUE_2 = "topicQueue2";
        public static final String EXCHANGE = "topicExchange";
        public static final String ROUTING_01 = "topic.01";
        public static final String ROUTING_ALL = "topic.#";
    
        // ----- Exchange -----
        @Bean
        public TopicExchange myTopicExchange() {
            return new TopicExchange(EXCHANGE, true, false);
        }
    
        // ----- queue -----
        @Bean
        public Queue myTopicQueue1() {
            return new Queue(QUEUE_1, true);
        }
    
        @Bean
        public Queue myTopicQueue2() {
            return new Queue(QUEUE_2, true);
        }
    
        /** * bind the routing key to topic.01 */
        @Bean
        public Binding binding1() {
            return BindingBuilder.bind(myTopicQueue1()).to(myTopicExchange()).with(ROUTING_01);
        }
    
        /** * bind routing keys to topic.# rule */
        @Bean
        public Binding binding2() {
            return BindingBuilder.bind(myTopicQueue2()).to(myTopicExchange()).with(ROUTING_ALL);
        }
    }

producers

package com.epoint.test.rabbitmq;

import com.epoint.test.rabbitmq.config.AckRabbitMQConfig;
import com.epoint.test.rabbitmq.config.DirectRabbitMQConfig;
import com.epoint.test.rabbitmq.config.FanoutRabbitMQConfig;
import com.epoint.test.rabbitmq.config.TopicRabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Description:
 *
 * @author james
 * @date 2021/7/13
 */
@Slf4j
@Component
public class RabbitMQProducer
{
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendDirect(String message) {
        rabbitTemplate.convertAndSend(DirectRabbitMQConfig.EXCHANGE, DirectRabbitMQConfig.ROUTING, message);
        log.info("SendDirect producer Queue :{} Message sent successfully :{}", DirectRabbitMQConfig.QUEUE, message);
    }

    public void sendFanout(String message) {
        rabbitTemplate.convertAndSend(FanoutRabbitMQConfig.EXCHANGE, null, message);
        log.info("SendFanout producer Queue:{},{},{} Message sent successfully:{}", FanoutRabbitMQConfig.QUEUE_A, FanoutRabbitMQConfig.QUEUE_B, FanoutRabbitMQConfig.QUEUE_C, message);
    }

    public void sendTopic(String message) {
        String message1 = message + " topic.01";
        rabbitTemplate.convertAndSend(TopicRabbitMQConfig.EXCHANGE, TopicRabbitMQConfig.ROUTING_01, message1);
        log.info("SendTopic producer queue:{} Message sent successfully :{}", TopicRabbitMQConfig.QUEUE_1, message1);
        String message2 = message + " topic.xxx";
        rabbitTemplate.convertAndSend(TopicRabbitMQConfig.EXCHANGE, TopicRabbitMQConfig.ROUTING_ALL, message2);
        log.info("SendTopic producer queue:{} Message sent successfully :{}", TopicRabbitMQConfig.QUEUE_2, message2);
    }

    public void sendDirectAck(String message) {
        rabbitTemplate.convertAndSend(AckRabbitMQConfig.EXCHANGE, AckRabbitMQConfig.ROUTING, message);
        log.info("SendDirectAck Producer Queue :{} Message sent successfully :{}", AckRabbitMQConfig.QUEUE, message); }}Copy the code

consumers

package com.epoint.test.rabbitmq;

import com.epoint.test.rabbitmq.config.DirectRabbitMQConfig;
import com.epoint.test.rabbitmq.config.FanoutRabbitMQConfig;
import com.epoint.test.rabbitmq.config.TopicRabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Description:
 *
 * @author james
 * @date 2021/7/13 13:58
 */
@Slf4j
@Component
public class RabbitMQConsumer
{

    //Direct
    @RabbitHandler
    @RabbitListener(queues = DirectRabbitMQConfig.QUEUE)
    public void directListener(String message) {
        log.info("directListener:{}", message);
    }

    //Fanout
    @RabbitHandler
    @RabbitListener(queues = FanoutRabbitMQConfig.QUEUE_A)
    public void fanoutListenerA(String message) {
        log.info("fanoutListenerA:{}", message);
    }

    @RabbitHandler
    @RabbitListener(queues = FanoutRabbitMQConfig.QUEUE_B)
    public void fanoutListenerB(String message) {
        log.info("fanoutListenerB:{}", message);
    }

    @RabbitHandler
    @RabbitListener(queues = FanoutRabbitMQConfig.QUEUE_C)
    public void fanoutListenerC(String message) {
        log.info("fanoutListenerC:{}", message);
    }

    //Topic
    @RabbitHandler
    @RabbitListener(queues = TopicRabbitMQConfig.QUEUE_1)
    public void topicListener1(String message) {
        log.info("topicListener1:{}", message);
    }

    @RabbitHandler
    @RabbitListener(queues = TopicRabbitMQConfig.QUEUE_2)
    public void topicListener2(String message) {
        log.info("topicListener2:{}", message); }}Copy the code

Each of the three exchanges is tested with the following methods

 	@Autowired
    private RabbitMQProducer rabbitMQProducer;

    @GetMapping("/rabbit/direct/{message}")
    public String rabbitDirect(@PathVariable("message") String message) {
        log.info("rabbit.direct ===> message:{}", message);
        rabbitMQProducer.sendDirect(message);
        return "success";
    }

    @GetMapping("/rabbit/fanout/{message}")
    public String rabbitFanout(@PathVariable("message") String message) {
        log.info("rabbit.fanout ===> message:{}", message);
        rabbitMQProducer.sendFanout(message);
        return "success";
    }

    @GetMapping("/rabbit/topic/{message}")
    public String rabbitTopic(@PathVariable("message") String message) {
        log.info("rabbit.topic ===> message:{}", message);
        rabbitMQProducer.sendTopic(message);
        return "success";
    }

Start Spring Boot (default port 8080). Visiting http://localhost:8080/rabbit/direct/msgDirect, http://localhost:8080/rabbit/fanout/msgFanout, or http://localhost:8080/rabbit/topic/msgTopic sends a message; the consumption details can be checked in the background log

The above uses automatic acknowledgement. To switch to manual acknowledgement, uncomment the corresponding configuration in the YML file

The configuration class

package com.epoint.test.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Description: Manual acknowledgement used together with a direct exchange
 *
 * @author james
 * @date 2021/7/13
 */
@Slf4j
@Configuration
public class AckRabbitMQConfig
{
    public static final String QUEUE = "ackQueue";
    public static final String EXCHANGE = "ackExchange";
    public static final String ROUTING = "ack.routing";

    /** * Exchange */
    @Bean
    public DirectExchange myDirectAckExchange() {
        // Parameter meaning:
        // name: exchange name
        // durable: whether the exchange is durable (persisted)
        // autoDelete: whether to delete the exchange automatically
        return new DirectExchange(EXCHANGE, true, false);
    }

    /** * queue */
    @Bean
    public Queue myDirectAckQueue() {
        return new Queue(QUEUE, true);
    }

    /** * bind */
    @Bean
    public Binding bindingDirectAck() {
        return BindingBuilder
                .bind(myDirectAckQueue())
                .to(myDirectAckExchange())
                .with(ROUTING);
    }

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);

        // Enable Mandatory to trigger the callback function, which is invoked regardless of the message push result
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("ConfirmCallback: relevant data: {}", correlationData);
            log.info("ConfirmCallback: confirm case: {}", ack);
            log.info("ConfirmCallback: reason: {}", cause);
        });

        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("ReturnCallback: message: {}", message.toString());
            log.info("ReturnCallback: response code: {}", replyCode);
            log.info("ReturnCallback: response message: {}", replyText);
            log.info("ReturnCallback: switch: {}", exchange);
            log.info("ReturnCallback: Routing key: {}", routingKey);
        });

        return rabbitTemplate;
    }
}

Listener configuration

package com.epoint.test.rabbitmq.config;

import com.epoint.test.rabbitmq.AckConsumer;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Description:
 *
 * @author james
 * @date 2021/7/13 14:50
 */
@Configuration
public class MessageListenerConfig
{
    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    @Autowired
    private AckConsumer ackConsumer;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);

        // Listen to the queue name
        container.setQueueNames(AckRabbitMQConfig.QUEUE);
        // Current number of consumers
        container.setConcurrentConsumers(1);
        // Maximum number of consumers
        container.setMaxConcurrentConsumers(1);
        // Confirm manually
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // Set the listener
        container.setMessageListener(ackConsumer);

        return container;
    }
}

consumers

package com.epoint.test.rabbitmq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

/**
 * Description:
 *
 * @author james
 * @date 2021/7/13 14:50
 */
@Slf4j
@Component
public class AckConsumer implements ChannelAwareMessageListener
{
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // The unique ID of the message
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            String msg = message.toString();
            log.info("News:" + msg);
            log.info("The news comes from:" + message.getMessageProperties().getConsumerQueue());

            // Confirm manually
            channel.basicAck(deliveryTag, true);
        }
        catch (Exception e) {
            // Reject the policy
            channel.basicReject(deliveryTag, false);
            e.printStackTrace();
        }
    }
}

The test method

	@GetMapping("/rabbit/directack/{message}")
    public String rabbitDirectAck(@PathVariable("message") String message) {
        log.info("rabbit.directack ===> message:{}", message);
        rabbitMQProducer.sendDirectAck(message);
        return "success";
    }

Start Spring Boot (default port 8080). Visiting http://localhost:8080/rabbit/directack/msgDirectAck sends a message; the consumption details can be checked in the background log

FAQ

How to ensure high availability

rabbitmq

Rabbitmq has three modes: single machine, common cluster, and mirrored cluster

  1. Standalone: used for demos and tests, not used in production

  2. Normal cluster: multiple instances are started on multiple machines, but a queue exists on only one RabbitMQ instance.

    Flow: system A writes data to the queue (which lives on instance 1), and system B needs to consume data from that queue (connecting to a random instance).

    If system B happens to connect to instance 3, instance 3 has to pull the data from the queue on instance 1, which adds data-transfer overhead. In essence this is just an ordinary cluster, not true distribution of the data

  3. Mirrored cluster: a created queue, including the messages in it, exists on multiple instances. Every time a message is written, it is automatically synchronized to the queue mirrors on the other instances

    Disadvantages: messages need to be synchronized to all machines, which incurs a high performance overhead; scalability is poor

kafka

Kafka is a naturally distributed message queue consisting of multiple brokers, each of which is a node. A topic is created and divided into multiple partitions; the partitions live on different brokers, and each partition holds a portion of the data.

Since Kafka 0.8, an HA mechanism has been provided, namely the replica mechanism. The data of each partition is synchronized to other machines to form replicas, and a leader is elected among the replicas of each partition. Producers and consumers interact only with the leader: writes go to the leader, which synchronizes the data to the followers, and reads also go to the leader.

If a broker goes down, the partitions on that broker still have replicas on other machines; if one of them happened to be a partition leader, a new leader is elected from the followers. This is how high availability is achieved.

In terms of data persistence, the producer writes to the leader, which persists the data to disk. The followers then actively pull the data from the leader; once a follower has synchronized, it sends an ACK to the leader, and after receiving the ACKs the leader responds to the producer.
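
The leader, replicas, and ISR of each partition can be inspected on the cluster built earlier with the describe command (using the same ZooKeeper address and topic as before):

./kafka-topics.sh --describe --zookeeper 192.168.244.130:2181 --topic cluster-test3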

How to solve repeated consumption

Regardless of how reliable the message queue is, idempotent processing should be done within the program

Kafka has the concept of an offset: each written message has a unique offset, and after consuming a message the consumer commits the offset to indicate that the message has already been consumed.

However, if the offset is not committed after consumption, for example because the system restarts or the process is killed, the same messages will be consumed again next time. To tolerate such repeated consumption, the program should process messages idempotently, for example by relying on a database primary key or by recording processed ids in Redis; the specific logic can be designed according to the business.
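
As one possible shape for this idempotent handling, the sketch below records a message identifier in Redis with SETNX-style semantics before processing. It assumes spring-boot-starter-data-redis is on the classpath and reuses the topic and group constants from the Kafka example above; the class name, key format, and TTL are purely illustrative.

package com.epoint.test.kafka;

import java.time.Duration;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class IdempotentKafkaConsumer
{
    @Autowired
    private StringRedisTemplate redisTemplate;

    @KafkaListener(topics = KafkaProducer.TOPIC_TEST1, groupId = KafkaProducer.GROUP_TEST1)
    public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        // Use topic-partition-offset as the de-duplication key (a business id works just as well)
        String key = "consumed:" + record.topic() + ":" + record.partition() + ":" + record.offset();

        // setIfAbsent behaves like SETNX: only the first attempt stores the key and returns true
        Boolean firstTime = redisTemplate.opsForValue().setIfAbsent(key, "1", Duration.ofDays(1));

        if (Boolean.TRUE.equals(firstTime)) {
            // Run the business logic only for messages seen for the first time
            log.info("processing message: {}", record.value());
        }
        else {
            // The message was already processed before the offset could be committed; skip it
            log.info("duplicate message skipped: {}", key);
        }

        // Commit the offset in either case
        ack.acknowledge();
    }
}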

Article references:

juejin.cn/post/689892…

juejin.cn/post/685457…