Transactional producer

We have already learned about the idempotent producer and its two limitations: it only guarantees idempotence within a single partition, and only within a single session (the guarantee does not survive a producer restart). To overcome these two shortcomings, let's take a look at Kafka's other solution: transactions.

  • Kafka has also provided support for transactions since version 0.11, and currently offers the read_committed isolation level on the read side.
  • Transactions ensure that multiple messages are written atomically to the target partitions, and that the Consumer only sees messages from transactions that committed successfully.
  • A transactional Producer guarantees that messages are written atomically across multiple partitions: either all of them are written successfully or all of them fail together.
  • Transactional Producers are also not afraid of process restarts. After the Producer restarts, Kafka still guarantees that the messages it sends are processed exactly once.

Note that "all of them fail" does not mean the data is guaranteed never to be written to Kafka. Kafka has no rollback mechanism, so instead of undoing the writes, it makes the messages of a failed transaction invisible to the consumer.

How to use it

The producer side

  • As with the idempotent Producer, set enable.idempotence = true.
  • Set the transactional.id parameter on the Producer side to a meaningful name.
  • In addition, unlike the idempotent Producer, the transactional Producer has to call the dedicated transaction APIs:
producer.initTransactions();
try {
      producer.beginTransaction();
      producer.send(record1);
      producer.send(record2);
      producer.commitTransaction();
} catch (KafkaException e) {
      // Something went wrong mid-transaction: abort it so that
      // read_committed consumers never see a partial write
      producer.abortTransaction();
}

This code ensures that record1 and record2 are committed to Kafka as a single transaction: either both of them commit successfully or the whole write is aborted. Note, however, that even when a write is aborted, Kafka may already have written the messages to the underlying log; without further configuration the Consumer would still see them.
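For reference, here is a fuller error-handling sketch modeled on the pattern in the KafkaProducer javadoc. Fatal exceptions such as ProducerFencedException, OutOfOrderSequenceException and AuthorizationException (all in org.apache.kafka.common.errors) cannot be recovered from by aborting, so the producer should be closed instead:

producer.initTransactions();
try {
      producer.beginTransaction();
      producer.send(record1);
      producer.send(record2);
      producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
      // Fatal: e.g. another producer with the same transactional.id has fenced this one;
      // aborting cannot help, so close the producer
      producer.close();
} catch (KafkaException e) {
      // For all other errors, abort the transaction and retry with a new one
      producer.abortTransaction();
}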

The consumer side

  • As mentioned above, even if a transaction is aborted, Kafka may already have written its messages to the underlying log, so by default the Consumer still sees them.
  • Therefore, on the Consumer side, reading messages sent by a transactional Producer also requires a small change: set the isolation.level parameter (a minimal snippet follows this list). It takes two values:
read_uncommitted
  • This is the default value. The Consumer reads every message Kafka has written, whether the transactional Producer committed or aborted the transaction. Obviously, if you are using a transactional Producer, the corresponding Consumer should not use this value.
read_committed
  • The Consumer only reads messages from transactions that the transactional Producer committed successfully. Of course, it also sees all messages written by non-transactional Producers.
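A minimal sketch of the consumer-side setting, using the ConsumerConfig constant (the rest of the configuration is omitted here; a complete example appears in the code demo below):

Properties props = new Properties();
// read_committed: skip messages from aborted transactions
// read_uncommitted (the default): read everything in the log
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");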

Implementation principle

  • A transactional Producer can make a group of messages either all succeed or all fail. But how does it achieve non-duplication across multiple partitions and sessions? The main mechanism is two-phase commit (2PC), and a component called the Transaction Coordinator is introduced to drive the distributed transaction.
  • Kafka's new component, the Transaction Coordinator, manages the globally unique transactional.id and binds the producer's PID to it. The PID changes when the producer restarts, but the producer can still interact with the Transaction Coordinator and retrieve the original PID via the transactional.id, which ensures the producer can still guarantee Exactly Once after a restart.
  • At the same time, the Transaction Coordinator writes transaction state to an internal Kafka topic (__transaction_state). Even if the entire Kafka service is restarted, an ongoing transaction can be recovered and continued, because its state has been persisted to that topic.
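On a test cluster you can check that this internal topic exists once a transactional producer has run. A sketch, assuming a broker at localhost:9092 and a Kafka version whose tools accept --bootstrap-server (older tool versions use --zookeeper instead):

bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic __transaction_state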

Code demo

Producer code

Here our producer commits the even values and aborts the odd ones:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class TransactionProducer {
    private static final Logger logger = LoggerFactory.getLogger(TransactionProducer.class);
    private static KafkaProducer<String, String> producer = null;

    /* Initializes the producer */
    static {
        Properties configs = initConfig();
        producer = new KafkaProducer<String, String>(configs);
    }

    /* Initialize the configuration */
    private static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("batch.size", 16384);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // Must be set
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1v1_bu_stu_reg");
        // Must be non-zero when idempotence is enabled (see the error below)
        props.put("retries", 3);
        return props;
    }

    public static void main(String[] args) throws InterruptedException {
        // Message entity
        ProducerRecord<String, String> record = null;
        producer.initTransactions();
        for (int i = 0; i < 10000; i++) {
            record = new ProducerRecord<String, String>("test", "value" + i);

            producer.beginTransaction();
            // Send the same message twice within one transaction
            producer.send(record);
            producer.send(record);
            if (i % 2 == 0) {
                producer.commitTransaction();
            } else {
                producer.abortTransaction();
            }
            TimeUnit.MILLISECONDS.sleep(1000);
        }
        producer.close();
    }
}

Note that in addition to setting the transactional.id and enabling idempotence, we also need to set the retries parameter to a non-zero value; otherwise we get the following error. (This applies to the older client used here, where retries defaults to 0; newer clients default retries to a very large value, so the explicit setting is no longer required there.)

Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:430)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
	at com.kingcall.clients.producer.transaction.TransactionProducer.<clinit>(TransactionProducer.java:22)
Caused by: org.apache.kafka.common.config.ConfigException: Must set retries to non-zero when using the idempotent producer.
	at org.apache.kafka.clients.producer.KafkaProducer.configureRetries(KafkaProducer.java:545)
	at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:458)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:419)
	... 2 more

Consumer command line demo (consuming aborted data)

To demonstrate this from the command line: if we do not enable the special consumer configuration, we will consume the messages of failed transactions that have already been written to Kafka.
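A sketch using the console consumer that ships with Kafka, assuming a broker at localhost:9092. Its --isolation-level option defaults to read_uncommitted, so messages from aborted transactions are shown:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --isolation-level read_uncommitted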

But when we first run this, the odd values do not show up. Does that mean the failed messages were never written to Kafka after all? No, it is related to the producer's batching: when the transaction is aborted right away, the messages are still sitting in the producer's buffer and are never sent to the broker. So let the producer wait after sending the message and only abort the transaction once the data has actually been sent: add one line of code, TimeUnit.MILLISECONDS.sleep(1000);, between sending and committing/aborting the transaction, and see what happens:

producer.beginTransaction();
// Send the message
producer.send(record);
// Wait so the batch is actually sent to the broker before we commit or abort
TimeUnit.MILLISECONDS.sleep(1000);
if (i % 2 == 0) {
    producer.commitTransaction();
} else {
    producer.abortTransaction();
}

This time we do see the data of the failed transactions in the output.

Consumer code demo (skipping aborted data)

We leave the producer code unchanged and configure the consumer side so that it does not consume the data of failed transactions.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class TransactionConsumer {
    private static KafkaConsumer<String, String> consumer;

    /** Initializes the consumer */
    static {
        Properties configs = initConfig();
        consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(Arrays.asList("test"));
    }

    /** Initialize the configuration */
    private static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", true);
        props.put("auto.commit.interval.ms", 1000);
        props.put("session.timeout.ms", 30000);
        props.put("max.poll.records", 1000);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // Only consume committed data
        props.put("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        while (true) {
            // The parameter is the poll timeout, i.e. how long to wait for data
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
            records.forEach((ConsumerRecord<String, String> record) -> {
                System.out.println(record.value());
            });
        }
    }
}

Let’s look at the consumption results

We see that the consumer only reads the values committed by successful transactions, i.e. the even ones.
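With the producer above, the output should look roughly like this; each committed (even) value appears twice, because the producer sends the same record twice within one transaction:

value0
value0
value2
value2
...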

Conclusion

  1. Idempotence means that retries have no effect on the data, because the broker deduplicates for us; the consumer does not need to change anything.
  2. Transactions are meant to let readers skip the messages of failed transactions, but because that data has already been written to the log, consumers have to be configured not to read it, to ensure there is no impact on the business and downstream systems.
  3. Both the idempotent Producer and the transactional Producer can send duplicate messages, and the broker has mechanisms to filter them out (idempotence relies on sequence numbers, transactions on control markers); the difference lies in how the broker does the filtering.
  4. Both the idempotent Producer and the transactional Producer are tools the Kafka community provides for exactly-once processing semantics, but their scopes of action differ.
  5. The idempotent Producer can only guarantee message idempotence within a single partition and a single session; transactions extend the guarantee across partitions and sessions. Naturally, the transactional Producer can do more in terms of delivery semantics.
  6. The transactional Producer performs worse than the idempotent Producer. In practice, we need to carefully evaluate the cost of introducing transactions rather than enabling them mindlessly.
  7. On the consumer side, duplicate consumption or lost messages are still possible, because committing offsets and processing messages are separate sequential steps. To achieve exactly-once consumption, an additional mechanism is needed that ties the offset commit and the message processing into one transaction; see the sketch after this list.
  8. Kafka supports not only cross-partition but also cross-topic transactions: a single producer can write to multiple topics within one transaction.
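As a sketch of point 7, here is the classic consume-transform-produce pattern, reusing the producer and consumer from the demos above (with enable.auto.commit set to false on the consumer). The output topic name "test-out" is hypothetical; sendOffsetsToTransaction is part of the KafkaProducer API (newer clients also offer an overload taking ConsumerGroupMetadata). The consumer offsets are committed through the producer, inside the same transaction as the output messages:

// Offsets are committed as part of the producer's transaction, so the input
// is marked consumed if and only if the output is committed
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
producer.initTransactions();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
    if (records.isEmpty()) {
        continue;
    }
    producer.beginTransaction();
    try {
        for (ConsumerRecord<String, String> record : records) {
            producer.send(new ProducerRecord<String, String>("test-out", record.value()));
            // Remember the next offset to commit for this partition
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        }
        // Commit the consumer group's offsets within the same transaction
        producer.sendOffsetsToTransaction(offsets, "test-group");
        producer.commitTransaction();
    } catch (KafkaException e) {
        producer.abortTransaction();
    }
    offsets.clear();
}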