This article is from the vivo Internet technology WeChat public account (mp.weixin.qq.com/s/bV8AhqAjQ…). About the author: Zheng Zhibin graduated in Computer Science and Technology (bilingual class) from South China University of Technology. He has worked on development and architecture in e-commerce, open platforms, mobile browsers, recommendation advertising, big data, and artificial intelligence. He currently works on AI construction and advertisement recommendation in vivo's intelligent platform center, and is good at business architecture and at platform and business solutions for various business forms. Blog: arganzheng.life.

Background

Recently we migrated our original centralized log monitoring system. The original implementation was: Log Agent => Log Server => Kibana. Log Agent talks to Log Server over Thrift RPC, with a simple load-balancing scheme (WRB) implemented on top.

In fact, the original scheme ran well; the asynchronous agent had no impact on application performance and handled our tens of millions of PV per day without any pressure. It had one disadvantage, though: if error logs surged, the Log Server could not keep up and messages were lost. We never reached that scale, and the problem could also be handled by introducing a queue as a buffer. But all things considered, it is simply easier to use a message queue directly: RPC, load balancing, and buffering all come built in. Another approach is to read the log files directly, similar to Logstash or Flume. However, we decided to use a message queue for flexibility, since we already had ZooKeeper deployed. After some research, Kafka turned out to be the best choice for data transfer and buffering: Log Agent => Kafka => ElasticSearch => Kibana

Introduction to Kafka

Basic concepts of Kafka

  • Broker: A Kafka cluster contains one or more servers, which are called brokers.
  • Topic: Every message published to a Kafka cluster has a category called Topic.
  • Message
    • A message is the basic unit of Kafka communication. It consists of a fixed-length header and a variable length payload. In the Java client, it is also called Record.
    • The message structure is described as follows:
      • CRC32: CRC32 checksum, 4 bytes.
      • Magic: Kafka protocol version, used for compatibility. 1 byte.
      • Attributes: This field contains 1 byte. The lower two bits indicate the compression method, the third bit indicates the timestamp type (0 indicates LogCreateTime, 1 indicates LogAppendTime), and the higher four bits are reserved and have no immediate meaning.
      • Timestamp: the timestamp of the message. When magic > 0, the message header must contain this field. 8 bytes.
      • Key-length: indicates the key length of the message, which is 4 bytes.
      • Key: actual message key data.
      • Payload-length: indicates the actual data length of a message, which is 4 bytes.
      • Payload: Actual data of the message
    • There is also 12 bytes of extra LogOverhead to actually store a message:
      • Message offset: 8 bytes, similar to the message Id.
      • Total message length: 4 bytes
  • Partition:
    • A Partition is a physical concept, and each Topic contains one or more partitions.
    • Each partition is an ordered queue consisting of an ordered series of immutable messages.
    • Physically, each partition corresponds to a folder named ${topicName}-${partitionId}, for example __consumer_offsets-0.
    • The partition directory stores log segments of the partition, including log data files and two index files.
    • Each message is appended to the corresponding partition and is written sequentially to disk, which is very efficient. This is an important guarantee of Kafka’s high throughput rate.
    • Kafka only guarantees the ordering of messages within a partition, not across partitions.
  • LogSegment:
    • Log files are split into one or more log segments by size or time. The LogSegment size is specified by the configuration item log.segment.bytes (default 1GB), and the time limit by log.roll.ms or log.roll.hours. The log segment currently being written is called the active segment (activeSegment).
    • Unlike normal log files, Kafka log segments have two secondary index files in addition to a specific log file:
      • The data file
        • A data file is a message set file (FileMessageSet) with the .log suffix, used to store the actual message data.
        • It is named after the offset of its first message, also called the base offset (BaseOffset), left-padded with zeros to 20 digits.
        • The base offset of each data file is the previous data file's LEO + 1 (the first data file starts at 0).
      • Offset index file
        • The offset index file has the same name as the data file but with the .index suffix. Its purpose is to quickly locate a message by its offset.
        • Kafka first stores the BaseOffset of each log segment as a key in a ConcurrentSkipListMap, so that when looking up a message with a given offset, a binary-search-style lookup quickly locates the data file and index file the message lives in.
        • Then a binary search inside the index file finds the largest indexed offset that is less than or equal to the specified offset. Finally, the data file is scanned sequentially from that position until the message with the specified offset is found.
        • Note that not every message is indexed; the index is sparse, with one entry every certain number of bytes, and the index interval can be set via index.interval.bytes.
      • Timestamp index file
        • Starting with version 0.10.1.1, Kafka introduced a timestamp-based index file, with the same name as the data file and the .timeindex suffix. Its purpose is to quickly locate messages by timestamp.
        • The Kafka API provides an offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) method, which returns the offset and timestamp of the first message whose timestamp is greater than or equal to the queried time. This is quite handy: if we want to start consuming from a certain point in time, we use offsetsForTimes() to locate the offset of the first message after that time, call seek(TopicPartition, long offset) to move the consumer offset there, and then call poll() to pull messages.
  • Producer:
    • Responsible for publishing messages to Kafka Broker.
    • Some important configuration items for producers (a minimal client sketch wiring several of these appears after this concepts list):
      • request.required.acks: Kafka provides producers with three acknowledgement (ACK) mechanisms, which control when the broker sends an acknowledgement back to the producer after receiving a message so that the producer can act accordingly. The value of request.required.acks can be 0, -1, or 1; the default is 1.
        • acks=0: The producer does not wait for any acknowledgement from the broker and keeps sending messages continuously.
        • acks=1: The producer waits until the leader replica has successfully written the message to its log. This reduces the possibility of data loss to some extent, but still cannot guarantee no loss, because it does not wait for the follower replicas to finish synchronizing.
        • acks=-1: An acknowledgement is sent to the producer only after the leader replica and all replicas in the ISR have stored the data. To prevent data loss, make sure enough replicas stay in sync; min.insync.replicas can be set so that an exception is thrown when the number of in-sync replicas is insufficient. However, this approach also reduces the producer's send speed and throughput.
      • message.send.max.retries: Number of retries by the producer before abandoning the message. The default is 3.
      • retry.backoff.ms: Time to wait before each retry. The unit is ms. The default value is 100.
      • queue.buffering.max.ms: In asynchronous mode, the maximum time messages are buffered. When this time is reached, the buffered messages are sent in a batch. If batch.num.messages is also configured in asynchronous mode, a batch is sent as soon as either threshold is reached. The default is 1000ms.
      • queue.buffering.max.messages: Maximum number of unsent messages that can be cached in a queue in asynchronous mode. The default is 10000.
      • queue.enqueue.timeout.ms:
        • = 0: the message is enqueued immediately if the queue is not full; if the queue is full, the message is discarded immediately
        • < 0: block unconditionally until there is room; never discard
        • > 0: block for up to this many milliseconds; if the queue is still full, throw a QueueFullException
      • batch.num.messages: Kafka batches the messages sent to a particular broker partition; batch.num.messages sets the maximum number of messages sent in one batch. This configuration item has no effect when the producer sends messages in synchronous mode. The default is 200.
      • request.timeout.ms: Timeout for the producer to wait for the broker to reply when acks are needed. The default value is 1500ms.
      • send.buffer.bytes: Indicates the size of the Socket send buffer. The default is 100KB.
      • topic.metadata.refresh.interval.ms: Interval at which producers regularly request updates to topic metadata. If set to 0, data updates are requested after each message is sent. The default value is 5 minutes.
      • client.id: producer ID, used by services to trace calls and locate problems. The default is console-producer.
  • Consumer & Consumer Group & Group Coordinator:
    • Consumer: the message consumer, i.e., the client that reads messages from the Kafka broker. Kafka 0.9 released a new consumer rewritten in Java that no longer depends on the Scala runtime or ZooKeeper.
    • Consumer Group: Each consumer belongs to a specific consumer group, specified via group.id; if no group name is specified, the default is test-consumer-group.
    • Group Coordinator: For each Consumer Group, a broker is selected as the Consumer Group Coordinator.
    • Each consumer also has a globally unique ID, configurable via client.id; if not specified, Kafka automatically generates one for the consumer in the format ${groupId}-${hostName}-${timestamp}-${UUID}.
    • Kafka provides two ways to commit consumer offsets: Kafka commits them automatically, or the client commits them manually by calling the corresponding KafkaConsumer API.
      • Automatic commit: this is not a strictly periodic commit; instead, on certain events the consumer checks whether more than auto.commit.interval.ms has elapsed since the last commit.
        • enable.auto.commit=true
        • auto.commit.interval.ms
      • Manual submission
        • enable.auto.commit=false
        • commitSync(): Synchronous commit
        • commitAsync(): Asynchronous commit
    • Some important configuration items for consumers:
      • group.id: A unique string that identifies the consumer group this consumer belongs to.
      • client.id: The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.
      • bootstrap.servers: A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
      • key.deserializer: Deserializer class for key that implements the org.apache.kafka.common.serialization.Deserializer interface.
      • value.deserializer: Deserializer class for value that implements the org.apache.kafka.common.serialization.Deserializer interface.
      • fetch.min.bytes: The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request.
      • fetch.max.bytes: The maximum amount of data the server should return for a fetch request.
      • max.partition.fetch.bytes: The maximum amount of data per-partition the server will return.
      • max.poll.records: The maximum number of records returned in a single call to poll().
      • heartbeat.interval.ms: The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities.
      • session.timeout.ms: The timeout used to detect consumer failures when using Kafka's group management facility.
      • enable.auto.commit: If true, the consumer's offset will be periodically committed in the background.
  • ISR: Kafka dynamically maintains an ISR (In-Sync Replicas) list in ZK; it holds the broker IDs of all replicas that keep their messages synchronized with the leader replica. If a replica goes down or falls too far behind, it is removed from the ISR list.
  • Zookeeper:
    • Kafka uses ZK to store metadata information, including broker information, Kafka cluster information, old consumer information and consumption offset information, topic information, partition status information, partition copy sharding scheme information, dynamic configuration information, and so on.
    • Nodes Kafka registers in ZK:
      • /consumers: old-version consumers create their consumer nodes under this ZK node after starting.
      • /brokers/seqid: assists in generating broker IDs; when the user has not configured broker.id, ZK automatically generates a globally unique ID.
      • /brokers/topics: Every time a topic is created a node with the same name as the topic is created in that directory.
      • /brokers/ids: every time a KafkaServer starts, a child node named ${broker.id} is created under this directory.
      • /config/topics: Stores dynamically modified topic level configuration information
      • /config/clients: stores dynamically modified client-level configuration information
      • /config/changes: stores corresponding information when dynamically changing the configuration
      • /admin/delete_topics: Saves the information about the topic to be deleted when the topic is deleted
      • /cluster/id: saves the cluster ID
      • /controller: saves the broker ID of the controller, etc.
      • /isr_change_notification: Saves the path to be notified when the Kafka copy ISR list changes
    • When Kafka is up or running, nodes are created on the ZK to store metadata information. Listeners are registered on these nodes to listen for metadata changes.
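
To make the producer options above concrete, here is a minimal sketch using confluent-kafka-python (the client introduced later in this article); librdkafka accepts these legacy property names directly. The broker address, topic name, and client id are placeholders:

from confluent_kafka import Producer

# Placeholder broker/topic/client names; the property names mirror the options described above.
conf = {
    'bootstrap.servers': 'mybroker:9092',
    'client.id': 'log-agent-1',
    'request.required.acks': -1,          # wait for the leader and all ISR replicas (strongest guarantee)
    'message.send.max.retries': 3,        # retries before giving up on a message
    'retry.backoff.ms': 100,              # wait between retries
    'queue.buffering.max.messages': 10000,
    'queue.buffering.max.ms': 1000,       # buffer messages for up to 1s before sending a batch
    'batch.num.messages': 200,            # or send as soon as 200 messages are buffered
}

def on_delivery(err, msg):
    # Invoked once per message after the broker acknowledges it (or delivery finally fails).
    if err is not None:
        print('delivery failed: %s' % err)
    else:
        print('delivered to %s [%d] @ %d' % (msg.topic(), msg.partition(), msg.offset()))

p = Producer(conf)
p.produce('mytopic', b'hello kafka', callback=on_delivery)
p.flush()   # block until all buffered messages are acknowledged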

TIPS

Mapping to ES concepts: a Broker corresponds to a Node, a Topic to an Index, a Message to a Document, and a Partition to a Shard. A LogSegment corresponds to an ES Segment.

Dump Log Segments

When working with Kafka, we sometimes need to view various information about the messages we produce. These messages are stored in Kafka log files. Because of the special format of the log file, we cannot directly view the information content in the log file. Kafka provides a command to dump binary segmented log files into character files:

$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments
Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.
Option                              Description
------                              -----------
--deep-iteration                    Use deep iteration instead of shallow iteration
--files <file1, file2, ...>         REQUIRED. Input log segment files, comma-separated
--key-decoder-class                 Custom key deserializer. Must implement the kafka.serializer.Decoder trait; the jar needs to be placed in the kafka/libs directory. (Default: kafka.serializer.StringDecoder)
--max-message-size <Integer: size>  Maximum message size in bytes (default: 5242880)
--print-data-log                    Also print the message payloads
--value-decoder-class               Custom value deserializer. Must implement the kafka.serializer.Decoder trait; the jar needs to be placed in the kafka/libs directory. (Default: kafka.serializer.StringDecoder)
--verify-index-only                 Only verify the index without printing its content

$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log 
Dumping /tmp/kafka-logs/test-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 CreateTime: 1498104812192 isvalid: true payloadsize: 11 magic: 1 compresscodec: NONE crc: 3271928089 payload: hello world
offset: 1 position: 45 CreateTime: 1498104813269 isvalid: true payloadsize: 14 magic: 1 compresscodec: NONE crc: 242183772 payload: hello everyone


Note: --print-data-log is used to view the message content. Without this parameter, only the headers are shown, not the payload.

Can also be used to view index files:

$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.index  --print-data-log 
Dumping /tmp/kafka-logs/test-0/00000000000000000000.index
offset: 0 position: 0


The timeindex file can be dumped in the same way:

$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.timeindex  --print-data-log 
Dumping /tmp/kafka-logs/test-0/00000000000000000000.timeindex
timestamp: 1498104813269 offset: 1
Found timestamp mismatch in :/tmp/kafka-logs/test-0/00000000000000000000.timeindex
  Index timestamp: 0, log timestamp: 1498104812192
Found out of order timestamp in :/tmp/kafka-logs/test-0/00000000000000000000.timeindex
  Index timestamp: 0, Previously indexed timestamp: 1498104813269


Consumer balancing process

Consumer rebalance is the process of re-forming the consumer group and reassigning partitions to its consumers. A rebalance is triggered in the following circumstances:

  • New consumers join the consumer group

  • Current consumer exits from consumer group (either abnormal exit or normal shutdown)

  • A consumer unsubscribes from a topic

  • The number of partitions of a subscribed topic increases (partitions in Kafka can be dynamically increased but not decreased)

  • A new coordinator is elected

  • When the consumer has not sent a heartbeat request within ${session.timeout.ms}, the group coordinator considers the consumer to have quit.

Consumer auto-rebalancing provides high availability and scalability for consumers, so that when we add or remove consumers or partitions we do not need to care about how partitions are assigned to the underlying consumers. Be aware, however, that during a rebalance consumers cannot pull messages for a short period while partitions are being reassigned.

NOTES

Special attention should be paid to the last case, so-called slow consumers. If the coordinator receives no heartbeat within session.timeout.ms, it can remove the slow consumer from the group. Typically, if processing a batch of messages takes longer than session.timeout.ms, the interval between two poll() calls exceeds session.timeout.ms. Since the heartbeat was only sent when poll() was called (since 0.10.1.0 the client sends heartbeats asynchronously from a background thread), this caused the coordinator to flag the slow consumer as dead.

If no heartbeat request is received within session.timeout.ms, the coordinator flags the consumer as dead and disconnects from it. It then triggers a rebalance by returning an IllegalGeneration error code in the HeartbeatResponse of the other consumers in the group.

Pay special attention to this problem when committing offsets manually; otherwise the commit may never happen, leading to repeated consumption.
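
A minimal sketch of a safer setup with confluent-kafka-python (broker, topic, and group names are placeholders): give the consumer enough time between polls, disable auto commit, and commit synchronously only after a message has actually been processed, so that a rebalance at worst causes re-consumption rather than silently dropped commits:

from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'mybroker:9092',      # placeholder broker
    'group.id': 'log-processor',               # placeholder group
    'enable.auto.commit': False,               # commit manually, after processing
    'session.timeout.ms': 10000,               # heartbeat-based liveness window
    'heartbeat.interval.ms': 3000,
    'max.poll.interval.ms': 300000,            # upper bound on the time between two poll() calls
})
c.subscribe(['mytopic'])

try:
    while True:
        msg = c.poll(1.0)
        if msg is None or msg.error():
            continue
        handle(msg)                            # your (possibly slow) processing; placeholder function
        c.commit(message=msg, asynchronous=False)   # synchronous commit => at-least-once semantics
finally:
    c.close()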

Kafka’s characteristics

  1. Message order: The order is guaranteed within each partition, but not globally across partitions. A topic can have only one partition if global message ordering is required.

  2. Consumer Group: Consumers in a consumer group concurrently get messages, but only one consumer consumes each partition to ensure that the messages are sequenced. Therefore, the number of consumers in a consumer group must be smaller than or equal to the number of partitions in topic. (To order global messages, only one partition and one consumer can exist)

  3. A message on the same Topic can only be consumed by one Consumer within the same Consumer Group, but multiple Consumer groups can consume the message at the same time. This is what Kafka uses to broadcast a Topic message (to all consumers) and unicast (to a single Consumer). A Topic can correspond to multiple Consumer groups. If you want to implement broadcasting, you can just have a separate Group for each Consumer. To implement unicast, all consumers need to be in the same Group.

  4. Producers push messages, consumers pull messages: Some logging-centric systems, such as Facebook's Scribe and Cloudera's Flume, push messages downstream. Both push and pull have their strengths and weaknesses. The push model struggles to accommodate consumers with different consumption rates, because the send rate is determined by the broker; its goal is to deliver messages as quickly as possible, which can easily overwhelm consumers, typically showing up as denial of service and network congestion. The pull model lets the consumer fetch messages at a rate appropriate to its own capacity. It also simplifies the broker's design and lets consumers control the consumption rate and style (in bulk or one by one) and choose different delivery methods to implement different transport semantics.

In fact, one of Kafka’s design philosophies is to provide both offline and real-time processing. Based on this feature, messages can be processed online in real time using a real-time Streaming system such as Storm or Spark Streaming, while being processed offline using a batch system such as Hadoop, and data can be backed up to another data center in real time. Just make sure that the consumers used for these three operations belong to different Consumer groups.
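
A tiny sketch of that setup with confluent-kafka-python (broker, topic, and group names are placeholders): consumers with different group.id values each receive every message on the topic, while consumers sharing a group.id split the partitions between them:

from confluent_kafka import Consumer

def make_consumer(group_id):
    # Placeholder broker/topic; each distinct group.id independently gets the full stream.
    c = Consumer({'bootstrap.servers': 'mybroker:9092', 'group.id': group_id})
    c.subscribe(['mytopic'])
    return c

streaming = make_consumer('storm-streaming')   # real-time processing
batch     = make_consumer('hadoop-batch')      # offline processing
backup    = make_consumer('dc-backup')         # real-time backup to another data center

# Two consumers in the *same* group instead share (split) the topic's partitions:
worker_a = make_consumer('log-workers')
worker_b = make_consumer('log-workers')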

Kafka’s HA

In versions prior to 0.8, Kafka did not provide a High Availability mechanism: once one or more brokers went down, all partitions on them could not be served for the duration of the outage. If a broker could never be recovered, or if a disk failed, the data on it was lost. One of Kafka's design goals is data persistence, and for a distributed system, especially as the cluster grows, the probability of one or more machines going down increases greatly, so failover is required. Therefore Kafka has provided a High Availability mechanism since 0.8, mainly in the form of Data Replication and Leader Election.

Data Replication

Kafka provides partition-level replication starting from 0.8. The default number of replicas can be configured in $KAFKA_HOME/config/server.properties:

default.replication.factor = 1

Replication, together with leader election, provides automatic failover. Replication has some impact on Kafka's throughput but greatly improves availability. By default the replication factor in Kafka is 1. Each partition has a single leader through which all reads and writes go; the followers pull data from the leader in batches. In general the number of partitions is greater than or equal to the number of brokers, and the leaders of all partitions are evenly distributed across brokers. The logs on a follower are exactly the same as those on its leader.

Note that the replication factor does not affect consumer throughput, because the consumer only reads from the leader of each partition; the other replicas are not involved. Similarly, consumer throughput is independent of whether replication is synchronous or asynchronous.

Leader Election

After replication is introduced, the same partition may have multiple replicas, and one of them needs to be elected leader. The Producer and Consumer interact only with the leader replica; the other replicas act as followers and copy data from the leader. Note that only the leader handles reads and writes; the followers just fetch data sequentially from the leader (over N channels) and provide no read or write services, which keeps the system simpler and more efficient.

Consider: why do follower replicas not serve reads or writes, acting only as cold standbys?

It makes sense that follower replicas do not accept writes: if followers also accepted writes, all replicas would need to synchronize with each other, and N replicas would need N×N channels to synchronize data. With asynchronous synchronization, data consistency and ordering are hard to guarantee; with synchronous replication, write latency is effectively amplified N times, which is counterproductive.

So why not let follower replicas serve reads to relieve the read pressure on the leader? Unlike other storage services (ES, MySQL), Kafka reads are essentially ordered message consumption that depends on a stored cursor, the offset. If reads were load-balanced across multiple replicas, this offset would be difficult to maintain consistently.

TIPS

Kafka's leader replica is similar to ES's primary shard, and the follower replica is similar to ES's replica shard. In ES an index also has multiple shards (analogous to a Kafka topic having multiple partitions), divided into primary and replica shards, with routing done as shard = hash(routing) % number_of_primary_shards. But ES introduces the role of coordinating node, which makes this transparent to clients. Replica shards in ES only serve reads (and, like Kafka, ES waits for the replica shards to acknowledge before finally returning to the client).

For those with experience sharding traditional MySQL databases and tables, this is very similar to a sharding + replication architecture, just made transparent to you by a client (SDK) or a coordinator.

Propagate the message

When publishing a message to a partition, the Producer first finds the leader of the partition through ZooKeeper. Then, no matter what the topic's replication factor is (that is, how many replicas the partition has), the Producer sends the message only to the leader of the partition. The leader writes the message to its local log. Each follower pulls data from the leader, so the followers store data in the same order as the leader. A follower sends an ACK to the leader after it has received the message and written it to its log. Once the leader has received ACKs from all replicas in the ISR (in-sync replicas), the message is considered committed; the leader advances the high watermark (HW) and sends an ACK to the Producer.

To improve performance, each Follower sends an ACK to the Leader as soon as it receives data, rather than waiting until the data is written to the Log. Therefore, Kafka can only ensure that a committed message is stored in the memory of multiple replicas, but not persisted to disk. Therefore, it cannot be completely guaranteed that the message will be consumed by consumers after an exception occurs. But given how rare this scenario is, you can argue that this approach strikes a good balance between performance and data persistence. In future releases, Kafka will consider providing higher persistence.

The Consumer reads messages from the Leader, and only messages that have been committed (messages whose offset is lower than HW) are exposed to the Consumer.

The data flow of Kafka replication gets fairly involved, so I will not expand on it further here; the following article covers it well for interested readers:

Kafka High Availability (Part 1)

Kafka’s cursors (offsets)

The following diagram (rongxinblog.wordpress.com/2016/07/29/…) shows all of Kafka's cursors in a very straightforward way:

Here’s a brief explanation:

0. ISR

The in-sync replicas list is, as the name suggests, the set of replicas that "keep in sync" with the leader. The meaning of "in sync" is somewhat subtle. Since version 0.9 the broker parameter replica.lag.time.max.ms defines the ISR: if a follower has not caught up with the leader's log end offset within this time, the leader removes it from the ISR. The ISR is an important set: the controller uses it when electing the leader replica of a partition, and the leader is responsible for maintaining the ISR list.

In a scenario where the leader needs to be elected, the Leader and ISR are determined by the Controller. After the leader is elected, the ISR is determined by the leader. If the leader and ISR only exist on the ZK, then each broker needs to listen on Zookeeper for changes in the Leader and ISR of each partition of its host, which is inefficient. If you don’t put it on Zookeeper, then when the Controller fails, you need to retrieve this information from all brokers, which is not feasible considering the problems that may occur during this process. So the leader and ISR information exists on Zookeeper, but when changing the Leader, the Controller will make the change on Zookeeper first and then send the LeaderAndIsrRequest to the relevant broker. In this way, it is more efficient to include all partitions that have changed on the broker in a LeaderAndIsrRequest, i.e. batch changes of new information to the broker. In addition, when the leader changes the ISR, the changes are made on Zookeeper first, and then the ISR in the local memory is modified.

1. Last Committed Offset

The offset of the consumer's last commit, which is saved in a special topic: __consumer_offsets.

2. Current Position

The position the consumer is currently reading but has not yet committed to the broker. After a commit, it becomes the Last Committed Offset.

3. High Watermark (HW)

The HW is the minimum LEO across all replicas in the partition's ISR, and the consumer cannot read messages beyond the HW. This prevents reading messages that are not yet fully synchronized (and therefore not fully backed up). In other words, the HW marks the messages that all nodes in the ISR have already replicated, and it is also the maximum offset available to consumers. For example, if the ISR replicas have LEOs 10, 9, and 8, the HW is 8. (Note that not every replica necessarily has all of these messages, only those in the ISR do.)

The HW changes constantly as the followers' pull progress advances in real time. A follower always asks the leader for data starting from the offset after the messages it already has, so when a follower sends a fetch request for data starting at offset A, the leader knows that the follower's log end offset is at least A. The leader can then check whether the LEOs of all ISR replicas exceed the current HW and, if so, raise the HW. In addition, when the leader returns locally read messages to a follower, it also includes its own HW in the response, so the follower learns the leader's HW. (In the implementation, however, the follower only obtains the HW recorded when the leader read its local log, not necessarily the latest HW.) The HW of the leader and followers is therefore not synchronized, and the HW recorded on a follower may lag behind the leader's.

High Watermark Checkpoint

Since the HW changes constantly, updating it to ZooKeeper in real time would be inefficient. But the HW is important enough that it needs to be persisted, so ReplicaManager starts a separate thread that periodically records the HW values of all partitions to a file, the highwatermark-checkpoint file.

4. Log End Offset (LEO)

This is easily understood as the current most recent log write (or sync) location.

Kafka client

Kafka supports JVM languages (Java, Scala), and there is also the high-performance C/C++ client librdkafka plus clients for many languages built on top of it, for example the Python client confluent-kafka-python. There is also a pure-Python implementation: kafka-python.

Here is a Python example (using confluent-kafka-python):

Producer:

from confluent_kafka import Producer
 
p = Producer({'bootstrap.servers': 'mybroker,mybroker2'})
for data in some_data_source:                    # some_data_source: your iterable of strings
    p.produce('mytopic', data.encode('utf-8'))   # asynchronously enqueue the message for delivery
p.flush()                                        # block until all queued messages are delivered


Consumer:

from confluent_kafka import Consumer, KafkaError
 
c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    'default.topic.config': {'auto.offset.reset': 'smallest'}
})
c.subscribe(['mytopic'])
running = True
while running:
    msg = c.poll()
    if not msg.error():
        print('Received message: %s' % msg.value().decode('utf-8'))
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False
c.close()




This is basically the same as normal message queue usage.

Kafka offset management

Kafka reads messages based on offsets. If an offset is wrong, messages can be consumed repeatedly or unconsumed messages can be skipped. Before 0.8.2, Kafka stored offsets in ZooKeeper, but ZK writes are expensive and do not scale linearly, and frequent ZK writes can become a performance bottleneck. So 0.8.2 introduced Offset Management, which stores offsets in a compacted Kafka topic (__consumer_offsets). The consumer commits offsets by sending an OffsetCommitRequest to a designated broker (the offset manager). The request contains a series of partitions and the consumption positions (offsets) within those partitions. The offset manager appends key-value messages to the special topic (__consumer_offsets); the key is made up of consumergroup-topic-partition and the value is the offset. For performance, the latest records are also cached in memory, so an OffsetFetchRequest for a given key can be answered quickly without scanning the whole offset topic log. If the offset manager fails for some reason, a new broker becomes the offset manager and rebuilds the offset cache by scanning the offset topic.

How do I view the consumption offset

Kafka prior to version 0.9 provided the kafka-consumer-offset-checker.sh script, which checks the consumption offsets of a consumer group for one or more topics; it calls kafka.tools.ConsumerOffsetChecker. Since version 0.9 this script is no longer recommended; use kafka-consumer-groups.sh instead, which calls kafka.admin.ConsumerGroupCommand. That script actually manages consumer groups rather than just viewing their offsets. Only the newer kafka-consumer-groups.sh script is used here.

With the ConsumerGroupCommand tool, we can list, describe, or delete consumer groups.

For example, to list all consumer group information in all topics, use the list argument:

$ bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list
 
test-consumer-group


To see the current consumption offset of a consumption group, use the describe parameter:

$ bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --describe --group test-consumer-group
 
GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
test-consumer-group            test-foo                       0          1               3               2               consumer-1_/127.0.0.1


NOTES

This script can only delete consumer groups that contain no active consumers, and only groups belonging to the old consumer (that is, groups whose metadata is stored in ZooKeeper), because the deletion simply removes the corresponding consumer group node and its children in ZK.

How do I manage the consumption offset

The previous section showed how to query Kafka consumption offsets using the script tool. We can also query consumption offsets through the API.

The Kafka consumer API provides two methods for querying operations on consumer consumption offsets:

  1. committed(TopicPartition partition): This method returns an OffsetAndMetadata object, from which the committed offset of the specified partition can be obtained.

  2. position(TopicPartition partition): This method returns the offset of the next record that will be fetched for the specified partition.
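
The equivalent queries in confluent-kafka-python look roughly like this (a sketch; broker, topic, and group names are placeholders, and the consumer must already be assigned the partition):

from confluent_kafka import Consumer, TopicPartition

c = Consumer({'bootstrap.servers': 'mybroker:9092', 'group.id': 'mygroup'})
tp = TopicPartition('mytopic', 0)
c.assign([tp])

committed = c.committed([tp])   # last committed offset for this partition
position = c.position([tp])     # offset of the next record to be fetched (meaningful once the consumer has fetched or seeked)
print('committed=%s position=%s' % (committed[0].offset, position[0].offset))
c.close()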

In addition to viewing consumption offsets, sometimes we need to set offsets manually, for example to skip certain messages or to reprocess them. Before 0.8.2, offsets were stored in ZK and could be manipulated with zkCli. Since 0.8.2, offsets are stored in Kafka's __consumer_offsets topic by default and can only be changed through the API:

Class KafkaConsumer

Kafka allows specifying the position using seek(TopicPartition, long) to specify the new position. Special methods for seeking to the earliest and latest offsets the server maintains are also available (seekToBeginning(TopicPartition…) and seekToEnd(TopicPartition…) respectively).

Kafka Consumer Offset Management

The Kafka consumer API provides a way to reset the consumption offset:

  1. seek(TopicPartition partition, long offset): resets the consumption start position to the specified offset.

  2. seekToBeginning(): consume from the beginning of the partition, corresponding to the offset reset policy auto.offset.reset=earliest.

  3. seekToEnd(): consume from the position after the latest message, i.e., wait for new messages to be written before pulling; corresponds to the offset reset policy auto.offset.reset=latest.

Of course, you need to know which offset to reset to. One approach is to look up the offset by timestamp and then seek to it.
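
For example, here is a hedged sketch with confluent-kafka-python (all broker, topic, and group names are placeholders): offsets_for_times() looks up the earliest offset whose timestamp is at or after the given time, and the consumer is then positioned there (for a consumer that is already fetching the partition, seek() on the returned TopicPartition achieves the same thing):

import time
from confluent_kafka import Consumer, TopicPartition

c = Consumer({'bootstrap.servers': 'mybroker:9092',
              'group.id': 'replay-group',
              'enable.auto.commit': False})

# Ask for the earliest offset whose timestamp is >= one hour ago.
one_hour_ago_ms = int(time.time() * 1000) - 3600 * 1000
query = [TopicPartition('mytopic', 0, one_hour_ago_ms)]   # the timestamp is passed in the offset field
found = c.offsets_for_times(query, timeout=10.0)

if found and found[0].offset >= 0:
    c.assign(found)              # start consuming this partition from the returned offset
    msg = c.poll(5.0)
    if msg is not None and not msg.error():
        print('first message since then: %s' % msg.value())
c.close()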

Deployment and Configuration

Kafka is written in Scala, so running it is very simple once a JRE is installed. Just download the official pre-built package, unpack it, adjust the configuration, and it runs directly.

Kafka configuration

The configuration file is config/server.properties. The key configuration items are as follows (some properties are not in the configuration file by default and need to be added yourself):

  • broker.id: Each machine (broker) in a Kafka cluster needs a unique id.
  • port: The listening port.
  • delete.topic.enable: Set to true to allow topics to be deleted.
  • message.max.bytes: The maximum size of a message. The default is 1000012 (about 1M); 10000012 (about 10M) is recommended.
  • replica.fetch.max.bytes: The default is 1048576; 10048576 is recommended.
  • log.dirs: The Kafka data file directory (note: not log files). Can be set to /home/work/kafka/data/kafka-logs.
  • log.cleanup.policy: How expired data is removed. Defaults to delete; can also be set to compact.
  • log.retention.hours: Data retention time in hours. The default is 168, i.e., one week. Expired data is cleaned up according to log.cleanup.policy. log.retention.minutes allows minute-level configuration.
  • log.segment.bytes: The size at which a data file is rolled into a new segment. The default is 1073741824 (1G).
  • log.retention.check.interval.ms: The interval at which the cleanup thread checks whether data has expired, in ms. The default is 300000, i.e., 5 minutes.
  • zookeeper.connect: The host names of the ZooKeeper cluster that manages Kafka; multiple hosts are separated by commas.


TIPS Send and receive large messages

Modify the following parameters:

  • Broker: message.max.bytes & replica.fetch.max.bytes

  • Consumer: fetch.message.max.bytes

See the official documentation for more details of the parameters:

kafka.apache.org/documentati…

ZK configuration and startup

Then make sure ZK is configured and started correctly. Kafka ships with a ZooKeeper service; its configuration file is config/zookeeper.properties, and the key configuration is as follows:

dataDir=/home/work/kafka/data/zookeeper
clientPort=2181
maxClientCnxns=0
tickTime=2000
initLimit=10
syncLimit=5
server.1=nj03-bdg-kg-offline-01.nj03:2888:3888
server.2=nj03-bdg-kg-offline-02.nj03:2888:3888
server.3=nj03-bdg-kg-offline-03.nj03:2888:3888


NOTES Zookeeper cluster deployment

ZK cluster deployment does two things:

  1. Assign the serverId: create a myid file in the dataDir directory containing only a number from 1 to 255; this is the ZK serverId.

  2. Configure the cluster: The format is server.{id}={host}:{port}:{port}, where {id} is the serverId of the ZK mentioned above.

Then start it: bin/zookeeper-server-start.sh -daemon config/zookeeper.properties.

Start Kafka

Then start Kafka: JMX_PORT=8999 bin/kafka-server-start.sh -daemon config/server.properties

TIPS

We added the JMX_PORT=8999 environment variable to the startup command to expose JMX monitoring items for easy monitoring.

Kafka monitoring and management

Unlike RabbitMQ or ActiveMQ, Kafka does not have a web interface by default, only command-line tools. This is not very convenient, but you can install one, such as Yahoo's Kafka Manager: A tool for managing Apache Kafka. It supports many features:

  • Manage multiple clusters

  • Easy inspection of cluster state (topics, consumers, offsets, brokers, replica distribution, partition distribution)

  • Run preferred replica election

  • Generate partition assignments with option to select brokers to use

  • Run reassignment of partition (based on generated assignments)

  • Create a topic with optional topic configs (0.8.1.1 has different configs than 0.8.2+)

  • Delete topic (only supported on 0.8.2+ and remember to set delete.topic.enable=true in broker config)

  • Topic list now indicates topics marked for deletion (only supported on 0.8.2+)

  • Batch generate partition assignments for multiple topics with option to select brokers to use

  • Batch run reassignment of partition for multiple topics

  • Add partitions to existing topic

  • Update config for existing topic

  • Optionally enable JMX polling for broker level and topic level metrics.

  • Optionally filter out consumers that do not have ids/ owners/ & offsets/ directories in zookeeper.

The installation process is quite simple, though it takes a while because a lot of things must be downloaded. For details, see Kafka Manager Installation. However, these management platforms do not have permission management functions.

Note that kafka-manager.zkhosts in Kafka Manager's conf/application.conf is for its own high availability, not the zkhosts of the Kafka cluster to be managed. So don't forget to manually configure the Kafka cluster information (mainly a name and the ZK addresses) you want to manage. See Install and Evaluation of Yahoo's Kafka Manager.

Kafka Manager mainly provides an administrative interface, and monitoring depends on other applications, such as:

  1. Burrow: Kafka Consumer Lag Checking. LinkedIn's consumer lag monitoring tool, written in Go; it appears to have no UI, only an HTTP API, and can be configured to send email alerts.

  2. Kafka Offset Monitor: A little app to monitor the progress of kafka consumers and their lag wrt the queue.

The purpose of both applications is to monitor Kafka’s offsets.

Delete the topic

You can delete a Kafka topic in either of the following ways:

1. Manually delete the topic's partition folders under ${log.dirs} on each node, and log in to the ZK client to delete the nodes corresponding to the topic to be deleted. The topic metadata is stored under /brokers/topics and /config/topics.

2. Run the kafka-topics.sh script. If you want the script to delete the topic completely, make sure delete.topic.enable=true is set in the broker configuration. Otherwise, executing the script does not actually delete the topic; it merely creates a node with the same name under ZK's /admin/delete_topics directory, marking the topic for deletion.

kafka-topics.sh --delete --zookeeper server-1:2181,server-2:2181 --topic test

Execution Result:

Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.


In that case, to delete the topic completely you must manually remove the corresponding files and ZK nodes. When the configuration item is set to true, all files, directories, and metadata related to the topic are deleted.

Expired data is automatically cleared

For a traditional message queue, messages that have been consumed are deleted, whereas a Kafka cluster keeps all messages whether they have been consumed or not. Of course, due to disk limitations it is impossible (and unnecessary) to keep all data forever, so Kafka offers two strategies for deleting old data: by time and by partition file size. In $KAFKA_HOME/config/server.properties you can, for example, configure Kafka to delete data older than a week, or to delete old data once a partition file exceeds 1GB:

############################# Log Retention Policy #############################
 
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
 
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
 
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
 
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
 
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
 
# By default the log cleaner is disabled and the log retention policy will default to
# just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs
# can then be marked for log compaction.
log.cleaner.enable=false


Note that because reading a particular message in Kafka takes O(1) time, deleting old files does not affect Kafka's performance; which deletion policy to choose depends only on your disk capacity and your requirements.

Some problems with Kafka

1. The order of messages in a single partition of a single topic is guaranteed, but the order of messages in all partitions of a single topic is not guaranteed. If your application is strict about message ordering, Kafka may not be appropriate.

2. The consumption offset is tracked and committed by the consumer, but the consumer does not often write this offset to Kafka because it is expensive for the broker to maintain these updates, which can lead to messages being consumed multiple times or not consumed at all in exceptional cases.

A message may have been consumed, but the consumer will hang before confirming that the message has been consumed with the broker’s commit offset, and then another consumer will start working on the same partition, starting with the last committed offset, causing some messages to be consumed again. On the other hand, if the consumer commits the offset before processing the message, but hangs while processing the message, that part of the message is “lost.” In general, processing messages and committing offsets hardly constitute an atomic operation, so there is no guarantee that all messages will be processed exactly once.

3. The number of topics and partitions is limited

A Kafka cluster is limited in the number of topics it can handle, and performance begins to degrade around 1000 topics. These questions are basically related to basic implementation decisions for Kafka. In particular, the amount of random IO on the broker increases dramatically as the number of topics increases, because each topic partition write is actually a separate file append operation. As the number of partitions increases, the problem gets worse. If Kafka does not take over IO scheduling, the problem is difficult to solve.

Of course, most applications do not require such a large number of topics and partitions. However, if a single Kafka cluster is used as a multi-tenant resource, this problem is exposed at this point.

4. Manually balance the load of partitions

Kafka's model is very simple: a topic partition is held entirely on one broker, with several other brokers possibly holding replicas of that partition. The same partition is never split across multiple machines. As partitions keep growing, some machines in the cluster will be unlucky enough to hold just a few very large partitions. Kafka has no mechanism to automatically migrate these partitions, so you have to do it yourself. Monitoring disk space, diagnosing which partition is causing the problem, and then deciding where to migrate it are manual administrative tasks that cannot be ignored in a Kafka cluster environment.

If the cluster size is small and the space required for the data is small, this management approach works just fine. However, if traffic increases rapidly or there is no first-class system administrator, the situation is completely out of control.

Note: If new nodes are added to the cluster, data must be manually migrated to these new nodes. Kafka does not automatically migrate partitions to balance load or storage space.

5. Follower replicas serve only as cold standbys (solving the HA problem) and cannot serve reads

Unlike ES, where replica shards also serve reads to relieve pressure on the primary, Kafka's reads are stateful (the committed offset must be maintained), so follower replicas do not participate in reads or writes; they act only as cold standbys to solve the single point of failure.

6. Messages can only be consumed sequentially, not randomly positioned, which makes it inconvenient to quickly locate problems when problems occur

This is a common problem for all messaging systems used as asynchronous RPC: the sender sends a message, the consumer says it never received it, so how do you investigate? Message queues lack a mechanism for random access to messages, such as retrieving a message by key, which makes diagnosing such problems difficult.

Recommended reading

  1. Centralized Logging Solutions Overview

  2. Logging and Aggregation at Quora

  3. The application of ELK in advertising system monitoring and Elasticsearch

  4. Centralized Logging

  5. Centralized Logging Architecture

For more content, please follow the vivo Internet technology WeChat public account.

Note: To reprint the article, please contact our wechat account: LABs2020.