1. Basic introduction to Kafka

Official document Github address

1.1 Development history of Kafka

Apache Kafka was originally developed by LinkedIn and open-sourced in early 2011.

On October 23, 2012, Kafka graduated from the Apache Incubator (the gateway for open source projects that aim to become fully fledged Apache Software Foundation projects) and became an Apache Software Foundation project.

In November 2014, Jun Rao, Jay Kreps, Neha Narkhede and several other engineers who had worked on Kafka at LinkedIn started a new company called Confluent, built around Kafka.

According to a 2014 Quora post, Jay Kreps named it after Franz Kafka, a German-speaking Jewish novelist and short story writer from Austria-Hungary, considered by critics to be one of the most influential writers of the 20th century. Kreps chose to name the system after a writer because it is “a system optimized for writing” and because he likes Kafka’s work.

1.2. Programming languages

Kafka is written in Scala and Java. Both run on the Java virtual machine, and Scala is compatible with existing Java programs, so deploying Kafka requires a JDK environment first. Kafka clients are available for all major programming languages.

1.3 Application Scenarios of Kafka

  • Build real-time streaming data pipelines that reliably move data between systems or applications. (Equivalent to a message queue.)
  • Build real-time streaming applications that transform or react to this streaming data. (This is the Kafka Streams use case.)

1.3.1 Message Queue

Messaging is a way of sending data. As an alternative to TCP, HTTP, or RPC, it provides asynchronous processing, decoupling, and peak shaving (Kafka can do what RabbitMQ and RocketMQ do in this respect). Because Kafka has higher throughput, it has an advantage in large-scale messaging systems.

Use cases:

  • Website activity tracking: publish user activity into the data pipeline, where it can be used for monitoring, real-time processing, reports, and so on. For example, social networking and shopping sites track user behavior to make more accurate content recommendations. Other examples: real-time information for food delivery, logistics, and power systems.

  • Log aggregation: logs are sent to Kafka instead of being written only to the local disk or a database, which gives distributed log aggregation.

  • Metrics: Kafka can also record operational monitoring data. For example, a loan company needs to monitor business data on loans: how many loans were issued today, the total amount issued, and the age, geographic, and gender distribution of users. For operations data, you can monitor the usage of CPU, memory, disks, and network connections.

1.3.2 Streaming Applications (Data integration + Streaming computing)

Data integration: importing Kafka data into offline data stores such as Hadoop and HBase for data analysis.

Stream computing: What is a stream? It is not static data but an endless, unbounded flow of data, like a stream of water. Stream computing means performing real-time calculations on such a stream. Kafka has included the Kafka Streams API since version 0.10.

Therefore, Kafka is positioned quite differently from RabbitMQ: it is not only a messaging middleware but also a stream processing platform. In Kafka, messages are stored as logs; the log is the data file that holds the messages.

2. Kafka installation and server operation

2.1 installation of Kafka

Note: in kafka_2.12-2.8.0.tgz, 2.12 is the Scala version and 2.8.0 is the Kafka version.

  • Standalone installation
  • Cluster Installation (single-node)

2.2 Kafka and ZooKeeper

When installing Kafka, we saw that it relies on a ZooKeeper service. In production environments this is usually a ZooKeeper (ZK) cluster. Kafka also ships with a bundled ZooKeeper service.

What does ZooKeeper do?

ZooKeeper’s sequential nodes, ephemeral nodes, and watch mechanism help Kafka with the following:

  • Configuration center (managing information about brokers, topics, partitions, and consumers, including metadata changes)
  • Load balancing
  • Naming service
  • Distributed notification
  • Cluster management and elections
  • Distributed locks

2.3 Introduction to Kafka scripts

The bundled scripts essentially invoke Java classes (through kafka-run-class.sh). They are wrapped one step further so that fewer parameters are required.

| Script | Role |
|---|---|
| kafka-server-start.sh, kafka-server-stop.sh | Start and stop Kafka |
| kafka-topics.sh | View, create, and delete topics |
| kafka-console-consumer.sh | Consumer operations, such as listening to a topic |
| kafka-consumer-groups.sh | Consumer group operations |
| kafka-console-producer.sh | Producer operations, such as sending messages |
| zookeeper-server-start.sh | ZooKeeper operation: start the ZooKeeper service |
| kafka-reassign-partitions.sh | Partition reassignment |
| kafka-consumer-perf-test.sh | Consumer performance test |

2.4 Kafka management UI tools

Kafka does not come with a management UI, but one can be built on top of the Admin API. The popular management UIs at present are mainly:

  • kafka-manager (since renamed CMAK)

  • kafka-eagle

Note: the latest version of CMAK requires a recent Java version; the minimum is JDK 11.

kafka-eagle has high memory requirements, so its JVM parameters need to be adjusted before it can be started in a virtual machine.

3. Kafka Architecture

3.1 Broker

A Kafka server is a Broker. A cluster consists of multiple brokers, and a single Broker can hold multiple topics.

3.2 Message

Data transferred between clients is called a message, or Record. In client code, a Record can be a key-value pair.

The producer’s wrapper class is ProducerRecord and the consumer’s wrapper class is ConsumerRecord. Messages must be serialized for transport, so the code has to specify a serializer. The format in which messages are stored on the server (RecordBatch and Record) is described at:

Kafka.apache.org/documentati…

3.3 Producer

The sender of a message is called the producer, and the receiver is called the consumer. To speed up sending, producers send messages to brokers in batches rather than one at a time. The maximum size of a batch, in bytes, is set by a single parameter:

prop.put("batch.size", 16384);
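To make the effect of a byte-based batch limit concrete, here is a minimal plain-Java sketch. BatchAccumulator and its methods are hypothetical names invented for this illustration, not part of the Kafka client, and the sketch assumes a single destination; the real producer also takes linger.ms into account.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of size-based batching; this is not Kafka client code.
final class BatchAccumulator {
    private final int batchSizeBytes;
    private final List<String> current = new ArrayList<>();
    private final List<List<String>> sentBatches = new ArrayList<>();
    private int currentBytes = 0;

    BatchAccumulator(int batchSizeBytes) {
        this.batchSizeBytes = batchSizeBytes;
    }

    // Buffers a message; if it would not fit, the current batch is sent first.
    void append(String message) {
        int size = message.getBytes(StandardCharsets.UTF_8).length;
        if (!current.isEmpty() && currentBytes + size > batchSizeBytes) {
            flush();
        }
        current.add(message);
        currentBytes += size;
    }

    // "Sends" the buffered messages as one batch (here they are only recorded).
    void flush() {
        if (current.isEmpty()) {
            return;
        }
        sentBatches.add(new ArrayList<>(current));
        current.clear();
        currentBytes = 0;
    }

    List<List<String>> sentBatches() {
        return sentBatches;
    }

    public static void main(String[] args) {
        BatchAccumulator acc = new BatchAccumulator(10); // tiny limit for the demo
        for (String m : new String[] {"aaaa", "bbbb", "cccc", "dddd", "eeee"}) {
            acc.append(m);
        }
        acc.flush();
        System.out.println(acc.sentBatches()); // prints [[aaaa, bbbb], [cccc, dddd], [eeee]]
    }
}
```

With a 10-byte limit and 4-byte messages, two messages fit per batch, so five messages leave in three requests instead of five.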

3.4 Consumer

Generally speaking, there are two modes for consumers to get messages: pull mode and push mode.

In pull mode, messages are kept in the Broker and the consumer decides when to fetch them. In push mode, messages are pushed to the Consumer as soon as they arrive at the Broker.

RabbitMQ consumers support both push and pull, and usually use push. Kafka only has pull mode.

Why does Kafka have consumers pull data?

The official website explains it clearly:

Kafka.apache.org/documentati…

In push mode, if messages are produced much faster than consumers can process them, the consumer is overwhelmed (like being unable to eat any more while food is still being shoved into your mouth) until it fails. In pull mode, consumers control how many messages they fetch at a time through max.poll.records.
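The idea that a pulling consumer sets its own pace can be sketched in plain Java. PullDemo is a hypothetical in-memory stand-in for one partition and one consumer, not Kafka client code; the maxPollRecords argument plays the role that max.poll.records plays in the real consumer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of pull-based consumption; not Kafka client code.
final class PullDemo {
    private final List<String> brokerLog = new ArrayList<>();
    private int position = 0; // index of the next message the consumer will read

    // The producer side: messages accumulate on the broker regardless of consumer speed.
    void produce(String message) {
        brokerLog.add(message);
    }

    // The consumer asks for at most maxPollRecords messages; the rest wait on the broker.
    List<String> poll(int maxPollRecords) {
        int end = Math.min(position + maxPollRecords, brokerLog.size());
        List<String> batch = new ArrayList<>(brokerLog.subList(position, end));
        position = end;
        return batch;
    }

    // Number of messages produced but not yet fetched.
    int lag() {
        return brokerLog.size() - position;
    }

    public static void main(String[] args) {
        PullDemo broker = new PullDemo();
        for (int i = 0; i < 10; i++) {
            broker.produce("msg-" + i);
        }
        System.out.println(broker.poll(3)); // prints [msg-0, msg-1, msg-2]
        System.out.println(broker.lag());   // prints 7
    }
}
```

However fast the producer writes, the consumer never receives more than it asked for; the backlog simply waits on the broker.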

3.5 Topic

You can think of a Topic as a queue. Topics categorize messages, and producers and consumers work against the same Topic. The relationships between producers and Topics, and between Topics and consumers, are many to many: a producer can send messages to more than one Topic, and a consumer can get messages from more than one Topic (although this is not recommended).

Note that when a producer sends a message to a Topic that does not exist, the Topic is created automatically. This is controlled by the parameter auto.create.topics.enable, which defaults to true. To delete a Topic completely, set this parameter to false; otherwise the Topic is recreated automatically whenever code uses it.

Kafka.apache.org/documentati…

3.6 Partition

To achieve scalability and concurrency, a very large Topic can be distributed across multiple brokers (servers). A Topic is divided into multiple partitions, each of which is an ordered queue. The number of partitions is specified when creating a topic, and each topic has at least one partition.

Create Topic command

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

--partitions specifies the number of partitions. For automatically created topics, the broker-side default is 1, which can be changed with the num.partitions parameter.

--replication-factor is the number of replicas.

Partitioning is similar in idea to splitting a database table: it is used for horizontal scaling and load distribution.

For example, a Topic has three partitions, and producers send nine messages in turn, numbered 1 to 9.

The first partition stores messages 1, 4, 7; the second stores 2, 5, 8; and the third stores 3, 6, 9. This spreads the load.
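The nine-message example can be reproduced with a few lines of plain Java. This is a sketch of simple round-robin distribution only; RoundRobinDemo is a hypothetical name, and the partitioner in a real producer may behave differently (for instance when records carry keys).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of round-robin distribution over partitions.
final class RoundRobinDemo {
    // Messages are numbered from 1, partitions from 0.
    static int partitionFor(int messageNumber, int numPartitions) {
        return (messageNumber - 1) % numPartitions;
    }

    // Returns, for each partition, the list of message numbers it receives.
    static List<List<Integer>> distribute(int messageCount, int numPartitions) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (int n = 1; n <= messageCount; n++) {
            partitions.get(partitionFor(n, numPartitions)).add(n);
        }
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(distribute(9, 3)); // prints [[1, 4, 7], [2, 5, 8], [3, 6, 9]]
    }
}
```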

Each partition has a physical directory under the configured data directory (the logs are the data): /tmp/kafka-logs/

Unlike RabbitMQ, messages in a Partition are not deleted after being read; messages for a Partition are simply appended to it sequentially. This sequential writing is an important reason for Kafka’s high throughput. How should the number of partitions be chosen? Are more partitions always better? Not necessarily. The answer varies with the machines and the network environment, so it is best to measure with the performance test scripts.

How do I determine the number of partitions?

Reference blog:

Os.51cto.com/art/202008/…

www.cnblogs.com/xiaodf/p/60…

3.7 Replica

A Replica is a copy of a Partition, kept as a backup to prevent data loss caused by network or hardware faults. Each partition can have several replicas, which must be located on different brokers. The replica count includes the leader.

--replication-factor specifies the number of replicas for a Topic.

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test3p3r

The default number of replicas for automatically created topics is controlled by the broker parameter default.replication.factor (offsets.topic.replication.factor applies only to the internal topic that stores consumer offsets).

For example, three brokers are deployed and the topic has three partitions, with a total of three replicas per partition, as in the following figure.

Note: among the replicas of a partition, which all store the same data, there is a leader (red) and there are followers (green).

Which broker hosts the leader is not fixed; it is determined by election. Producers send messages to the leader and consumers read messages from the leader (why not let clients read from followers?). The data on the followers is synchronized from the leader.

3.8 Segment

Kafka data is stored in .log files. If a partition had only one log file, messages would be appended to it continuously and the file would grow larger and larger, making data retrieval inefficient. A partition is therefore divided into segments, and Kafka’s storage files are actually organized by segment.

The default storage path is /tmp/kafka-logs/

Each segment has at least one data file and two index files, which always appear as a set (one directory per partition, one set of files per segment).

The default segment size is 1073741824 bytes, which is controlled by the log.segment.bytes parameter.
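Why splitting a partition into segments helps retrieval can be sketched as follows. The sketch assumes each segment is identified by its base offset (the offset of its first message) and that its data file is named after that offset padded to 20 digits; SegmentLookupDemo is a hypothetical class, not broker code.

```java
import java.util.TreeMap;

// Hypothetical illustration of locating the segment that holds a given offset.
final class SegmentLookupDemo {
    // Maps each segment's base offset to the name of its data file.
    private final TreeMap<Long, String> segments = new TreeMap<>();

    void addSegment(long baseOffset) {
        segments.put(baseOffset, String.format("%020d.log", baseOffset));
    }

    // The right segment is the one with the largest base offset not exceeding the target.
    String segmentFor(long offset) {
        Long base = segments.floorKey(offset);
        if (base == null) {
            throw new IllegalArgumentException("offset is below the first segment: " + offset);
        }
        return segments.get(base);
    }

    public static void main(String[] args) {
        SegmentLookupDemo partition = new SegmentLookupDemo();
        partition.addSegment(0L);
        partition.addSegment(1000L);
        partition.addSegment(2000L);
        // Offset 1500 lies in the segment that starts at offset 1000.
        System.out.println(partition.segmentFor(1500L));
    }
}
```

Only one comparatively small file has to be searched, instead of scanning a single ever-growing log.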

3.9 Consumer Group

Having multiple consumers consume a Topic increases consumption throughput.

Multiple consumers form a Consumer Group by setting the same group ID. Within a Consumer Group, consumption of a Topic is mutually exclusive: a partition can only be consumed by one Consumer in the Group.

If messages from the same partition need to be consumed by more than one consumer at the same time, the other consumers must belong to different groups.
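The exclusivity rule can be illustrated with a small assignment sketch in plain Java. GroupAssignmentDemo is hypothetical and uses a simple round-robin rule chosen for this example; the assignment strategies in real Kafka are configurable and more involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: every partition goes to exactly one consumer of the group.
final class GroupAssignmentDemo {
    // Consumer names are assumed to be unique within the group.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment;
        }
        for (int p = 0; p < numPartitions; p++) {
            String owner = consumers.get(p % consumers.size());
            assignment.get(owner).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // More consumers than partitions: the third consumer stays idle.
        System.out.println(assign(Arrays.asList("c1", "c2", "c3"), 2)); // prints {c1=[0], c2=[1], c3=[]}
        // Fewer consumers than partitions: one consumer owns several partitions.
        System.out.println(assign(Arrays.asList("c1"), 2));             // prints {c1=[0, 1]}
    }
}
```

Adding consumers beyond the number of partitions therefore does not increase throughput; the extra consumers have nothing to consume.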

3.10 Consumer Offset

Offset: a number that records how far a Consumer has consumed in a partition. If the consumer goes down and reconnects, the offset tells it where the previous consumption stopped. This also preserves message order within a partition.

The offset records the sequence number of the next message to be delivered to the consumer.

The offsets of consumers on partitions are not stored in ZK; they are stored directly on the Kafka server.
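How a committed offset lets a consumer resume after a restart can be sketched in plain Java. OffsetDemo is a hypothetical single-partition model; the committedOffset field stands in for the value that Kafka keeps on the server side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of resuming consumption from a committed offset.
final class OffsetDemo {
    private final List<String> partitionLog;
    private int committedOffset = 0; // kept outside the consumer, so it survives restarts

    OffsetDemo(List<String> partitionLog) {
        this.partitionLog = partitionLog;
    }

    // One consumer session: start at the committed offset, read up to count messages, commit.
    List<String> consume(int count) {
        int start = committedOffset;
        int end = Math.min(start + count, partitionLog.size());
        List<String> read = new ArrayList<>(partitionLog.subList(start, end));
        committedOffset = end; // the offset of the next message to deliver
        return read;
    }

    int committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        OffsetDemo partition = new OffsetDemo(Arrays.asList("a", "b", "c", "d", "e"));
        System.out.println(partition.consume(2));        // prints [a, b]
        // The consumer "crashes" here; a new session picks up where the last one stopped.
        System.out.println(partition.consume(2));        // prints [c, d]
        System.out.println(partition.committedOffset()); // prints 4
    }
}
```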

3.11 Interpretation of the architecture diagram

First, we have three brokers.

There are two topics: Topic0 and Topic1.

Topic0 has two partitions, partition0 and partition1, with a total of three replicas for each partition. Topic1 has only one partition, partition0, also with a total of three replicas.

In the figure, red text marks the leader and black text marks the followers. The green lines represent data synchronization. The blue lines are message writes and the orange lines are message reads; both go to the leader node.

There are two consumer groups. The first consumer group has only one consumer, which consumes both partitions of Topic0. The second consumer group consumes both Topic0 and Topic1: one consumer consumes partition0 of Topic0 and partition0 of Topic1, one consumer consumes partition1 of Topic0, and one consumer has no partition to consume.

4. Kafka and Java development

4.1 Java API

Kafka has five main APIs:

| Category | Role |
|---|---|
| Producer API | Used by applications to send data to Kafka topics |
| Consumer API | Used by applications to read data streams from Kafka topics |
| Admin API | Manages and inspects topics, brokers, and other Kafka objects, similar to the scripts that ship with Kafka |
| Streams API | Transforms data streams from source topics to destination topics, playing the same role as Spark, Storm, and Flink (application) |
| Connect API | Continuously imports data from a source system into Kafka, or pushes data from Kafka to a system such as a database or Hadoop (storage) |

4.1.1 Introduce dependencies

Note that the client version should match the server version. If the client version is too new and the server version is too old, the connection may fail.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- Example value: use the version that matches your server (2.8.0 for kafka_2.12-2.8.0) -->
    <version>2.8.0</version>
</dependency>

4.1.2 Producers

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties pros = new Properties();
        pros.put("bootstrap.servers", "192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092");

        pros.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pros.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // acks: 0 = no acknowledgement | 1 = the leader has written the message | all (-1) = all in-sync followers have replicated it
        pros.put("acks", "1");
        // Number of automatic retries
        pros.put("retries", 3);
        // Maximum batch size in bytes, default 16 KB
        pros.put("batch.size", 16384);
        // How long to wait for more messages before sending a batch
        pros.put("linger.ms", 5);
        // Client buffer size in bytes, default 32 MB
        pros.put("buffer.memory", 33554432);
        // Maximum time send() may block (for example while fetching metadata); an exception is thrown on timeout
        pros.put("max.block.ms", 3000);

        // Create the producer (this also starts the background Sender thread)
        Producer<String, String> producer = new KafkaProducer<String, String>(pros);

        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("mytopic", Integer.toString(i), Integer.toString(i)));
            System.out.println("Send." + i);
        }

        // producer.send(new ProducerRecord<String,String>("mytopic","1","1"));
        // producer.send(new ProducerRecord<String,String>("mytopic","2","2"));

        producer.close();
    }
}

4.1.3 Consumers

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092");
        props.put("group.id", "gp-test-group");
        // Whether to commit the offset automatically. The offset of the consumer group is updated only after the commit
        props.put("enable.auto.commit", "true");
        // The interval between automatic offset commits
        props.put("auto.commit.interval.ms", "1000");
        // Where to start when there is no committed offset: earliest | latest | none
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList("mytopic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s, partition = %s%n",
                            record.offset(), record.key(), record.value(), record.partition());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

4.2 Springboot integrates Kafka

Version Mapping:

Spring. IO/projects/sp…

4.2.1 Introduce dependencies

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

4.2.2 Configuration

application.properties

spring.kafka.bootstrap-servers=192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092

# producer
spring.kafka.producer.retries=1
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.acks=all
spring.kafka.producer.properties.linger.ms=5
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# consumer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

4.2.3 Producers

Inject KafkaTemplate and use it to send messages.

Note that the send method has many overloads. It returns a ListenableFuture, which can be used for asynchronous callbacks.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public String send(String msg) {
        kafkaTemplate.send("springboottopic", msg);
        return "ok";
    }
}

4.2.4 Consumers

Listen on the Topic with the @KafkaListener annotation.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerListener {
    @KafkaListener(topics = "springboottopic", groupId = "springboottopic-group")
    public void onMessage(String msg) {
        System.out.println("---- received a message:" + msg + "--");
    }
}

4.3 Data synchronization with Kafka and Canal

www.cnblogs.com/throwable/p…

5. Kafka idempotent messages

5.1 Definition and Use

Message idempotency: duplicate messages sent by the Producer to the Broker are removed; the Broker filters out the duplicates.

This is enabled through a Producer API setting:

props.put("enable.idempotence", true);

5.2 Principle analysis

How are Kafka idempotent messages implemented?

You have probably seen idempotent interfaces: to keep an interface idempotent, the simple approach is to attach a unique identifier to every request. Kafka’s idea is similar. To make the Producer idempotent, Kafka introduces the Producer ID (PID) and the Sequence Number.

  • PID: each new Producer is assigned a unique PID when it initializes. The PID is not visible to the user.
  • Sequence Number: for each PID, each <Topic, Partition> that the Producer sends data to has a monotonically increasing Sequence Number starting from 0.

Therefore, the idempotency of messages can be guaranteed by the PID and the Sequence Number.

How are duplicate messages detected?

  • If the sequence number of a new message is exactly 1 greater than the sequence number maintained by the broker, the broker accepts the message.
  • If the sequence number of a new message is not greater than the sequence number maintained by the broker, the message is a duplicate and the broker can discard it.
  • If the sequence number of a new message exceeds the sequence number maintained by the broker by more than 1, messages in between have been lost. The broker reports this, and the Producer throws an OutOfOrderSequenceException.

Note: idempotence based on the PID and sequence numbers only ensures that messages are not duplicated within a single Partition.
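The three rules above can be written down as a small plain-Java sketch. SequenceCheckDemo and its Result enum are hypothetical names for illustration; this models only the decision logic, not the broker implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of duplicate detection by PID and sequence number.
final class SequenceCheckDemo {
    enum Result { ACCEPTED, DUPLICATE, OUT_OF_ORDER }

    // Last accepted sequence number per (PID, topic-partition); absent means none yet.
    private final Map<String, Integer> lastSequence = new HashMap<>();

    Result onMessage(long pid, String topicPartition, int sequence) {
        String key = pid + ":" + topicPartition;
        int last = lastSequence.getOrDefault(key, -1); // sequence numbers start from 0
        if (sequence == last + 1) {
            lastSequence.put(key, sequence);
            return Result.ACCEPTED;      // exactly the next expected number
        }
        if (sequence <= last) {
            return Result.DUPLICATE;     // already seen, can be discarded
        }
        return Result.OUT_OF_ORDER;      // a gap: messages in between are missing
    }

    public static void main(String[] args) {
        SequenceCheckDemo broker = new SequenceCheckDemo();
        System.out.println(broker.onMessage(1L, "mytopic-0", 0)); // prints ACCEPTED
        System.out.println(broker.onMessage(1L, "mytopic-0", 0)); // prints DUPLICATE
        System.out.println(broker.onMessage(1L, "mytopic-0", 2)); // prints OUT_OF_ORDER
        System.out.println(broker.onMessage(1L, "mytopic-1", 0)); // prints ACCEPTED
    }
}
```

The last call shows why the guarantee is per partition: each (PID, partition) pair has its own counter, so nothing links a message in mytopic-0 to one in mytopic-1.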

6. Producer transactions

6.1 Transaction introduction and usage

Producer transactions are a feature that Kafka introduced in 0.11.0.0 in 2017. Kafka uses transactions to keep message sending idempotent across Producer sessions.

A Producer transaction ensures that the multiple messages a Producer sends either all succeed or all fail. (The messages may go to multiple partitions or topics.)

Using transactions with the Java API:

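import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

public class TransactionProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", true);
        // Transaction ID, unique
        props.put("transactional.id", UUID.randomUUID().toString());

        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        // Initialize the transaction
        producer.initTransactions();

        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<String, String>("transaction-test", "1", "1"));
            producer.send(new ProducerRecord<String, String>("transaction-test", "2", "2"));
            // Integer i = 1/0;
            producer.send(new ProducerRecord<String, String>("transaction-test", "3", "3"));
            // Commit the transaction
            producer.commitTransaction();
        } catch (KafkaException e) {
            // Abort the transaction
            producer.abortTransaction();
        }

        producer.close();
    }
}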

Using transactions in Spring Boot

For specific usage, see this blog: www.hangge.com/blog/cache/…

Method 1: use executeInTransaction

kafkaTemplate.executeInTransaction(operations -> {
    operations.send("topic1", "test executeInTransaction");
    // Throwing here aborts the transaction, so the message above is not committed
    throw new RuntimeException("fail");
});

Method 2: Use the @Transactional annotation

6.2 Analysis of how Producer transactions work

  1. Because a producer’s messages may span partitions, the transaction is a distributed transaction. There are many ways to implement distributed transactions, and Kafka chooses the most common one, two-phase commit (2PC): if every participant can commit, commit; otherwise abort.
  2. Since it is 2PC, there must be a coordinator role, called the Transaction Coordinator.
  3. Transaction management needs a transaction log that records the state of each transaction, so that the coordinator can continue processing the original transactions after a crash. As with the storage of consumer offsets, Kafka uses a special topic, __transaction_state, to record the transaction state.
  4. If the producer dies and is restarted, it must be able to continue the original transaction, so the transaction needs a unique ID. This is transactional.id, for which we use a UUID here. When transactional.id is configured, enable.idempotence is set to true (idempotence is a prerequisite for transactions). A producer with the same transactional ID can then take over the original transaction.

Step description:

A: The producer registers its transaction ID with the Coordinator through the initTransactions API.

B: The Coordinator records the transaction log.

C: The producer writes messages to the destination partitions.

D: The partitions and the Coordinator interact. When the transaction completes, the state of the messages becomes committed so that consumers can consume them.
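Steps A to D can be modeled in a strongly simplified form. TransactionDemo is a hypothetical in-memory sketch, not Kafka code: the transactionLog map stands in for the coordinator’s transaction log, the partition list for a destination partition, and readCommitted for a consumer that only reads committed data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of transactional writes and committed reads.
final class TransactionDemo {
    enum State { ONGOING, COMMITTED, ABORTED }

    private static final class Entry {
        final String txId;
        final String value;

        Entry(String txId, String value) {
            this.txId = txId;
            this.value = value;
        }
    }

    private final Map<String, State> transactionLog = new HashMap<>(); // coordinator side
    private final List<Entry> partition = new ArrayList<>();           // destination partition

    void begin(String txId) {
        transactionLog.put(txId, State.ONGOING);
    }

    // Messages are written to the partition right away, tagged with their transaction.
    void send(String txId, String value) {
        requireOngoing(txId);
        partition.add(new Entry(txId, value));
    }

    void commit(String txId) {
        requireOngoing(txId);
        transactionLog.put(txId, State.COMMITTED);
    }

    void abort(String txId) {
        requireOngoing(txId);
        transactionLog.put(txId, State.ABORTED);
    }

    // Only messages whose transaction has been committed are visible.
    List<String> readCommitted() {
        List<String> visible = new ArrayList<>();
        for (Entry e : partition) {
            if (transactionLog.get(e.txId) == State.COMMITTED) {
                visible.add(e.value);
            }
        }
        return visible;
    }

    private void requireOngoing(String txId) {
        if (transactionLog.get(txId) != State.ONGOING) {
            throw new IllegalStateException("no ongoing transaction: " + txId);
        }
    }

    public static void main(String[] args) {
        TransactionDemo kafka = new TransactionDemo();
        kafka.begin("tx1");
        kafka.send("tx1", "1");
        kafka.send("tx1", "2");
        kafka.commit("tx1");
        kafka.begin("tx2");
        kafka.send("tx2", "3");
        kafka.abort("tx2");
        System.out.println(kafka.readCommitted()); // prints [1, 2]
    }
}
```

The aborted message is physically present in the partition but never becomes visible, which is the all-or-nothing behavior described for producer transactions.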

7. Main features of Kafka

There are many MQ products available, so how is Kafka different from them? The background in which a product was created determines its features, and its features determine its usage scenarios.

Because Kafka was designed to solve the problem of transporting data streams, it has these features:

  • High throughput and low latency: Kafka is best known for its speed in sending and receiving messages. Kafka can process hundreds of thousands of messages per second with a latency of just a few milliseconds.
  • High scalability: capacity can be expanded by adding partitions, and different partitions can live on different brokers. ZK manages brokers to support scaling and manages consumers to support load balancing.
  • Persistence and reliability: Kafka persists messages to disk and keeps replicated backups to prevent data loss.
  • Fault tolerance: the Kafka cluster keeps working when a node fails or goes down.
  • High concurrency: thousands of clients can read and write data simultaneously.

Kafka vs. RabbitMQ

The main differences between Kafka and RabbitMQ:

| | Kafka | RabbitMQ |
|---|---|---|
| Product focus | Stream processing, message engine | Message broker |
| Performance | Higher throughput; pull mode only | Mainly push mode |
| Message order | Messages within a partition are ordered; within a consumer group, a partition is consumed by only one consumer | Message order is guaranteed |
| Routing and distribution of messages | Not supported | Supported |
| Delayed messages, dead letter queues | Not supported | Supported |
| Message retention | Messages are retained after consumption; a retention policy can be set to clean them up | Messages are deleted as soon as they are consumed |

RabbitMQ is preferred for:

  • Advanced, flexible routing rules
  • Message timing control (controlling message expiration or message delay)
  • Advanced fault handling in situations where consumers are likely to fail to process messages (transiently or persistently)
  • Simpler consumer implementation

Kafka is preferred for:

  • Strict message order
  • Extended message retention, including the possibility of replaying past messages
  • High scalability that traditional solutions cannot provide