All descriptions of Kafka primitives in this article are based on Kafka version 1.0.0 unless otherwise noted.

The Kafka transaction mechanism is implemented primarily to support:

  • Exactly Once semantics, meaning each message is processed exactly once
  • Atomicity of operations
  • Recoverability of stateful operations

Versions of Kafka prior to 0.11.0.0 supported only At Least Once and At Most Once semantics. Exactly Once semantics were not supported.

However, many demanding scenarios, such as using Kafka to process transaction data, require Exactly Once semantics. Exactly Once can be achieved indirectly by combining Kafka’s At Least Once semantics with idempotent downstream systems, but this approach has drawbacks:

  • This scheme requires downstream systems to support idempotent operations, which limits the applicability of Kafka
  • The implementation barrier is relatively high, because users need a good understanding of how Kafka works
  • For Kafka Stream, Kafka itself is its own downstream system, but Kafka did not have idempotent sending capabilities prior to 0.11.0.0

Therefore, Kafka’s own support for the Exactly Once semantics is essential.

Operational atomicity

The atomicity of operations means that multiple operations either all succeed or all fail, and there is no possibility of partial success or partial failure.

The significance of implementing atomic operations is:

  • Operation results are more predictable, which helps improve data consistency
  • Fault recovery is easier. Because the operation is atomic, recovering from a failure only requires retrying the operation (if the original operation failed) or skipping it (if the original operation succeeded). There is no intermediate state to record, let alone to handle specially

Idempotent send

As mentioned above, one way to achieve Exactly Once is to make the downstream system idempotent. In Kafka Stream, the Kafka Producer is itself a “downstream” system, so making the Producer idempotent allows Kafka Stream to support Exactly Once semantics to some extent.

To make the Producer idempotent, Kafka introduces the Producer ID (PID) and the Sequence Number. Each new Producer is assigned a unique PID when it is initialized. The PID is completely transparent to the user and is never exposed.

For each PID, each <Topic, Partition> that the Producer sends data to has its own monotonically increasing Sequence Number, starting from 0.

Similarly, the Broker maintains a sequence number for each <PID, Topic, Partition> and increments it each time a message is committed. For each received message, the Broker accepts it only if its sequence number is exactly one greater than the number the Broker maintains (that is, the sequence number of the last committed message). Otherwise the message is discarded:

  • If the message’s sequence number exceeds the Broker’s number by more than one, some intermediate data has not been written yet, which means the messages are out of order. The Broker rejects the message and the Producer throws InvalidSequenceNumber
  • If the message’s sequence number is less than or equal to the number maintained by the Broker, the message is a duplicate. The Broker discards it and the Producer throws DuplicateSequenceNumber
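The Broker-side check described above can be sketched as a small in-memory model. This is not Kafka code: the class `SequenceCheck`, its `append` method, and the string key format are hypothetical names chosen for illustration, and the sketch tracks one sequence number per message rather than per batch.

```java
import java.util.HashMap;
import java.util.Map;

final class SequenceCheck {
    enum Result { ACCEPTED, DUPLICATE, OUT_OF_ORDER }

    // Last committed sequence number per <PID, Topic, Partition> (hypothetical key format).
    private final Map<String, Integer> lastCommitted = new HashMap<>();

    Result append(long pid, String topic, int partition, int seq) {
        String key = pid + "/" + topic + "/" + partition;
        int last = lastCommitted.getOrDefault(key, -1); // -1 means nothing committed yet
        if (seq == last + 1) {
            lastCommitted.put(key, seq);                // accept and advance
            return Result.ACCEPTED;
        }
        // Not the next expected number: either already seen, or a gap
        return seq <= last ? Result.DUPLICATE : Result.OUT_OF_ORDER;
    }

    public static void main(String[] args) {
        SequenceCheck broker = new SequenceCheck();
        System.out.println(broker.append(1L, "t", 0, 0)); // first message
        System.out.println(broker.append(1L, "t", 0, 0)); // retry of the same message
        System.out.println(broker.append(1L, "t", 0, 2)); // sequence number 1 is missing
    }
}
```

The retry case corresponds to a lost ACK, and the gap case corresponds to a later message overtaking an earlier one.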

This design solves two problems that existed before 0.11.0.0:

  • The Broker saves a message but crashes before sending the ACK. The Producer assumes the send failed and retries, causing duplicate data
  • An earlier message fails to be sent, a later message succeeds, and then the earlier message succeeds on retry. As a result, the data is out of order

Transactional guarantee

The idempotent design above only guarantees Exactly Once semantics for a single Producer writing to the same <Topic, Partition>.

In addition, it does not guarantee atomicity of writes, that is, that multiple writes are either all committed or all rolled back.

Still less does it guarantee atomicity across multiple read and write operations. For Kafka Stream applications in particular, the typical operation is to consume data from one Topic and, after a series of transformations, write the result to another Topic. Making the read from the source and the write to the target atomic helps with recovery from failures.

Transactional guarantees let an application treat producing and consuming data as one atomic unit that either succeeds or fails as a whole, even if the production or consumption spans multiple <Topic, Partition>s.

In addition, a stateful application can resume processing from where it left off after a restart. This is known as transaction recovery.

To achieve this, the application must provide a unique ID that remains stable across restarts, known as the Transaction ID. Transaction ID and PID may correspond one to one. The difference is that the Transaction ID is provided by the user, whereas the PID is an internal detail that is transparent to the user.

In addition, to ensure that the old Producer with the same Transaction ID becomes invalid after the new Producer starts, each time the Producer obtains the PID through the Transaction ID, it also obtains a monotonically increasing epoch. Since the epoch of an old Producer is smaller than the epoch of a new Producer, Kafka can easily identify the original Producer as the old Producer and reject its request.
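A minimal model of this epoch-based fencing, under simplifying assumptions: the class `EpochFencing` and its methods `init` and `accepts` are hypothetical, state is kept in memory only, and the PID is assumed to stay the same for a given Transaction ID while only the epoch grows.

```java
import java.util.HashMap;
import java.util.Map;

final class EpochFencing {
    /** PID and epoch handed to a Producer; a simplified stand-in for the coordinator's reply. */
    static final class Identity {
        final long pid;
        final int epoch;
        Identity(long pid, int epoch) { this.pid = pid; this.epoch = epoch; }
    }

    private final Map<String, Identity> byTransactionId = new HashMap<>();
    private long nextPid = 0;

    /** Called when a Producer initializes with a Transaction ID: same PID, larger epoch. */
    Identity init(String transactionId) {
        Identity old = byTransactionId.get(transactionId);
        Identity fresh = (old == null)
                ? new Identity(nextPid++, 0)
                : new Identity(old.pid, old.epoch + 1);
        byTransactionId.put(transactionId, fresh);
        return fresh;
    }

    /** A request is accepted only if it carries the latest epoch for its Transaction ID. */
    boolean accepts(String transactionId, Identity caller) {
        Identity current = byTransactionId.get(transactionId);
        return current != null && current.pid == caller.pid && current.epoch == caller.epoch;
    }

    public static void main(String[] args) {
        EpochFencing coordinator = new EpochFencing();
        Identity oldProducer = coordinator.init("tx-1"); // hypothetical Transaction ID
        Identity newProducer = coordinator.init("tx-1"); // restart with the same ID
        System.out.println(coordinator.accepts("tx-1", oldProducer));
        System.out.println(coordinator.accepts("tx-1", newProducer));
    }
}
```

Once the second `init` call returns, any request still carrying the first identity is rejected, which is how the old instance gets fenced.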

With Transaction ID, Kafka guarantees that:

  • Idempotent sending across sessions. When a new Producer instance with the same Transaction ID is created and starts working, the old Producer instance with that Transaction ID stops working.
  • Transaction recovery across sessions. If an application instance goes down, the new instance can ensure that any unfinished old transaction is either committed or aborted, so that the new instance starts from a clean state.

It should be noted that the transaction guarantees mentioned above are considered from the perspective of Producer. From the Consumer’s point of view, this guarantee is relatively weak. In particular, there is no guarantee that all messages committed by a transaction will be consumed together, because:

  • For a compacted Topic, some messages of a transaction may be overwritten by newer versions
  • Messages contained in a transaction may be distributed across multiple Segments (even within the same Partition), and part of the transaction’s data may be lost when an old Segment is deleted
  • A Consumer may use the seek method to start reading at any Offset within a transaction and may therefore miss some of its messages
  • The Consumer may not need to consume all partitions within a transaction, so it will never read all the messages that make up the transaction

Transactional messaging

Transactions in this section mainly refer to atomicity, where a Producer sends multiple messages as a single transaction in batches, all of which either succeed or fail.

To achieve this, Kafka 0.11.0.0 introduces a server-side module called Transaction Coordinator that manages the transactionality of messages sent by producers.

The Transaction Coordinator maintains the Transaction Log, which is stored in an internal Topic. Because Topic data is persistent, the state of the transaction is also persistent.

The Producer does not read or write the Transaction Log directly. It communicates with the Transaction Coordinator, which then writes the transaction state to the corresponding Transaction Log.

The Transaction Log is designed similarly to the Offset Log used to hold Consumer Offsets.

Commit Offset in transaction

Many Kafka-based applications, especially Kafka Stream applications, contain both a Consumer, who gets messages from Kafka, and a Producer, who writes the processed data back to other Kafka topics.

To achieve atomicity in this scenario, Kafka needs to ensure that the commit of the Consumer Offset is included in the same transaction as the commit of the data sent by the Producer. Otherwise, if a failure occurs between the two commits, data duplication or data loss may occur, depending on their order:

  • If the Producer’s data is committed first and the Consumer Offset second, the result is At Least Once semantics, which may cause data duplication.
  • If the Consumer Offset is committed first and the Producer’s data second, the result is At Most Once semantics, which may cause data loss.
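The two failure modes above can be reproduced in a small simulation. The sketch below is not Kafka code: it models one consumer position and one output list, commits them separately, and injects a single crash between the two commits. The class `CommitOrderDemo`, its method `run`, and the `crashAt` parameter are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CommitOrderDemo {
    /**
     * Processes input record by record with two separate commits per record.
     * A single crash is injected between the two commits of the record at index crashAt.
     */
    static List<String> run(List<String> input, boolean outputFirst, int crashAt) {
        List<String> output = new ArrayList<>();
        int committedOffset = 0;
        boolean crashed = false;
        while (committedOffset < input.size()) {
            int pos = committedOffset;              // a restart resumes from the committed offset
            boolean crashNow = !crashed && pos == crashAt;
            if (outputFirst) {
                output.add(input.get(pos));         // commit 1: produced data
                if (crashNow) { crashed = true; continue; }
                committedOffset = pos + 1;          // commit 2: consumer offset
            } else {
                committedOffset = pos + 1;          // commit 1: consumer offset
                if (crashNow) { crashed = true; continue; }
                output.add(input.get(pos));         // commit 2: produced data
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c");
        System.out.println(run(input, true, 1));    // data committed before offset
        System.out.println(run(input, false, 1));   // offset committed before data
    }
}
```

With the input a, b, c and a crash at index 1, committing the data first yields a, b, b, c (a duplicate), while committing the offset first yields a, c (a loss). Putting both commits in one transaction removes the window between them.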

Control messages for transactional properties

To distinguish committed from aborted messages written to a Partition, Kafka introduces a special type of message, the Control Message. The value of such a message contains no application data and is not exposed to the application. It is used only for internal communication between Broker and Client.

For Producer-side transactions, Kafka introduces a series of Transaction Markers in the form of Control Messages. The Consumer uses these markers to determine whether the corresponding messages were committed or aborted, and then decides, based on its configured isolation level, whether to return them to the application.
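As a rough illustration of how markers drive visibility, the toy model below filters one Partition's log the way a READ_COMMITTED reader conceptually would. It is a deliberate simplification, not the client's actual algorithm: it buffers each open transaction by PID and releases the buffer when the marker arrives, so committed data comes out in commit order. The names `ReadCommittedFilter`, `Entry`, and `visible` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ReadCommittedFilter {
    enum Type { DATA, COMMIT, ABORT }

    /** One log entry: application data, or a control message (marker) for a PID. */
    static final class Entry {
        final Type type;
        final long pid;
        final String value;
        Entry(Type type, long pid, String value) {
            this.type = type;
            this.pid = pid;
            this.value = value;
        }
    }

    /** Returns the values the application would see for one Partition's log. */
    static List<String> visible(List<Entry> log) {
        Map<Long, List<String>> pending = new HashMap<>(); // open transactions by PID
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.type == Type.DATA) {
                pending.computeIfAbsent(e.pid, k -> new ArrayList<>()).add(e.value);
            } else {
                List<String> txn = pending.remove(e.pid);
                if (txn != null && e.type == Type.COMMIT) {
                    out.addAll(txn);                       // committed: release to the application
                }
                // ABORT: buffered messages are dropped; markers themselves are never returned
            }
        }
        return out; // data whose marker has not arrived yet stays invisible
    }

    public static void main(String[] args) {
        List<Entry> log = Arrays.asList(
                new Entry(Type.DATA, 1, "a"),
                new Entry(Type.DATA, 2, "x"),
                new Entry(Type.ABORT, 2, null),
                new Entry(Type.DATA, 1, "b"),
                new Entry(Type.COMMIT, 1, null));
        System.out.println(visible(log));
    }
}
```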

Sample transaction processing code

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.AuthorizationException;
    import org.apache.kafka.common.errors.OutOfOrderSequenceException;
    import org.apache.kafka.common.errors.ProducerFencedException;

    // props, consumer and offsets are assumed to be defined elsewhere
    Producer<String, String> producer = new KafkaProducer<String, String>(props);

    // Initialize transactions, including finishing any unfinished transaction
    // for this Transaction ID, so that a new transaction starts from a correct state
    producer.initTransactions();

    // Begin the transaction
    producer.beginTransaction();

    // Consume data
    ConsumerRecords<String, String> records = consumer.poll(100);

    try {
        // Send data
        producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));

        // Send the Offsets of the consumed data, so that consumption and
        // production fall within the same transaction
        producer.sendOffsetsToTransaction(offsets, "group1");

        // Data and Offsets were both sent successfully: commit the transaction
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // Fatal for this Producer: it cannot abort the transaction, only close (see finally)
    } catch (KafkaException e) {
        // Any other failure while sending data or Offsets: abort the transaction
        producer.abortTransaction();
    } finally {
        // Close Producer and Consumer
        producer.close();
        consumer.close();
    }

Complete transaction process

Find the Transaction Coordinator

Because the Transaction Coordinator is at the heart of allocating PIDs and managing transactions, the first thing the Producer does is locate it by sending a FindCoordinatorRequest to any Broker.

Note: This step is required only if the application has configured a Transaction ID for the Producer. In addition, since transactions require the Producer to be idempotent, enabling transactions by setting transactional.id to a non-null value also requires enabling idempotence by setting enable.idempotence to true.
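The two settings mentioned in the note can be written as plain Producer properties. The configuration keys below are real Kafka Producer settings, while the helper class `TransactionalProducerConfig`, the broker address, and the Transaction ID value are placeholders assumed for this sketch.

```java
import java.util.Properties;

final class TransactionalProducerConfig {
    /** Builds Producer properties with idempotence and transactions enabled. */
    static Properties build(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", "true");        // required by transactions
        props.put("transactional.id", transactionalId); // must stay stable across restarts
        return props;
    }

    public static void main(String[] args) {
        // Placeholder address and ID, for illustration only
        System.out.println(build("localhost:9092", "order-processor-1"));
    }
}
```

The resulting `Properties` object is what would be passed to the `KafkaProducer` constructor before calling `initTransactions()`.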

Obtain the PID

After a Transaction Coordinator is found, the Producer with idempotent properties must initiate an InitPidRequest to obtain the PID.

Note: This action must be performed whenever the idempotent property is enabled, regardless of whether the Producer has transaction properties enabled.

If the transaction feature is enabled, InitPidRequest is sent to the Transaction Coordinator. If the Transaction Coordinator receives an InitPidRequest containing this Transaction ID for the first time, it stores the <TransactionID, PID> mapping in the Transaction Log, as shown in Step 2.1 in the figure above. This ensures that the mapping is persisted and will not be lost even if the Transaction Coordinator goes down.

In addition to returning a PID, InitPidRequest performs the following tasks:

  • Increments the epoch corresponding to the PID. Any new transaction opened by another Producer with the same PID but an epoch smaller than this one will be rejected.
  • Recovers (commits or aborts) the previous Producer’s unfinished transaction, if any.

Note: InitPidRequest is processed synchronously and blocks. Once the call returns successfully, the Producer can start a new transaction.

In addition, if the transaction feature is not enabled, InitPidRequest can be sent to any Broker and returns a new, unique PID. Such a Producer can use only idempotence and transactions within a single Session, not transactions across Sessions.

Begin the transaction

Starting with version 0.11.0.0, Kafka provides the beginTransaction() method to start a transaction. After this method is invoked, the Producer records locally that the Transaction has started, but the Transaction Coordinator does not consider the Transaction started until the Producer sends the first message.

Consume-Transform-Produce

This phase covers the data processing of the entire transaction and contains multiple requests.

AddPartitionsToTxnRequest: A Producer may send data to multiple <Topic, Partition>s. Before sending data to a new <Topic, Partition>, it must first send an AddPartitionsToTxnRequest to the Transaction Coordinator.

The Transaction Coordinator stores the <Topic, Partition> for this Transaction in the Transaction Log and sets its status to BEGIN, as shown in Step 4.1 in the figure above. With this information, the COMMIT or ABORT marker can be set for each <Topic, Partition> in the later steps (as shown in Step 5.2 in the figure above).

In addition, if the <Topic, Partition> is the first <Topic, Partition> in the Transaction, the Transaction Coordinator starts timing the Transaction (each Transaction has its own timeout).

ProduceRequest: The Producer sends a series of messages through one or more ProduceRequests. In addition to the application data, each request contains the PID, epoch, and Sequence Number. This process is shown in Step 4.2 in the figure above.

To support transactions, the Producer adds a sendOffsetsToTransaction method, which places the consumption and the sending of multiple groups of messages in the same batch.

This method checks whether it has already been called with the same Group ID in the current transaction. If so, it skips to the next step. If not, it sends an AddOffsetsToTxnRequest to the Transaction Coordinator, which stores all the corresponding <Topic, Partition>s in the Transaction Log and records their status as BEGIN, as shown in Step 4.3 in the figure above. The method blocks until a response is received.

TxnOffsetCommitRequest: As part of the sendOffsetsToTransaction method, after AddOffsetsToTxnRequest has been processed, the Producer also sends a TxnOffsetCommitRequest to the Consumer Coordinator to persist the Offsets of each <Topic, Partition> read in this transaction into the internal __consumer_offsets Topic, as shown in Step 4.4 in the figure above.

During this process, the Consumer Coordinator verifies whether the Producer’s request should be allowed through the PID and the corresponding EPOCH.

Note here:

  • The Offset information written to __consumer_offsets is not visible externally until the current transaction is committed. In other words, until then the Offsets can be regarded as not yet committed, and the corresponding messages as not yet processed.
  • The Consumer Coordinator does not immediately update its cached Offsets for the corresponding <Topic, Partition>s, because at this point the updates have been neither committed nor aborted.

Commit or Abort transactions

Once the data writing operation is complete, the application must call either KafkaProducer’s commitTransaction method or abortTransaction method to terminate the current transaction.

EndTxnRequest: The commitTransaction method makes the data written by the Producer visible to downstream Consumers. The abortTransaction method marks the data written by the Producer as aborted by means of a Transaction Marker. A downstream Consumer with isolation.level set to READ_COMMITTED that reads aborted messages discards them without returning them to the client, so these messages are invisible to the application.

Whether committing or aborting, the Producer sends an EndTxnRequest to the Transaction Coordinator, with a flag indicating whether the transaction should be committed or aborted.

After receiving the request, the Transaction Coordinator performs the following operations:

  1. Writes a PREPARE_COMMIT or PREPARE_ABORT message to the Transaction Log, as shown in Step 5.1 in the figure above
  2. Uses WriteTxnMarkerRequest to write the COMMIT or ABORT information, in the form of a Transaction Marker, to the user data log and the Offset Log, as shown in Step 5.2
  3. Finally, writes a COMPLETE_COMMIT or COMPLETE_ABORT message to the Transaction Log, as shown in Step 5.3 in the figure above

Note: The commitTransaction method calls flush before sending EndTxnRequest, to ensure that all sent data has been acknowledged. The abortTransaction method discards all transactional messages (if any) still in the current buffer before sending EndTxnRequest, but must wait for all sent messages that have not yet been acknowledged to complete.

The second step above is the key to treating a set of reads and writes as one transaction. Because the same Transaction Marker is written both to the data Topics written by the Producer and to the Topic that records Consumer Offsets, this set of reads and writes is either committed as a whole or aborted as a whole.
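The three coordinator steps can be sketched as a tiny in-memory model. This is only an illustration under stated assumptions: the class `EndTxnSketch`, its fields, and the partition names are hypothetical, a list stands in for the Transaction Log, and a map stands in for the markers written to each Partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class EndTxnSketch {
    final List<String> transactionLog = new ArrayList<>();     // stand-in for the Transaction Log
    final Map<String, String> markers = new LinkedHashMap<>(); // partition -> marker written

    /** Ends one transaction; partitions cover both data and offset partitions. */
    void endTransaction(long pid, boolean commit, List<String> partitions) {
        String outcome = commit ? "COMMIT" : "ABORT";
        transactionLog.add("PREPARE_" + outcome);               // step 1: outcome is now decided
        for (String partition : partitions) {
            markers.put(partition, outcome + "(" + pid + ")");  // step 2: same marker everywhere
        }
        transactionLog.add("COMPLETE_" + outcome);              // step 3: transaction finished
    }

    public static void main(String[] args) {
        EndTxnSketch coordinator = new EndTxnSketch();
        // Hypothetical partitions: one data partition and one offsets partition
        coordinator.endTransaction(7L, true, Arrays.asList("orders-0", "__consumer_offsets-3"));
        System.out.println(coordinator.transactionLog);
        System.out.println(coordinator.markers);
    }
}
```

Because the data Partition and the offsets Partition receive the same marker, a reader can never observe the produced data as committed while the consumed Offsets are aborted, or the reverse.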

WriteTxnMarkerRequest: The WriteTxnMarkerRequest is sent by the Transaction Coordinator to the Leader of each <Topic, Partition> involved in the current Transaction. After receiving the request, the Leader writes the COMMIT(PID) or ABORT(PID) control message to its log, as shown in Step 5.2 in the figure above.

This control message tells the Broker and the Consumer whether the messages of the corresponding PID were committed or aborted.

Note that if the transaction also involves __consumer_offsets, that is, the transaction consumes data and stores the consumed Offsets in __consumer_offsets, the Transaction Coordinator must also send WriteTxnMarkerRequest to the Leader of each Partition of that internal Topic, so that the COMMIT(PID) or ABORT(PID) control message is written there too.

After all Transaction Markers have been written, the Transaction Coordinator writes the final COMPLETE_COMMIT or COMPLETE_ABORT message to the Transaction Log to mark the end of the Transaction, as shown in Step 5.3 in the figure above.

At this point, all messages about the Transaction in the Transaction Log can be removed. Since Kafka data is append-only and cannot be updated or deleted directly, the messages are simply set to NULL so that log compaction no longer retains them.

In addition, the write of COMPLETE_COMMIT or COMPLETE_ABORT does not need to be acknowledged by all Replicas, because if the message is lost, it can be rewritten according to the transaction protocol.

Finally, if a <Topic, Partition> participating in the Transaction becomes unavailable before the Transaction Marker is written to it, its data is not visible to READ_COMMITTED Consumers. This does not affect the COMMIT or ABORT of the other, available <Topic, Partition>s. Once the <Topic, Partition> is available again, the Transaction Coordinator re-sends the Transaction Marker to it based on the recorded PREPARE_COMMIT or PREPARE_ABORT.

Summary

  • The introduction of PID and Sequence Number makes write operations idempotent
  • Idempotent writes combined with At Least Once semantics implement Exactly Once semantics within a single Session
  • Transaction Marker and PID make it possible to determine whether a message should be read, thereby achieving transaction isolation
  • Offset updates mark whether a message has been read, which turns the transactional handling of reads into the transactional handling of writes
  • The essence of a Kafka transaction is to put the same marker (the Transaction Marker) on the messages of a set of writes (if any) and on the Offset updates of a set of reads (if any), so that all reads and writes in the transaction become visible or invisible together
  • Kafka provides transactions only for reads and writes on Kafka itself, not for external systems

Exception handling

InvalidProducerEpoch: This is a fatal error. It indicates that the current Producer is an expired instance: a Producer instance with the same Transaction ID but a newer epoch has been created and is in use. The Producer stops and throws an Exception.

InvalidPidMapping: The Transaction Coordinator has no PID corresponding to the Transaction ID. The Producer obtains a new PID by sending an InitPidRequest containing the Transaction ID.

NotCoordinatorForTransactionalId: The Transaction Coordinator is not responsible for the current Transaction. The Producer uses FindCoordinatorRequest to locate the correct Transaction Coordinator.

InvalidTxnRequest: The transaction protocol was violated. A correct Client implementation should never trigger this Exception. If it occurs, users should check their client implementation for problems.

CoordinatorNotAvailable: The Transaction Coordinator is still initializing. The Producer only needs to retry.

DuplicateSequenceNumber: The sequence number of the sent message is lower than the Broker expects. This indicates that the message has already been processed successfully, so the Producer can ignore the exception and move on to the next message.

InvalidSequenceNumber: This is a fatal error. It indicates that the sequence number of the sent message is higher than the Broker expects. There are two possible causes:

  • Data is out of order. For example, a later message arrives while an earlier message that failed is still being retried. Normally this should not happen, because when idempotent sending is enabled, max.in.flight.requests.per.connection is forced to 1 and acks is forced to all. Later messages are therefore not sent while an earlier message is being retried, so ordering is preserved. And because the Producer considers a message sent only after all replicas in the ISR have acknowledged it, no data is lost on the Broker side.
  • Log truncation caused data loss. In this case the Producer should stop producing and report the fatal error to the user.

InvalidTransactionTimeout: A fatal error returned for InitPidRequest. It indicates that the timeout passed by the Producer is not within the acceptable range. The Producer should stop and report the error to the user.

Handling Transaction Coordinator failure

Failure before PREPARE_COMMIT/PREPARE_ABORT is written

The Producer finds the new Transaction Coordinator through FindCoordinatorRequest and starts the COMMIT or ABORT process again through EndTxnRequest. The new Transaction Coordinator then processes the EndTxnRequest as usual: it writes PREPARE_COMMIT or PREPARE_ABORT, writes the Transaction Markers, and writes COMPLETE_COMMIT or COMPLETE_ABORT.

Failure after PREPARE_COMMIT/PREPARE_ABORT is written

At this point, the old Transaction Coordinator may already have written some of the Transaction Markers. The new Transaction Coordinator repeats these operations, so some Partitions may contain repeated COMMIT or ABORT markers. As long as the Producer does not start a new Transaction in the meantime, these repeated Transaction Markers cause no problem.

Failure after COMPLETE_COMMIT/COMPLETE_ABORT is written

The old Transaction Coordinator may have finished writing COMPLETE_COMMIT or COMPLETE_ABORT but failed before returning the EndTxnRequest response. In this scenario, the new Transaction Coordinator directly returns success to the Producer.

Transaction expiration mechanism

Transaction timeout

The transaction timeout is configured through transaction.timeout.ms.

Terminate expired transactions

When Producer fails, a Transaction Coordinator must be able to actively expire some ongoing Transaction. Otherwise, without the participation of producers, Transaction coordinators cannot determine how these transactions should be processed, which results in:

  • If there are too many such ongoing transactions, the Transaction Coordinator has to maintain a large amount of transaction state, which consumes a lot of memory
  • The Transaction Log also accumulates a large amount of data, which makes a new Transaction Coordinator slow to start
  • READ_COMMITTED Consumers need to cache a large number of messages, which wastes memory or even causes OOM
  • If Producers with different Transaction IDs interleave writes to the same Partition and the transaction state of one of them is never updated, READ_COMMITTED Consumers are blocked in order to preserve consumption order

To avoid these problems, the Transaction Coordinator periodically traverses the transaction state map in memory and performs the following operations:

  • If the state is BEGIN and the time since its last update exceeds transaction.remove.expired.transaction.cleanup.interval.ms (default: 1 hour), the Transaction Coordinator actively terminates the transaction: 1) to avoid a conflict between the original Producer temporarily recovering and the termination in progress, it increments the epoch of the PID corresponding to that Producer and makes sure the update is written to the Transaction Log; 2) it rolls back the transaction with the updated epoch, so that all Brokers involved in the transaction update their cached epoch for the PID and reject writes from the old Producer
  • If the state is PREPARE_COMMIT, it completes the subsequent COMMIT process: it writes the Transaction Marker to each <Topic, Partition> and writes COMPLETE_COMMIT to the Transaction Log
  • If the state is PREPARE_ABORT, it completes the subsequent ABORT process
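The decision made for each entry during this periodic scan can be written as a small pure function. The sketch below only encodes the three rules listed above: the class `ExpirationScan`, its enums, and the parameter names are hypothetical, and the timeout is passed in as a plain number rather than read from configuration.

```java
final class ExpirationScan {
    enum State { BEGIN, PREPARE_COMMIT, PREPARE_ABORT }
    enum Action { NONE, BUMP_EPOCH_AND_ABORT, FINISH_COMMIT, FINISH_ABORT }

    /** Decides what the coordinator should do with one in-memory transaction entry. */
    static Action decide(State state, long lastUpdateMs, long nowMs, long timeoutMs) {
        switch (state) {
            case PREPARE_COMMIT:
                return Action.FINISH_COMMIT;        // outcome already decided: finish it
            case PREPARE_ABORT:
                return Action.FINISH_ABORT;
            default:
                // BEGIN: terminate only if the transaction has been idle for too long
                return (nowMs - lastUpdateMs > timeoutMs)
                        ? Action.BUMP_EPOCH_AND_ABORT
                        : Action.NONE;
        }
    }

    public static void main(String[] args) {
        long oneHourMs = 60L * 60L * 1000L;
        System.out.println(decide(State.BEGIN, 0L, 2 * oneHourMs, oneHourMs));
        System.out.println(decide(State.BEGIN, 0L, oneHourMs / 2, oneHourMs));
        System.out.println(decide(State.PREPARE_COMMIT, 0L, 0L, oneHourMs));
    }
}
```

Incrementing the epoch before the rollback matters: it guarantees that the original Producer, should it come back, is rejected rather than racing with the abort.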

Expiring the Transaction ID

The Producer for a given Transaction ID may stop sending data for a long time, in which case the Transaction Coordinator no longer needs to keep the mapping between that Transaction ID and its PID. Keeping it would waste a large amount of resources. A mechanism is therefore needed to detect inactive Transaction IDs and remove their information.

The Transaction Coordinator periodically iterates over the in-memory mapping between Transaction IDs and PIDs. If a Transaction ID has no ongoing Transaction and the time since its last Transaction ended exceeds transactional.id.expiration.ms (default: 7 days), it is removed from memory and its value in the Transaction Log is set to NULL so that log compaction can delete its record.

PostgreSQL MVCC

Kafka’s transaction mechanism resembles the way PostgreSQL implements transactions using MVCC. Neither deletes already-written data on rollback. Both mark the transaction that wrote the data as Rollback/Abort so that the data is filtered out when it is read.

Two-phase commit

Kafka’s transaction mechanism looks similar to the two-phase commit mechanism described in Distributed Transactions (1) two-phase COMMIT and JTA. Both have a PREPARE phase and a COMMIT phase, but they differ considerably.

  • In Kafka, PREPARE already states whether it is PREPARE_COMMIT or PREPARE_ABORT, and it only needs to be recorded in the Transaction Log. No other component is involved. In two-phase commit, PREPARE must be sent to all participants of the distributed transaction, each of which prepares as far as it can and returns a Prepared or Non-Prepared state to the transaction manager.
  • In a Kafka transaction, once PREPARE_COMMIT or PREPARE_ABORT has been initiated, the final outcome is fixed: the transaction will be committed or aborted accordingly. In two-phase commit, every participant returns its state after PREPARE, and COMMIT is performed only if all participants return Prepared. Otherwise ROLLBACK is performed
  • In Kafka transactions, when some Partitions become unavailable during COMMIT or ABORT, only those Partitions are affected. In two-phase commit, if the only participant that received the COMMIT command crashes, the other participants cannot determine the transaction state and the whole transaction is blocked
  • The Kafka transaction mechanism introduces a transaction timeout, which effectively prevents a pending transaction from affecting other transactions
  • Kafka’s transaction mechanism has multiple Transaction Coordinator instances, whereas a distributed transaction under two-phase commit has only one transaction manager

Zookeeper

The atomic broadcast protocol of Zookeeper is similar to two-phase commit and to the Kafka transaction mechanism, but has its own characteristics:

  • Kafka transactions can be committed or aborted. The Zookeeper atomic broadcast protocol has only COMMIT and no ABORT. Of course, when Zookeeper does not COMMIT a message, that is equivalent to aborting the update.
  • Kafka has multiple Transaction Coordinator instances and scales well. Zookeeper writes can be performed only on the Leader node, so its write performance is much lower than its read performance.
  • Whether a Kafka transaction is committed or aborted depends entirely on the Producer, that is, the client. In the Zookeeper atomic broadcast protocol, whether a message is committed depends on whether more than half of the followers ACK the message.

Kafka series articles

  • Kafka Design Analysis (1) – Kafka background and architecture introduction
  • Kafka High Availability (Part 1)
  • Kafka High Availability (Part 2)
  • Kafka Consumer design analysis
  • Kafka Design Analysis (5) – Kafka performance test methods and Benchmark report
  • Kafka Design Analysis (6) – Kafka high-performance architecture
  • Kafka Stream