Kafka infrastructure

In the last video we introduced the main nouns and concepts; in this video we take a closer look at what they are and how to understand them.

Topic

  • A Topic is a logical container that carries messages; in practice it is used to separate different business use cases.
  • The logical container can be thought of as a category of messages: messages of the same class are put into the same Topic.

You can use the command-line tools that ship with Kafka to see which topics exist in the current Kafka cluster and how their partitions are laid out, as shown below.
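
For example, a minimal sketch using the scripts in Kafka's bin directory (assuming a broker on localhost:9092 and a placeholder topic name my_topic; older releases take --zookeeper instead of --bootstrap-server for these scripts):

# List all topics in the cluster
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# Show the partitions, leaders and replicas of a single topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my_topic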

Partition

  • Partitions are a defining feature of Kafka. A Topic can be divided into multiple partitions; each Partition is an ordered queue, and each message in a Partition has an ordered Offset.
  • A Topic is a logical container, while a Partition is where a Topic's data is physically stored. On disk, each partition corresponds to a directory named Topic-N, where N is the partition number within that Topic, usually starting at zero.

  • In the same Consumer Group, only one Consumer instance can consume messages for a Partition.
  • In Kafka, each message partition can only be consumed by one Consumer in the same group, so the relationship between a message partition and its Consumer needs to be recorded in ZooKeeper. Once a Consumer obtains the consumption right for a message partition, its Consumer ID is written to the ephemeral ZooKeeper node for that partition, for example: /consumers/[group_id]/owners/[topic]/[brokerid-partition_id]

Consumer Group

  • A Consumer Group is, as the name implies, a group of consumers; multiple consumers are organized under one group so they can be managed together.
  • For each Consumer Group, Kafka assigns a globally unique Group ID that is shared by all consumers within the Group.
  • Each partition of a subscribed topic can be assigned to only one consumer within a group (although the same partition can also be assigned to consumers in other groups). Kafka also assigns each Consumer a Consumer ID, usually in the form “Hostname:UUID”. A minimal consumer sketch is shown after this list.
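
A minimal sketch of a consumer joining a group (the broker address matches the producer example later in this article; the group name demo_group and the topic name flink_json_source_4 are assumptions for illustration). Consumers started with the same group.id share the topic's partitions, and each partition is consumed by only one member of the group:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerGroupDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // All consumers that use the same group.id belong to the same Consumer Group
        props.put("group.id", "demo_group");                 // assumed group name
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("flink_json_source_4"));  // assumed topic
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d, offset=%d, value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}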

Broker

Each Broker is an instance of a Kafka service. Multiple brokers form a Kafka cluster. Messages published by producers are stored in the Broker, and consumers pull messages from the Broker for consumption.

In other words, a Broker is a single server in a Kafka cluster.

Kafka persistence

Each Topic divides its messages into multiple partitions, and at the storage level each Partition is an append-only log file: any message published to the Partition is appended directly to the end of the log file. The position of each message in the file is called its Offset. Partitions are stored as files in the file system, and log files are retained for a certain time, according to the Broker configuration, before being deleted to free disk space.

The **”log.dirs”** property in Kafka's config/server.properties configuration file specifies the directory where the log data is stored.
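
The value below is the default shipped in server.properties; in practice it usually points to one or more dedicated data disks (a comma-separated list of directories is allowed):

# config/server.properties
log.dirs=/tmp/kafka-logs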

LogSegment

  • Each partition corresponds to a folder on the local disk. Each partition is further divided into log segments, which are the smallest unit into which a Kafka log object is split.
  • A LogSegment is a logical concept that corresponds to a concrete log file (the “.log” data file) and two index files: the “.index” offset index file and the “.timeindex” message timestamp index file. There may also be a “.txnindex” file, the index of aborted transactions.

  • As more data is written to a partition, the number of .log files grows. The size of a .log file is configurable; the default is 1 GB, specified by the parameter log.segment.bytes.
  • Splitting the log into segments serves two purposes: it improves lookup efficiency and keeps individual files small. Kafka's data needs to be cleaned up periodically, and smaller files make it easy to delete old data files.
  • The string of digits in a segment's file name (all zeros for the first segment) is the Base Offset of the log segment, that is, the offset of the first message stored in that segment.
  • Each partition is a Log object, and each partition directory contains multiple groups of log segments, that is, multiple groups of .log, .index, and .timeindex files with different file names, because each log segment starts at a different base offset. An illustrative directory listing is shown after this list.
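
A purely illustrative listing of one partition directory (assuming log.dirs=/tmp/kafka-logs and the topic flink_json_source_4 used in the producer example later in this article; the base offsets in the file names are made up):

# One folder per partition, named <topic>-<partition number>
ls /tmp/kafka-logs/flink_json_source_4-0/
00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex
00000000000000170210.index  00000000000000170210.log  00000000000000170210.timeindex
00000000000000336890.index  00000000000000336890.log  00000000000000336890.timeindex
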
Log data file

Kafka stores the message data sent to it by producers in log data files named after the base offset of the segment, left-padded with zeros (to 20 digits), with the suffix “.log”.

Each message in the partition is represented by its offset in the partition, which is not the actual location of the message stored in the partition, but a logical value (Kafka records the offset in 8 bytes). However, it uniquely determines the logical location of a Message in a partition, and the offsets of messages in the same partition increase sequentially (this can be compared to the auto-increment primary key of a database).

Since the log file is binary, it cannot be read directly; the strings command can be used to pull the human-readable content out of it.
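
A sketch, assuming the default log.dirs and the partition directory from the hypothetical listing above:

# Extract the printable strings from the first log segment of partition 0
strings /tmp/kafka-logs/flink_json_source_4-0/00000000000000000000.log | head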

Offset index file

If, on every fetch, a consumer had to search a log data file of up to 1 GB (the default segment size) for the message at a given offset, lookups would be very inefficient: even after locating the right segment, a sequential scan would still be needed to find the message.

To improve the efficiency of finding messages, Kafka builds a sparse index for each segmented log data file. This saves space while still allowing the message content to be located quickly in the log data file through the index. Like the data file, the offset index file is named after the base offset of the segment, with the suffix “.index”.

Index entries map an offset to the actual physical position of the corresponding message in the log data file. Each index entry consists of an offset and a position, and uniquely identifies one message in the partition's data file. Kafka uses a sparse index: rather than indexing every message, it adds an index entry only after a certain amount of data has been written. The interval can be set with **”log.index.interval.bytes”**; by default an index entry is added only after at least 4 KB of message data has been written to the log segment.

With the offset index file, Kafka can quickly locate the actual physical position of a message from its offset. It first uses binary search to find the segment (offset index file plus log data file) that contains the specified offset. It then binary-searches that index file for the largest indexed offset that is less than or equal to the specified offset and reads the corresponding position (the physical byte position). Finally, starting from that position, it scans the segment's log data file sequentially until it finds the message whose offset equals the specified offset. A simplified sketch of the index lookup is shown below.
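
A simplified Java sketch of the second step (not Kafka's actual implementation): binary-search a sorted sparse index for the largest entry whose offset is not greater than the target, then start the sequential scan of the .log file from that entry's physical position.

public class SparseIndexLookup {

    /** One sparse index entry: logical offset -> physical byte position in the .log file. */
    static class IndexEntry {
        final long offset;
        final long position;
        IndexEntry(long offset, long position) { this.offset = offset; this.position = position; }
    }

    /**
     * Binary search over the sorted sparse index: return the byte position of the
     * largest indexed offset that is <= targetOffset. The caller then scans the
     * .log file sequentially from that position until it reaches targetOffset.
     */
    static long floorPosition(IndexEntry[] index, long targetOffset) {
        int lo = 0, hi = index.length - 1, best = 0;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (index[mid].offset <= targetOffset) {
                best = mid;        // still <= target, keep searching to the right
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return index[best].position;
    }
}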

Timestamp index file

This type of index file is a timestamp-based index introduced in Kafka 0.10.1.1. It is named in essentially the same way as the corresponding log data file and offset index file; the only difference is the suffix (“.timeindex”).

Each index entry corresponds to an 8-byte timestamp field and a 4-byte offset field. The timestamp field records the maximum timestamp of the LogSegment so far, and the corresponding offset is the offset at which the new message was inserted.

The client

Kafka is written mainly in Java, so it naturally provides a Java implementation of the client. From the beginning it has also shipped a set of convenient shell scripts for managing Kafka, including command-line producer and consumer clients.

producers

A shell script
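
A minimal sketch using the console producer script that ships with Kafka (older versions take --broker-list instead of --bootstrap-server; the topic name here matches the Java example below):

# Read lines from stdin and send each line as a message to the topic
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flink_json_source_4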

Java code

// Imports needed by the example (the JSON and random-string helpers are assumed to be
// fastjson and commons-lang3, based on the methods used below)
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

public class ProducerMockData {
    private static final Logger logger = LoggerFactory.getLogger(ProducerMockData.class);
    private static KafkaProducer<String, String> producer = null;

    /* Initializes the producer */
    static {
        Properties configs = initConfig();
        producer = new KafkaProducer<String, String>(configs);
    }

    /* Initialize the configuration */
    private static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers"."localhost:9092");
        props.put("acks"."all");
        props.put("retries".0);
        props.put("batch.size".16384);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        return props;
    }


    public static void main(String[] args) throws InterruptedException {
        Random random = new Random();
        // Message entity
        ProducerRecord<String, String> record = null;
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        for (int i = 0; i < 10000000; i++) {
            JSONObject obj = new JSONObject();
            obj.put("stu_id", i);
            obj.put("name", RandomStringUtils.randomAlphanumeric(10));
            obj.put("ts", System.currentTimeMillis());
            obj.put("register_time", simpleDateFormat.format(new Date(System.currentTimeMillis()-random.nextInt(1000000))));
            JSONObject detail = new JSONObject();
            detail.put("age", random.nextInt(30));
            detail.put("grade", random.nextInt(5));
            obj.put("detail", detail);
            obj.put("mf"."man,woman".split(",")[random.nextInt(2)]);

            record = new ProducerRecord<String, String>("flink_json_source_4", obj.toJSONString());
            // Send a message
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null != e) {
                        logger.info("send error: " + e.getMessage());
                    }
                }
            });
        }
        producer.close();
    }
}

consumers