Distributed transaction

What are distributed transactions

As our systems evolve from a single server into a distributed system spanning multiple machines, services must communicate over the network: the relatively reliable in-process method call is replaced by inter-process communication. Because the network is unreliable, keeping data consistent across multiple machines becomes hard. This is the classic distributed transaction problem.

In distributed transactions, transaction participants, transaction supporting servers, resource servers, and transaction managers are located on different nodes in different distributed systems. Distributed transactions ensure data consistency between different nodes.

Common distributed transaction solutions

1. 2PC (two-phase commit) – strong consistency

2. 3PC (three-phase commit)

3. TCC (Try-Confirm-Cancel) transactions – eventual consistency

4. Saga transactions – eventual consistency

5. Local message table – eventual consistency

6. MQ transactions – eventual consistency

The focus here is on using message queues to achieve distributed consistency. See the reference links at the end of this article for details of the other solutions above.

Distributed transactions based on MQ implementation

Local message table – Eventual consistency

Message producers, in addition to maintaining their own business logic, also need to maintain a message table. This message table records the information that needs to be synchronized to other services. Of course, in this message table, each message has a status value to indicate whether the message has been successfully processed.

The business operation and the insert into the message table are executed in a single local transaction, which avoids the case where the business succeeds but the transaction message fails to be recorded, or the business fails but the transaction message is recorded anyway.
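This "outbox" write can be sketched with sqlite3. Everything here is illustrative: the table and column names are assumptions, not a prescribed schema.

```python
import sqlite3

# Sketch: write the business row and the outbox message row in ONE local
# transaction, so they either both commit or both roll back.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE orders   (id INTEGER PRIMARY KEY, item TEXT);
CREATE TABLE messages (id INTEGER PRIMARY KEY, payload TEXT,
                       status TEXT DEFAULT 'PENDING');  -- PENDING / DONE
""")

def place_order(item: str) -> int:
    with conn:  # one transaction: commits on success, rolls back on error
        cur = conn.execute("INSERT INTO orders(item) VALUES (?)", (item,))
        order_id = cur.lastrowid
        conn.execute("INSERT INTO messages(payload) VALUES (?)",
                     (f"order_created:{order_id}",))
    return order_id

place_order("book")
```

If either insert raises, the `with conn:` block rolls both back, so the message table never diverges from the business data.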

Here’s an example:

Assume we have two services, an order service and a shopping cart service: the user selects several items in the cart and places an order, and the cart service then needs to know which of its items were just ordered so it can update its data.

1. The producer of the message, the order service, completes its own logic (placing the order for the items) and sends a message via MQ to the other services that need the data synchronized, in our case the shopping cart service.

2. Other services (shopping cart services) listen on this queue;

1. If the message is received and the data synchronization succeeds (this is also a local transaction), the consumer replies to the producer (the order service) via MQ that the message has been processed, and the producer can mark the transaction as finished. If a business error occurs, the reply tells the producer to roll its data back.

2. If the message is not received for a long time, nothing is lost: the sender runs a scheduled task that periodically re-sends any message still marked unprocessed in the message table.

3. When the producer of the message (the order service) receives the receipt:

1. If the receipt reports success, the producer marks the message as done and the distributed synchronization is complete.

2. If the receipt reports a business failure and the local transaction has been rolled back, the message is likewise marked as processed.

3. If the receipt goes missing, that is, the producer never receives it, this case is also covered: the sender's scheduled task keeps re-sending unprocessed messages from the message table, so the downstream service must be idempotent, since it may receive the same message several times. If the consumer's reply receipt is lost, the consumer will keep receiving the same MQ message from the producer and simply replies again, which ensures the sender eventually gets the receipt; the producer must likewise handle receipt messages idempotently.

There are two important operations:

1. Message handling must be idempotent: both the producer and the consumer of the message need to process it idempotently;

2. The sender needs a timer that scans the message table and re-pushes unprocessed messages, to avoid losing messages and breaking off the transaction midway.
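The timed re-push in point 2 can be sketched as a scan over the message table (again, table and column names are assumptions for illustration):

```python
import sqlite3
import time

# Sketch of the timed re-push: scan for messages still PENDING past a
# timeout and hand them back to the sender for another delivery attempt.
conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE messages (
    id INTEGER PRIMARY KEY, payload TEXT,
    status TEXT, created_at REAL)""")

def repush_pending(send, timeout_sec=30.0, now=None):
    """Re-send every message that has stayed PENDING longer than timeout_sec."""
    now = time.time() if now is None else now
    rows = conn.execute(
        "SELECT id, payload FROM messages "
        "WHERE status = 'PENDING' AND created_at <= ?",
        (now - timeout_sec,)).fetchall()
    for msg_id, payload in rows:
        send(msg_id, payload)  # at-least-once: the consumer must be idempotent
    return len(rows)
```

Because the same message may be re-pushed several times, this is exactly why point 1 (idempotent consumers) is mandatory.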

Advantages and disadvantages of the scheme

Advantages:

1. Message reliability is achieved at the design level without relying on message middleware features, which weakens the dependence on MQ.

2. Simple and easy to implement.

Disadvantages:

It must be bound to the service's database, so coupling is high, and the message table shares and consumes some of the service database's resources.

MQ transactions – Eventual consistency

Let’s examine transaction support for several message queues

How are transactions handled in RocketMQ

Transactions in RocketMQ address the problem of ensuring that the local transaction and the message send either both succeed or both fail. In addition, RocketMQ provides a transaction check-back mechanism to maximize the chance that the transaction completes and data stays consistent.

There are two main parts: the normal transaction commit flow and the transaction message compensation flow.

Normal transaction commits

1. Send a half message, which differs from a normal message and is not visible to the consumer until the transaction commits.

2. The MQ SERVER persists the message and returns a response.

3. Decide whether to execute the local transaction based on the MQ SERVER's response: if the half message was written successfully, execute the local transaction; otherwise, do not execute it.

4. Based on the result of the local transaction, send Commit or Rollback. On Commit, the MQ SERVER delivers the message to the downstream subscribing service, which synchronizes its data; on Rollback, the message is discarded.

If the MQ SERVER receives neither Commit nor Rollback, a compensation process is required.

The compensation process

1. If the MQ SERVER does not receive a Commit or Rollback from the sender, it sends a check-back query to the sender (our server) asking for the current status of the message.

2. The message sender receives the check-back request, queries the status of its local transaction, and pushes the state back to the MQ SERVER for the subsequent process.
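The half-message life cycle above can be modeled with a toy broker. All names here are invented for illustration; this is the state machine, not the real RocketMQ API.

```python
# Toy model of RocketMQ's half-message flow: a half message is invisible
# until COMMIT; ROLLBACK discards it; if neither arrives, the broker
# "checks back" with the producer for the local transaction status.
class TinyBroker:
    def __init__(self):
        self.half = {}     # msg_id -> payload, not yet visible to consumers
        self.visible = []  # messages delivered to subscribers

    def send_half(self, msg_id, payload):
        self.half[msg_id] = payload

    def commit(self, msg_id):
        self.visible.append(self.half.pop(msg_id))

    def rollback(self, msg_id):
        self.half.pop(msg_id, None)  # message is discarded

    def check_back(self, msg_id, query_local_tx):
        # Compensation: ask the producer for its local transaction state.
        state = query_local_tx(msg_id)
        if state == "COMMIT":
            self.commit(msg_id)
        elif state == "ROLLBACK":
            self.rollback(msg_id)
        # "UNKNOWN": keep the half message and check again later

broker = TinyBroker()
broker.send_half("m1", "order_created")
assert broker.visible == []  # half message is invisible before commit
broker.check_back("m1", lambda _id: "COMMIT")
```

The check-back callback is the stand-in for "query the local transaction status" in step 2 of the compensation flow.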

Compared with the local message table approach, MQ transactions move the logic that the local message table would handle into the MQ itself.

How are transactions handled in Kafka

Transactions in Kafka solve the problem of ensuring that multiple messages sent in one transaction either all succeed or all fail; that is, they guarantee atomicity of writes across multiple partitions.

Kafka’s Exactly Once is realized by combining with Kafka’s idempotent mechanism, which satisfies the read-process-write mode of the application. Of course, transactions in Kafka are primarily designed to handle this pattern.

What is read-process-write mode?

Take streaming computation as an example, with Kafka as both the data source and the result store: data is consumed from a Kafka topic, computed in a compute cluster, and the results are saved to other Kafka topics. Each message must be processed exactly once for the final result to be correct. The atomicity of Kafka transactions guarantees that the read and the write either succeed together or fail together and roll back.

How are Kafka transactions implemented

It is implemented in much the same way as RocketMQ transactions, based on two-phase commit, though the implementation is more involved.

To address distributed transactions, Kafka introduces the role of transaction coordinator, which coordinates the whole transaction on the server side. The coordinator is not an independent process but part of the Broker process. Like partition leaders, coordinators are elected, which ensures their availability.

The Kafka cluster also has a special topic for logging transactions. Multiple coordinators exist at the same time, each responsible for managing and using several partitions in the transaction log. This allows transactions to be executed in parallel, improving performance.

Let’s look at the specific process

  • 1. When a transaction starts, the producer sends a request to the coordinator to open the transaction, and the coordinator records the transaction ID in the transaction log.
  • 2. The producer then sends the transaction messages. It first tells the coordinator which topics and partitions the transaction touches, then sends the messages normally. Unlike RocketMQ, which stores transaction messages in a special queue, Kafka stores uncommitted transaction messages like normal messages and relies on the client to filter them out during consumption.
  • 3. When the messages have been sent, the producer commits or rolls back the transaction with the coordinator according to the outcome of its own execution;

Transaction commit

1. The coordinator sets the state of the transaction to PrepareCommit and writes it to the transaction log;

2. The coordinator writes an end-of-transaction marker in each partition; the client can then release the previously filtered uncommitted transaction messages to the consumer for consumption;

Rollback of transactions

1. The coordinator sets the state of the transaction to PrepareAbort and writes it to the transaction log.

2. The coordinator writes a transaction rollback flag in each partition, and uncommitted transaction messages are discarded.
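The client-side filtering described above can be modeled with a toy in-memory log. This models only the idea; the real protocol uses control records and the consumer's read_committed isolation level.

```python
# Toy model of consuming only committed transactions: messages carry a
# transaction id, and the consumer withholds them until it sees the
# matching COMMIT marker; an ABORT marker makes it drop them instead.
class TxnLog:
    def __init__(self, records):
        # records: ("msg", txn_id, value) or ("marker", txn_id, "COMMIT"/"ABORT")
        self.records = records

    def read_committed(self):
        pending, out = {}, []
        for kind, txn, value in self.records:
            if kind == "msg":
                pending.setdefault(txn, []).append(value)  # hold back
            elif value == "COMMIT":
                out.extend(pending.pop(txn, []))           # release
            else:  # ABORT marker
                pending.pop(txn, None)                     # discard
        return out
```

Uncommitted messages sit in the log like normal messages; only the markers decide whether the consumer ever sees them.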


Transactions in RabbitMQ

RabbitMQ transactions address a different problem: ensuring that producer messages reach the MQ SERVER. This differs from the transaction mechanisms of the other MQs, so it is not discussed further here.

Message loss prevention

Let’s start by analyzing the stages a message passes through in MQ.

Production phase: The producer generates messages and sends them to the Broker side over the network.

Storage phase: the Broker receives the message and needs to persist it to disk; a clustered MQ deployment also needs to synchronize the data to other nodes.

Consumption phase: The consumer pulls data from the Broker and transmits it over the network to the consumer.

Preventing message loss in the production phase

Packet loss and network faults may cause message loss

Anti-loss measures in RabbitMQ

  • 1. For errors we can perceive, catch them and re-send;
  • 2. Use RabbitMQ transactions, which solve the problem of messages being lost at production.

Before sending a message, the producer starts a transaction with channel.txSelect and then sends the message. If delivery to the server fails, the transaction is rolled back with channel.txRollback and the message is re-sent; if the server receives the message, the transaction is committed with channel.txCommit.

However, transaction performance is poor: it is a synchronous operation, so after each message the sender blocks waiting for the RabbitMQ server's response before it can send the next one, which greatly reduces producer throughput.

  • 3. Use the send confirmation mechanism.

Using the confirmation mechanism, the producer sets the channel to Confirm mode. Once the channel enters Confirm mode, all messages posted on the channel are assigned a unique ID (starting from 1). Once the message has been delivered to all matching queues, RabbitMQ sends an acknowledgement (basic.ack) to the producer (containing the message’s unique deliveryTag and multiple parameters), which lets the producer know that the message has arrived at its destination correctly.

multiple = true indicates batch confirmation: all messages with a deliveryTag less than or equal to the returned deliveryTag are confirmed. multiple = false confirms only the message with the returned deliveryTag.
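The semantics of the multiple flag can be pinned down with a small helper (a model of basic.ack's behavior, not the pika or Java client API):

```python
# Given the set of deliveryTags still awaiting confirmation, return the
# tags that a basic.ack with (delivery_tag, multiple) confirms:
#  - multiple=True  -> every outstanding tag <= delivery_tag
#  - multiple=False -> only delivery_tag itself
def acked_tags(outstanding, delivery_tag, multiple):
    if multiple:
        return sorted(t for t in outstanding if t <= delivery_tag)
    return [delivery_tag] if delivery_tag in outstanding else []
```

So an ack with deliveryTag 3 and multiple=true clears tags 1, 2, and 3 in one round trip.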

There are three confirmation modes:

1. Synchronous confirmation

2. Batch confirmation

3. Asynchronous confirmation

Synchronous mode is inefficient because each message has to wait for confirmation before processing the next one;

Batch confirmation is more efficient than synchronous mode, but it has a fatal flaw: once a confirmation fails, all messages in the current batch must be re-sent, which produces duplicate messages.

Asynchronous mode avoids the blocking of synchronous mode while remaining very efficient, which makes it a good choice.

Anti-loss measures in Kafka

Kafka introduces acknowledgements between the Broker and both producers and consumers. The producer sends a message to the Broker and, if it receives no acknowledgement, retries the send.

As long as the Producer receives an acknowledgement from the Broker, the message is guaranteed not to be lost during production. Some message queues will automatically retry after a long time without a send confirmation response, and if the retry fails, the user will be notified with a return value or an exception.

By properly handling the Broker’s acknowledgement response, you can avoid message loss.

Anti-loss measures in RocketMQ

  • Send messages using SYNC and wait for the broker to process the result

RocketMQ provides three ways to send messages:

Synchronous sending: Producer sends a message to the broker and blocks the current thread waiting for the broker to respond.

Asynchronous sending: Producer first constructs a task that sends a message to the broker, submits the task to the thread pool, and when the task is completed, calls back user-defined callback functions to execute the processing result.

Oneway sending: The Oneway mode only sends the request without waiting for a response. Producer only sends the request without processing the response result.

  • Use transactions: RocketMQ transactions ensure that the local transaction and the message send either both succeed or both fail.
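The three send modes can be illustrated with a small simulation. This is a sketch of the semantics only; the function names are invented and do not belong to the real RocketMQ client API.

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-in for the network round trip to the broker.
def broker_handle(msg):
    return f"stored:{msg}"

pool = ThreadPoolExecutor(max_workers=2)

def send_sync(msg):
    # Synchronous: block the current thread until the broker replies.
    return broker_handle(msg)

def send_async(msg, on_result):
    # Asynchronous: submit the send to a thread pool and invoke a
    # user-defined callback with the result when it completes.
    fut = pool.submit(broker_handle, msg)
    fut.add_done_callback(lambda f: on_result(f.result()))
    return fut

def send_oneway(msg):
    # Oneway: fire and forget, the response is never inspected.
    pool.submit(broker_handle, msg)
```

Oneway trades reliability for throughput, which is why the anti-loss advice above is to use the synchronous mode and check the broker's result.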

Storage stage

In the storage phase, messages are normally not lost as long as the Broker runs normally; they can be lost when the Broker fails, for example when the process dies or the server goes down.

Anti-loss measures in RabbitMQ

To prevent message loss during the storage phase, enable persistence so that messages survive exceptions (restarts, shutdowns, crashes).

There are three parts to RabbitMQ persistence:

  • Persistence of the exchange

An exchange is durable when its durable parameter is set to true at declaration. If durable is not set, the exchange metadata will be lost after a restart.

  • Queue persistence

A queue is durable when its durable parameter is set to true at declaration. Queue persistence ensures that the queue's metadata is not lost on an exception, but it does not guarantee that the messages stored in the queue survive.

  • Persistence of messages

Message persistence is enabled by setting delivery_mode=2 (1 means non-persistent). It must be combined with queue persistence: if only the messages are persistent, the queue disappears after a restart and the messages are lost with it. So setting message persistence without queue persistence makes little sense.

For persistence, if all messages are set to persist, the write performance will be affected. Therefore, you can choose to persist messages with high reliability requirements.

Message persistence, however, does not make messages 100% safe from loss.

For example, the machine may go down while messages are being flushed to disk, so messages that were still only in memory are lost. This problem can be addressed by introducing mirrored queues.

A mirrored queue is replicated to other Broker nodes in the cluster. If a node fails, the queue automatically switches to another node in the mirror, keeping the service available. (Further details are not discussed here.)

Anti-loss measures in Kafka

The operating system has its own cache layer, the Page Cache, through which data is written to disk files.

When Kafka receives a message, it first stores it in the Page Cache. The operating system then flushes it to disk according to its own policy, or a flush is forced with fsync. If the machine fails before the flush, the data in the Page Cache is lost; that is, the data on that Broker is lost.

How to handle this

1. Control partition leader election: if a Broker that has fallen too far behind the original leader is allowed to become the new leader, messages will inevitably be lost.

2. Require that a message be written to multiple replicas before it is considered committed, which avoids problem 1 above.
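In Kafka these two measures map to well-known configuration options. A sketch only; whether each key is set as a broker default, per topic, or on the producer depends on your deployment:

```properties
# Broker/topic side: forbid an out-of-sync replica from becoming leader
unclean.leader.election.enable=false
# Keep multiple copies of each partition
default.replication.factor=3
# A write is only considered committed after reaching this many in-sync replicas
min.insync.replicas=2

# Producer side: wait for all in-sync replicas before treating a send as successful
acks=all
```

With acks=all plus min.insync.replicas=2, a send is acknowledged only after at least two nodes hold the message, so a single Broker failure does not lose it.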

Anti-loss measures in RocketMQ

1. Change the disk-flush mode to synchronous flush;

2. With multiple Broker nodes, configure the cluster so that a message is written to at least two nodes before the acknowledgement is returned to the client. Then, when one Broker goes down, the others can take over without losing messages.

Consumption stage

The consumption stage is simple: if a message is lost in network transmission, it will be pushed to the consumer again. We only need to make sure the consumption acknowledgement is sent after the business logic has finished processing.

Conclusion: against message loss we can also borrow the idea of the local message table: persist the message when it is produced, and periodically re-push to the queue any message that has gone unprocessed for too long.

Message resending

The delivery guarantees of messages in MQ can be roughly classified into three levels:

1. At most once: a message is delivered at most once. This is not safe; data can be lost.

2. At least once: a message is delivered at least once. No message may be lost, but a small number of duplicates are allowed.

3. Exactly once: a message is delivered exactly once, neither lost nor duplicated. This is the highest level.

Most message queues are At least once, meaning that repeated messages are allowed.

Consumers therefore need to be idempotent. There are several common solutions:

1. Use the uniqueness of the database

Based on the business, choose a value that is guaranteed unique in the business as the database's unique key, create a journal (message flow) table, and execute the business operation and the journal insert in the same transaction. If the journal row already exists, the insert fails, which guarantees idempotency. Alternatively, query the journal table first and execute the business plus the journal insert only when no row exists; in that case, be aware of database read/write latency.

2. Add preconditions for database updates

3. Attach a unique ID to the message

Attach a unique ID to each message, and use the database uniqueness from method 1: a journal table keyed by the message ID absorbs duplicate deliveries.
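Methods 1 and 3 combine naturally: key the journal table by the message's unique ID and do the business update in the same transaction. A sqlite3 sketch (table names and the cart example are assumptions):

```python
import sqlite3

# Sketch of consumer-side idempotency: a UNIQUE message id makes the
# second delivery of the same message a no-op.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE processed (msg_id TEXT PRIMARY KEY);
CREATE TABLE cart (user_id TEXT PRIMARY KEY, items INTEGER DEFAULT 0);
""")
conn.execute("INSERT INTO cart(user_id, items) VALUES ('u1', 0)")

def handle(msg_id: str, user_id: str) -> bool:
    try:
        with conn:  # journal insert + business update in one transaction
            conn.execute("INSERT INTO processed(msg_id) VALUES (?)", (msg_id,))
            conn.execute("UPDATE cart SET items = items + 1 WHERE user_id = ?",
                         (user_id,))
        return True
    except sqlite3.IntegrityError:
        return False  # duplicate delivery: already handled, skip

handle("m-1", "u1")
handle("m-1", "u1")  # at-least-once redelivery of the same message
```

The duplicate insert violates the primary key, the whole transaction rolls back, and the business update is applied exactly once per message ID.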

References

  • Message Queue Masterclass: time.geekbang.org/column/intr…
  • The design essentials of message queues (Meituan Tech): tech.meituan.com/2016/07/01/…
  • RabbitMQ in Practice: book.douban.com/subject/275…
  • The seven classic solutions to distributed transactions: segmentfault.com/a/119000004…
  • Distributed transactions: draveness.me/distributed…
  • Understanding distributed transactions: juejin.cn/post/684490…
  • RocketMQ design: github.com/apache/rock…
  • How does Kafka implement transactions? zhuanlan.zhihu.com/p/163683403
  • RabbitMQ Cluster: www.cnblogs.com/Neeo/articl…
  • Kafka: how to avoid message loss: www.lixueduan.com/post/kafka/…
  • Transactions, duplicate sends, message loss, and handling strategies in RabbitMQ, RocketMQ, and Kafka: boilingfrog.github.io/2021/12/30/…