This is the fifth day of my participation in the August Wen Challenge.More challenges in August

Kafka advanced features

producers

Data production process analysis

  • When Producer creates, it creates a Sender thread and sets it as a daemon thread
  • The output message is an asynchronous process. The output message passes through the interceptor, serializer, and divider, and is then stored in the RecordAccumulator buffer
  • Batch sending conditions are as follows: The buffer data size reaches batch.size or Lingering.ms reaches the upper limit
  • After a batch is sent, it is sent to the specified partition and sent to the broker. If the producer sets the REtries parameter to greater than 0 and allows retries after a failure, the client internally retries the message.
  • Drop disk to broker successfully, return production metadata to producer.
  • Metadata is returned in two ways: a blocking direct return and a callback return.

The broker configuration

attribute instructions
bootstrap.servers A list of broker addresses that producers need to connect to a cluster of brokers. Do not write all, but do not write just one
key.serializer Implements the interface org.apache.kafka.com mon. Serialization. The Serializer key serialization class
value.serializer Implements the interface org.apache.kafka.com mon. Serialization. Value the serialization of the Serializer class
acks Controls the persistence of sent messages, default 1

Acks =0: The producer does not wait for any message from the broker to be sent as long as it has been placed in the socket buffer. The retries setting does not work. The client does not care if the message fails to be sent. The offset received is always -1

Acks =1: the leader writes the record to the local log and responds to the client information.

Acks =all/-1: the leader waits for confirmation messages of all synchronized replicas and does not lose messages as long as one synchronized replica exists
compression.type Compressed format for producer-generated data, default none, allowed values: None, gzip, SNappy, and LZ4
retries Set the value of greater than 1, will fail in the message is sent to send messages, allowed to try again but I don’t set the Max. In the flight. Requests. Per. The connection is 1, there is news out-of-order, optional value [0, 2147483647]
retry.backoff.ms The time to wait between retries when resending a message to a specified topic partition. For example, after three retries, wait for this length of time after each retry, and then try again. In some failure scenarios, dense loops of resend requests are avoided. Value of long, default 100. Optional values: [0,…
request.timeout.ms Maximum length of time a client waits for a request response. If the server response times out, the request is resended unless the number of retries is reached. This setting should be larger than replica.lag.time.max.ms (whether the LEO of the replica is not less than the HW of the leader replica, follow lags leader10s) to avoid resending messages during the server delay time. Int Value, default: 30000, optional: [0,…
interceptor.classes The interceptor class must implement org. Apache. Kafka. Clients. Producer. ProducerInterceptor interface
batch.size Controls the size of the default batch in bytes. The request sent to the broker will contain multiple batches, one per partition, and data that can be sent.
client.id The string of ids passed to the broker when the producer sends a request.
send.buffer.bytes Size of the buffer (SO_SNDBUF) used by TCP to send data. If set to 0, the operating system default is used.
buffer.memory The total memory bytes that producers can use to cache records waiting to be sent to the server. If the record is sent faster than the record can be sent to the server, the producer blocks Max. Block. ms for the time after which it throws an exception. This setting should correspond roughly to the total memory that the producer will use, but not all of the memory used by the producer is used for buffering. Some of the extra memory is used for compression (if compression is enabled) and for maintaining requests in the run. Long data. Default value: 33554432 Optional value: [0,…
max.block.ms Control KafkaProducer. The send () and KafkaProducer partitionsFor () block length. These methods block when the cache is full or metadata is not available. Block times in user-supplied serializers and partitioners are not counted. Long value, default: 60000, optional: [0,…
connections.max.idle.ms When the connection idle time reaches this value, the connection is closed. Long data, default: 540000
linger.ms The default is 0 (no delay). If linger. Ms =5, wait 5ms before a request is sent. Long value, default: 0, optional: [0,…
max.request.size Maximum number of bytes for a single request. This setting limits the number of messages in a message batch in a single request so that a single request does not send too much data. The server has its own Settings to limit the batch size, which may be different from this configuration. Int value. Default value: 1048576 Optional values: [0,…
partitioner.class The default org. Apache. Kafka. Clients. Producer. The internals. DefaultPartitioner
receive.buffer.bytes TCP receive cache (SO_RCVBUF). If this parameter is set to -1, the default value of the operating system is used. Int Value. The default value is 32768. Optional values: [-1…
security.protocol Protocol used to communicate with the broker: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL String value. Default: PLAINTEXT
max.in.flight.requests.per Maximum number of unacknowledged requests on a single connection. At this amount, the client blocks. If the value is greater than 1 and there are failed requests, message order cannot be guaranteed during retries. Int value. The default value is 5. Optional values: [1,…
reconnect.backoff.max.ms For each successive connection failure, the retreat for each host is multiplied until this maximum is reached. After calculating retreat increments, add 20% random jitter to avoid connection storms. Long value, default 1000, optional: [0,…]
reconnect.backoff.ms Attempted to reconnect the base wait time of the specified host. Intensive reconnection to the host is avoided. This fallback time applies to all connections from the client to the broker. Value of long, default 50. Optional values: [0,…

Serialization configuration

Kafka use org.apache.kafka.com mon. Serialization. Serializer data serialization into a byte array, the system provides the ByteArraySerializer, ByteBufferSerializer, BytesSerializer, DoubleSerializer FloatSerializer, IntegerSerializer StringSerializer, LongSerializer, ShortSerializer. Serialization of data is commonly used in production using avro, a custom serializer that implements the serialize method.

public class UserSerializer implements Serializer<User> {
    @Override
    public void configure(Map<String, ? > configs,boolean isKey) {
        // do nothing
        // used to receive configuration parameters for the serializer and to configure and initialize the current serializer
    }

    @Override
    public byte[] serialize(String topic, User data) {
        try {
            if (data == null) {
                return null;
            } else {
                final Integer userId = data.getUserId();
                final String username = data.getUsername();

                if(userId ! =null) {
                    if(username ! =null) {
                        final byte[] bytes = username.getBytes("UTF-8");
                        int length = bytes.length;
                        // The first four bytes store the value of the userId
                        // The second 4 bytes are used to store the length of the username byte array int value
                        // The third length is used to store the serialized byte array of username
                        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
                        / / set the userId
                        buffer.putInt(userId);
                        // Set the length of the username byte array
                        buffer.putInt(length);
                        // Set the username byte array
                        buffer.put(bytes);
                        // Returns the value of the user object as a byte array
                        returnbuffer.array(); }}}}catch (Exception e) {
            throw new SerializationException("Data serialization failed");
        }
        return null;
    }

    @Override
    public void close(a) {
        // do nothing
        // Used to close resources. It needs to be idempotent, that is, called multiple times, and the effect is the same.}}Copy the code

Partition is

In KafkaProceducer and DefaultPatitioner, the default partition computes:

  • If a partition number is provided, use the supplied one
  • If no partition number is provided, the number of partitions is modulated using the hash value of the serialized value of key
  • If record does not provide a partition number and does not provide a key, use polling to assign a partition number
    • The partition number is first assigned in the availability zone
    • If there are no available partitions, partition numbers are assigned to all partitions

Custom partition:

  • Develop the Partitioner interface implementation classes
  • Set config. The put (” partitioner. Class “, “xx. The class”);
public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // The partition number can be calculated here.
        return 2;
    }

    @Override
    public void close(a) {}@Override
    public void configure(Map
       
         configs)
       ,> {}}Copy the code

The interceptor

The Producer and Consumer Interceptor are used for customized control logic on the Client side. The Interceptor may run in multiple threads, so it needs to ensure thread safety when implementing it. Interceptor chains catch exceptions to log files without passing them up.

public interface ProducerInterceptor<K.V> extends Configurable {
    // encapsulates the kafkaproducer.send method, which runs on the user's main thread
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    // It is called after a message is answered or fails to be sent. Usually, it runs in the I/O thread of Producer before the Proceducer callback logic is triggered. Therefore, heavy logic cannot be added to the I/O thread, affecting the sending efficiency
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);
    // Close the Interceptor and clean up the resource
    public void close(a);
}
Copy the code

Custom interceptors:

  • The ProceducerInterceptor interface is implemented
  • Set config. The put (ProducerConfig INTERCEPROR_CLASSES_CONFIG, “xx. Xx. Class, the class”);
public class InterceptorOne implements ProducerInterceptor<Integer.String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);

    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        System.out.println("Interceptor 1 -- Go");


        // Call this method when a message is sent through the interceptor

        // The content of the message to send
        final String topic = record.topic();
        final Integer partition = record.partition();
        final Integer key = record.key();
        final String value = record.value();
        final Long timestamp = record.timestamp();
        final Headers headers = record.headers();


        // A new message is created from the original message after the interceptor blocks it
        // No changes have been made to the original message
        ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
                topic,
                partition,
                timestamp,
                key,
                value,
                headers
        );
        // Pass a new message
        return newRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("Interceptor 1 -- Back");
        // This method is called when a message is acknowledged or an exception occurs. Heavy tasks should not be implemented in this method
        // Kafka producer performance will be affected.
    }

    @Override
    public void close(a) {}@Override
    public void configure(Map
       
         configs)
       ,> {
        final Object classContent = configs.get("classContent"); System.out.println(classContent); }}Copy the code

The principle of

KafkaProducer has two basic threads

  • Main thread: Responsible for message creation, interceptor, serializer, divider, etc., and appending messages to the message collector RecordAccumulator.
    • RecordAccumulator maintains a double-ended queue of type Deque for each partition
    • ProducerBatch Batch sending improves throughput and reduces network impact
    • Because the producer client uses java.io.ByteBuffer to save messages before sending them, and maintains a BufferPool to implement the reuse of ByteBuffer; This buffer pool is managed only for bytebuffers of a specific size (specified by batch.size) and cannot be reused for caches that are too large.
    • Each time a ProducerRecord message is added, it will find/create the corresponding two-end queue, get a ProducerBatch from its tail, and judge whether the size of the current message can be written into the batch. Write if it can; If the message cannot be written, create a ProducerBatch and check whether the message size exceeds the value set by the client parameter batch.size. If the message size does not exceed the value set by the client parameter batch.size, create a new ProducerBatch to facilitate cache reuse. If yes, the ProducerBatch is created based on the calculated message size. The disadvantage is that this memory cannot be reused.
  • The Sender thread:
    • The cached messages are taken from the message collector and processed as
    • After the

      , data can be sent to the server
      ,>
    • Before sending, the Sender thread saves the message in the form of Map

      to InFlightRequests, which can be used to obtain LeastLoadedNodes, which are the least overloaded of the current nodes. To achieve the message as soon as possible.
      ,>

consumers

Consumer groups

Multiple consumers from the same topic join a consumer group, and the consumers in the consumer group share group.id

The Kafka heartbeat is a health check between the Consumer and the Broker. The Consumer will send messages only if the Broker Coordinator is healthy

parameter field
session.timeout.ms MemberMetadata.sessionTimeoutMs
max.poll.interval.ms MemberMetadata.rebalanceTimeoutMs

Broker sessionTimeoutMs parameter. Broker logic ina group coordinator. If the heartbeat is delayed, the broker can remove the consumer from the group and rebalance it

Consumer sessionTimeoutMs rebalanceTimeoutMs parameter. If a client detects that a heartbeat is overdue, the client will flag the coordinator as unavailable and block the heartbeat thread. If the poll message interval exceeds the rebalanceTimeoutMs, the consumer will tell the broker to leave the consumer group and trigger the rebalance.

The message received

parameter instructions
bootstrap.servers List of hosts/ports used to establish initial connections to the Kafka cluster
key.deserializer Org.apache.kafka.com mon. Serialization. Deserializer implementation class
value.deserializer Org.apache.kafka.com mon. Serialization. Deserializer implementation class
client.id The ID string sent to the server when a message is consumed from the server
group.id A string used to uniquely identify the consumer group to which the current consumer belongs. This must be set if consumers use group management features such as SUBSCRIBE (topic) or kafka-based offset management policies.
auto.offset.reset Earliest: Automatically resets the offset to the earliest

Latest: Automatically resets the offset to the latest offset

None: Discard to consumer if previous offset of consumer group does not exist

often

Anything: Throw exceptions at consumers
enable.auto.commit If set to true, the consumer automatically periodically submits offsets to the server.
auto.commit.interval.ms If the value of enable.auto.mit is set to true, this value defines how often the consumer offset is submitted to Kafka.
fetch.min.bytes Minimum amount of data returned by the server for each pull message request.
fetch.max.wait.ms If the amount of data on the server does not reach fetch.min.bytes, the server cannot immediately respond to the request. This parameter is used to set the maximum duration for blocking requests on the server.
fetch.max.bytes The maximum amount of data returned by the server for a single pull request. The consumer pulls messages in batches, and if the value of the first non-empty message batch is greater than that value, the message batch is returned so that the consumer can continue. That is, the configuration is not the absolute maximum value. The maximum number of message batches that the broker can receive is specified by message.max.bytes (broker configuration) or max.message.bytes (topic configuration).
connections.max.idle.ms Close idle connections after this time.
check.crcs Automatically calculates the CRC32 check value of the consumed message. You can ensure that messages are not corrupted in transit or in disk storage. It adds extra load and is disabled for extreme performance.
exclude.internal.topics Whether internal themes should be exposed to consumers. If this entry is set to true, you can only subscribe before pulling.
isolation.level Controls how transaction messages are read; Read_committed and read_uncommitted (default)
heartbeat.interval.ms When using a consumer group, this entry specifies the interval at which the consumer sends the heartbeat to the consumer coordinator.
session.timeout.ms When using Kafka’s consumer groups, consumers periodically send heartbeat numbers to the broker to indicate their presence. If no consumer heartbeat is received after this timeout, the broker removes the consumer from the consumer group and starts rebalancing. This value must be in the broker configuration group. Min. Session. A timeout. Ms and group. The Max. Session. A timeout. Ms.
max.poll.records The maximum number of records returned by a call to the poll() method.
max.poll.interval.ms The interval at which the poll() method is called when consuming groups are used. This entry specifies the maximum interval between consumer calls to the poll() method. If the consumer does not invoke the poll() method within this time, the broker considers the consumer to have failed, triggers a rebalancing, and allocates the partition to the other consumers in the consumer group.
max.partition.fetch.bytes The maximum number returned by the server for each partition. Consumers pull data by batch. If the first record of a non-empty partition is greater than this value, the batch can still be returned to ensure that the consumer can proceed. Bytes (the broker parameter) or max.message.bytes (the topic parameter) specify the size of the batch that the broker receives.
send.buffer.bytes This parameter specifies the buffer size (SO_SNDBUF) used by TCP to send data. -1 indicates the default buffer size of the OS
retry.backoff.ms If a retry is required in the event of a failure, this configuration indicates how long the client waits before initiating a retry. The existence of this time avoids intensive cycles.
request.timeout.ms Maximum time for a client to wait for a response from a server. If this time times out, the client either reinitiates the request, or if the retries run out, the request fails.
reconnect.backoff.ms Wait time for reconnecting to the host. Avoid the dense cycle of reconnection. This wait time applies to all connections from the client to the broker.
reconnect.backoff.max.ms The maximum amount of time, in milliseconds, to wait to reconnect to a broker that repeatedly failed to connect. If this option is provided, for each successive connection failure, the fallback for each host is multiplied until this maximum is reached. After calculating retreat increments, add 20% random jitter to avoid connection storms.
receive.buffer.bytes Cache of data received by TCP connections (SO_RCVBUF). -1 indicates that the default value of the OPERATING system is used.
partition.assignment.strategy The class name of the partition allocation policy when using consumer groups.
metrics.sample.window.ms Calculate the time window of the indicator sample.
metrics.recording.level The highest record level of an indicator.
metrics.num.samples The number of samples maintained for calculating metrics
interceptor.classes A list of interceptor classes. The default no interceptor interceptor is consumer interceptor, the interceptor needs to implement org. Apache. Kafka. Clients. Consumer. ConsumerInterceptor interface. Interceptors can be used to intercept messages received by consumers.
  • deserialization

    Implement org.apache.kafka.com mon. Serialization. Deserializer, system provides the ByteArrayDeserializer, ByteBufferDeserializer, ByteDeserializer, DoubleDeserializer, FloatDeserializer, IntegerDeserializer, LongDeserializer, ShortDeserializer, StringDeserializer etc.

    Custom deserialization

    public class UserDeserializer implements Deserializer<User> {
        @Override
        public void configure(Map<String, ? > configs,boolean isKey) {}@Override
        public User deserialize(String topic, byte[] data) {
            ByteBuffer buffer = ByteBuffer.allocate(data.length);
    
            buffer.put(data);
            buffer.flip();
    
            final int userId = buffer.getInt();
            final int usernameLength = buffer.getInt();
    
            String username = new String(data, 8, usernameLength);
    
            return new User(userId, username);
        }
    
        @Override
        public void close(a) {}}Copy the code
  • Displacement to submit

    • Automatically submit

      • enable.auto.commit=true
      • Auto.com MIT. Interval. Ms 5 s by default
      • Kafka makes sure that when polling is called, all the messages returned from the previous poll will not be lost, but will consume the data repeatedly (rebalance every 5s, rebalance on 3s).
    • Synchronization to submit

      • KafkaConsumer#commitSync
      • Affect the TPS
      • You can extend the submission interval, causing consumers to submit less frequently and more messages to be consumed after a restart
    • Asynchronous submission

      • KafkaConsumer#commitAsync
      • Problems will not be retried
      try {
        while(true) {
        ConsumerRecords<String, String> records =
      consumer.poll(Duration.ofSeconds(1));
        process(records); // Process the message
        commitAysnc(); // Use asynchronous commit to avoid blocking}}catch(Exception e) {
        handle(e); // Handle exceptions
      } finally {
        try {
          consumer.commitSync(); // The last commit uses synchronous blocking commit
       } finally{ consumer.close(); }}Copy the code
  • Displacement Management API

    API instructions
    public void assign(Collection partitions) Manually assign a series of topic partitions to the current consumer.

    Manual partition allocation does not support incremental allocation and overwrites the previous allocation if a partition was previously allocated. If the given topic partition is empty, it is equivalent to calling the unsubscribe method.

    The manual method of allocating topic partitions does not use the consumer group management feature. When the consumer group membership changes, or the metadata of the cluster or topic changes, no rebalancing of partition allocation is triggered. Partition manually assign (Collection) and automatic partition the subscribe (Collection, ConsumerRebalanceListener) are used together. If automatic commit offsets are enabled, the consumption offsets in the old partition allocation are committed asynchronously before the new partition allocation replaces the old one.
    public Set assignment() Gets the collection of partitions assigned to the current consumer. The same collection is returned if the subscription is directly assigned topic partitions by calling the Assign method. If a topic subscription is used, this method returns the collection of topic partitions currently assigned to the consumer. None is returned if the partition subscription has not yet started partitioning or is reassigning partitions.
    public Map<String, List> listTopics() Gets metadata for all subject partitions authorized to the user. This method makes a remote call to the server.
    public List partitionsFor(String topic) Gets partition metadata for the specified topic. If the current consumer does not have metadata on the topic, a remote call is made to the server
    public Map<TopicPartition, Long> beginningOffsets(Collectionpartitions) For a given topic partition, list the offset of their first message. Note that this method may block forever if the specified partition does not exist. This method does not change the current consumer offset of the partition.
    public void seekToEnd(Collection partitions) Moves the offset to the last one for each given partition. This method executes lazily and is available only after the poll or position methods have been called. If no partition is specified, the consumer offsets of all partitions currently assigned by the consumer are moved to last. If the isolation level is set to Isolation.level = read_COMMITTED, the consumption offset of the partition is moved to the last stable offset, that is, the next message to be consumed is now a transaction message in the uncommitted state
    public void seek(TopicPartition partition, long offset) Moves the consumption offset of a given topic partition to the specified offset, the next message offset to be consumed by the current consumer. If the method is called multiple times, the last one overwrites the previous one. If used arbitrarily in the middle of consumption, data may be lost.
    public long position(TopicPartition partition) Checks the consumption offset for the specified topic partition
    public void seekToBeginning(Collection partitions) Moves the consumer offsets for each given partition to their starting offsets. This method executes lazily and only executes after the poll or position methods have been called. If no partitions are provided, all partition consumption offsets assigned to the current consumer are moved to the starting offset.
    #Look at the displacement _consumer_offsetskafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" -- consumer.config / opt/kafka_2. 12-1.0.2 / config/consumer, the properties - from - beginning | headCopy the code

    The consumer_offsets theme configures the Compact policy to always keep the latest displacement information, which controls the overall log capacity and preserves offsets.

  • Weight balance

    Triggering conditions:

    • A member of the consumer group has changed
    • The number of subject partitions changed. Procedure
    • The subscription topic has changed

    Session.timeout. ms indicates the timeout period. Heartbeat.interval. ms controls the frequency at which heartbeat is sent. Max-poll.interval. ms indicates the interval between two polls

    Relatively reasonable configuration:

    • session.timeout.ms: 6s
    • heartbeat.interval.ms: 2s
    • Max.poll.interval. ms The maximum time it takes for consumers to process messages is increased by 1 minute

    Group Coordinator to manage consumer groups. When the first consumer of a consumer group is started, it goes to the Kafka Broker to determine who is the group coordinator for their group. All consumers in the consumer group then coordinate communication with the group coordinator.

    Determine the Coordinator:

    • Determine which partition the consumer group shift is written to _consumerS_offsets. Calculation method is Math. Abs (groupId. HashCode () % offsets. The topic. The num. Partitions)
    • The broker where the leader of this partition resides is the group coordinator

    Rebalance Generation Represents a version of the consumer mapping between topic partitions and consumer groups after the Rebalance. It is used to protect consumer groups and isolate invalid offset commits.

    Five ways to deal with agreements related to consumer group coordination

    • Heartbeat requests: The consumer needs to periodically send a Heartbeat to the group coordinator to indicate that it is alive
    • LeaveGroup Request: Take the initiative to tell the group coordinator that I’m leaving the consumer group
    • SyncGroup request: The consumer group Leader tells all members of the group about the allocation plan
    • JoinGroup Request: A member requests to join a group
    • DescribeGroup request: Displays all group information, including member information, protocol name, assignment scheme, and subscription information. Usually this request is for the administrator’s use

    Process:

    • Join: Join a group. All members send a JoinGroup request to the consumer group coordinator to join the consumer group. Once all members have sent the JoinGroup request, the coordinator selects one of the consumers to act as Leader and sends the group membership information and subscription information to the Leader.
    • Sync, the Leader, starts to assign consumption scenarios, namely which consumers are responsible for consuming which partitions of which topics. Once the allocation is complete, the Leader wraps the solution into a SyncGroup request and sends it to the consumer group coordinator. Non-leaders also send a SyncGroup request with nothing. The consumer group coordinator receives the allocation plan and inserts it into the SyncGroup response to send to each consumer.

    Until the coordinator has collected all member requests, it places the received requests into a place called purgatory

    Intra-group partition allocation policy:

    • RangeAssignor

      Each topic is individually partitioned, first sorted numerically by partition ID, then sorted lexicographically by consumers subscribing to this topic, and distributed as evenly as possible

      The principle is to divide the total number of consumers and the total number of partitions to obtain a span, and then divide the partitions evenly according to the span to ensure that the partitions are evenly distributed to all consumers as far as possible. For each Topic, the RangeAssignor strategy sorts all the consumers in the consumer group that subscribe to that Topic in lexicographical order by name, and then allocates a fixed partition range for each consumer. If the distribution is not equal, the highest consumer in the lexicographical order is assigned an additional partition.

      The obvious problem with this distribution is that as the number of topics consumers subscribe to increases, the imbalance becomes more and more serious

    • RoundRobinAssignor

      Order the partitions of all subscribed topics and all consumers in the consumer group as evenly as possible. (RangeAssignor is assigned to a single Topic partition. There will be imbalance in the case of inconsistent consumer subscription topics within the consumer group

    • StrickyAssignor

      Neither RangeAssignor nor round assignor is considered by the current partitioning algorithm. Obviously, it can save a lot of overhead to adjust partition allocation changes as little as possible before performing a new allocation, taking into account the results of the previous allocation. Partition distribution should be as balanced as possible. The result of each redistribution should be the same as the result of the previous redistribution

    Consumer group status

    • Dead: The final state of a group in which there is no member and the group metadata has been removed by the group coordinator. This state responds to various requests with a response: UNKNOWN_MEMBER_ID
    • Empty: There are no members in the group, but the displacement information has not expired. This state can only respond to JoinGroup requests
    • PreparingRebalance: The group is ready to start a new rebalance, waiting for members to join
    • AwaitingSync: Waiting for the Leader Consumer to pass the allocation scheme to the members
    • Stable: Rebalancing is complete and consumption can start.
  • Consumer interceptor

    Implement org. Apache. Kafka. Clients. Consumer. ConsumerInterceptor < K, V > interface

    The ConsumerInterceptor callback occurs on the same thread as the KafkaConsumer#poll(long) method

    props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,”xxx” );

    public interface ConsumerInterceptor<K.V> extends Configurable {
    
        // called before the poll method returns
        public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
    
        // Commit the offset call
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
    
        public void close(a);
    }
    
    Copy the code
    public class OneInterceptor implements ConsumerInterceptor<String.String> {
        @Override
        public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
            // The last method to call before the poll method returns the result
            System.out.println("One -- start");
            // The message is returned without processing
            return records;
        }
    
        @Override
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
            // This method is used when the consumer submits the offset
            System.out.println("One -- The end");
        }
    
        @Override
        public void close(a) {
            // Close resources used by the interceptor, such as open files, connected databases, etc
        }
    
        @Override
        public void configure(Map
             
               configs)
             ,> {
            // Used to get the consumer's setting parameters
            configs.forEach((k, v) -> {
                System.out.println(k + "\t"+ v); }); }}Copy the code

The theme

  • Kafka – switchable viewer. Sh scripts
options instructions
–config String:name=value Specify configuration information for the topic you created or modified. The following configuration items are supported:

cleanup.policy

compression.type

delete.retention.ms

file.delete.delay.ms

flush.messages

flush.ms

follower.replication.throttled.replicas

index.interval.bytes

leader.replication.throttled.replicas

max.message.bytes

message.format.version

message.timestamp.difference.max.ms

message.timestamp.type min.cleanable.dirty.ratio

min.compaction.lag.ms

min.insync.replicas

preallocate

retention.bytes retention.ms

segment.bytes

segment.index.bytes

segment.jitter.ms segment.ms

unclean.leader.election.enable
–create Create a new theme
–delete Delete a topic
–delete-config <String: name> Deletes a topic configuration entry for an existing topic. These are the configuration items given in –config.
–alter Change the number of partitions, replica assignments, and/or configuration entries for a topic.
–describe List the details of a given topic.
–disable-rack-aware Disable rack awareness for replica assignment.
–force Suppress the console prompt
–help Printing Help Information
–if-exists If this option is specified, you can modify or delete a topic only if the topic exists.
–if-not-exists If this option is specified when creating a theme, the command can be executed only when the theme does not exist
–list List all available topics.
–partitions <Integer: # of partitions> The number of partitions for the topic to create or modify.
–replica-assignment <String:broker_id_for_part1_replica1

:broker_id_for_part1_replica2

,broker_id_for_part2_replica1

:broker_id_for_part2_replica2 , … >
Manually specify the partition-to- broker assignment when creating or modifying a topic.
–replication-factor <Integer:replication factor> Number of theme partition copies to create. 1 indicates that there is only one replica, that is, the Leader replica.
–topic <String: topic> The name of the topic to create, modify, or describe. In addition to creating, modifying, and describing regular expressions can also be used here.
–topics-with-overrides if set when describing topics, only show topics that have overridden configs
–unavailable-partitions if set when describing topics, only show partitions whose leader is not available
–under-replicated-partitions if set when describing topics, only show under replicated partitions
–zookeeper <String: urls> Required arguments: string to connect to ZooKeeper, comma separated host:port list. Multiple urls can fail over.

Parameters in topics

attribute The default value Default server properties instructions
cleanup.policy delete log.cleanup.policy Either “delete” or “compact”; This string specifies how to use the old log section; Tacit recognition (“delete”) will discard old parts when their recycling time or size limit is reached. Compact will compress logs
compression.type none Producer Indicates the compression class used to compress data. The default is no compression. The correct option values are None, gzip, and snappy. Compression is best used for batch processing, the more messages processed in batch, the better compression performance.
delete.retention.ms 86400000 (24hours) log.cleaner.delete.retention.ms Maximum period of retention for compressed logs
flush.ms None log.flush.interval.ms This configuration is used to top the mandatory fsync log to disk interval. For example, if set to 1000, fsync is required every 1000ms. This option is generally not recommended
flush.messages None log.flush.interval.messages This configuration specifies the interval at which fsync logs are forced.
index.interval.bytes 4096 log.index.interval.bytes The default ensures that we add an index to the message every 4096 bytes,
max.message.bytes 1000000 max.message.bytes Maximum size of kafka appending messages.
min.cleanable.dirty.ratio 0.5 min.cleanable.dirty.ratio This configuration controls how often the log compressor tries to clear logs. By default, clearing logs whose compression rate exceeds 50% is avoided. This ratio avoids the biggest waste of space
min.insync.replicas 1 min.insync.replicas When producer sets request. Required. Acks to -1, min.insync.replicas specifies the minimum number of replicas
retention.bytes None log.retention.bytes If you use the “delete” retention strategy, this configuration is the maximum size that a log can achieve before deleting it. By default, there is no size limit, only time limit
retention.ms 7 days log.retention.minutes If the retention policy of “delete” is used, this configuration refers to the time that logs are kept before being deleted.
segment.bytes 1GB log.segment.bytes In Kafka, logs are stored in blocks. This configuration refers to the size of the log blocks
segment.index.bytes 10MB log.index.size.max.bytes This configuration is about the size of the index file mapped between offsets and file locations; You generally do not need to modify this configuration
segment.jitter.ms 0 log.roll.jitter.{ms,hours} The maximum jitter to subtract from logRollTimeMillis.
segment.ms 7 days log.roll.hours Even if the size of the log partition file does not reach the upper limit, the system forces a new log partition file to be created once the log time reaches the upper limit
unclean.leader.election.enable true Specifies whether replicas, which is not in the ISR, can be set to act as the leader
  • Create a theme

    kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_x - -partitions 1 --replication-factor 1 
    kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_02 --partitions 3 --replication-factor 1 --config max.message.bytes=1048576 --config segment.bytes=10485760
    Copy the code
  • Check the topic

    kafka-topics.sh --zookeeper localhost:2181/myKafka --list kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_x kafka-topics.sh --zookeeper localhost:2181/myKafka --topics-with-overrides -- describe
    Copy the code
  • Modify the theme

    kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_01 --partitions 2 --replication-factor 1 
    kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config max.message.bytes=1048576
    kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_test_01 kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config segment.bytes=10485760 
    kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --delete-config max.message.bytes --topic topic_test_01
    Copy the code
  • Delete the topic

    kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_x
    Copy the code
  • Increase the partition

    kafka-topics.sh --zookeeper localhost/myKafka --alter --topic myTop1 -- partitions 2
    Copy the code
  • Partition copy allocation

    Without considering rack information:

    • The first replica partition selects a broker by polling and allocates it. This poll is from a random location in the list of brokers

    Polling.

    • The remaining copies are allocated by adding offsets.
  • Offset management

    bin/kafka-consumer-groups.sh

parameter instructions
–all-topics Sets all topics associated with the specified consumer group to the scope of the reset-offsets operation
–bootstrap-server <String: server toconnect to> Must: (new consumer based on consumer group): server address to connect to.
–by-duration String:duration A period of time from the current timestamp. Format: ‘PnDTnHnMnS’
–command-config <String:command config propertyfile> Specifies a configuration file whose contents are passed to the Admin Client and consumer
–delete Pass the consumer group name, removing the individual partition offsets and owner relationships for the entire consumer group and all topics. –group G1 –group G2 –topic T1.
–describe Describes the offset gap (how many messages have not yet been consumed) for a given consumer group.
–execute Perform operations. Supported operations: reset-offsets
–export Export the operation result to a CSV file. Supported operations: reset-offsets
–from-file <String: path to CSV file> Resets the offset to the value defined in the CSV file.
–group <String: consumergroup> Target consumer groups.
–list List all consumer groups
–new-consumer Use the new consumer implementation. This is the default value. This action will be removed in subsequent releases.
–reset-offsets Reset the offset of the consumer group. Only one consumer group is supported at a time, and that consumer group should be inactive

1. (default) Plan: Which offset to reset.

2. Execute: Executes the reset-offsets operation

3. Process: Cooperate with –export Export the operation result to CSV format.

You can use the following options

–to-datetime

–by-period

–to-earliest

–to-latest

–shift-by

–from-file

–to-current

To define the scope of the action, use: –all-topics –topic
–shift-by Long:number-of-offsets Reset n offsets. N can be positive or negative.
–timeout <Long: timeout(ms)> The default timeout period for some operations is 5000.
–to-current Reset to the current offset.
–to-datetime String:datetime Resets the offset to the specified timestamp. Format: ‘YYYY – MM – DDTHH: MM: SS. The SSS’
–to-earliest Reset to the earliest offset
–to-latest Reset to the latest offset
–to-offset Long:offset Reset to the specified offset.
–topic <String: topic> Specify which topic’s consumer groups to delete or which topic’s consumer groups to include in the reset-offsets operation. For the reset-offsets operation, you can also specify partitions topic1:0,1,2. Where 0, 1, and 2 indicate the partition numbers to be included in the operation. The offset reset operation supports multiple topics working together.
–zookeeper <String: urls> –zookeeper node1:2181/myKafka

Check which group ids are being consumed:

kafka-consumer-groups.sh --bootstrap-server node1:9092 --list
Copy the code

Viewing offsets

kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group group
Copy the code

The offset is set earliest

kafka-consumer-groups.sh --bootstrap-server node1:9092 --reset-offsets --group group --to-earliest --topic topic1
Copy the code

The offset is up to date

kafka-consumer-groups.sh --bootstrap-server node1:9092 --reset-offsets --group group --to-latest --topic topic1
Copy the code
kafka-consumer-groups.sh --bootstrap-server node1:9092 --reset-offsets --group group  --topic topic1 --shift-by -10
Copy the code

partition

A copy of the

Definition of synchronization node:

  • The node must be able to maintain a session with ZK
  • Not too far behind. Too far behind means that the number of replicated messages behind the Leader exceeds a preset value (parameter: replica.lag.max.messages Default value: 4000) or the Follow does not send fetch requests to the Leader for a long time (parameter: Replica.lag.time.max. ms Default value: 10000).

Partition reallocation

kafka-reassign-partitions.sh

  • Generate mode, automatically generate reassign plan (not executed)
  • Execute Reassigns data according to the specified reassignPlan
  • Verify verifies that the reallocation was successful

Generate reassign the plan

  • The definition file

    cat topic-to-move.json

    {
        "topics":[
            {
                "topics": "te_re_01"}]."version": 1
    }
    Copy the code
    kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --topics-to-move-json-file topics-to-move.json --broker-list "0, 1" - the generate#Put the above results into JSON
    kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --reassignment-json-file topics-to-execute.json --execute
    
    kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --reassignment-json-file topics-to-execute.json --verify
    Copy the code

Leader reassign

Sh --zookeeper node1:2181/myKafka --create --topic tp_demo_03 -- replica-Assignment "0:1,1:0,0:1"Copy the code

Modify back to the original copy

{
  "partitions": [{"topic":"tp_demo_03"."partition":0
   },
   {
      "topic":"tp_demo_03"."partition":1
   },
   {
      "topic":"tp_demo_03"."partition":2}}]Copy the code
kafka-preferred-replica-election.sh --zookeeper
node1:2181/myKafka --path-to-json-file preferred-replicas.json
Copy the code

Modifying copy factor

{
  "version":1."partitions":[
   {"topic":"tp_re_02"."partition":0."replicas": [0.1] {},"topic":"tp_re_02"."partition":1."replicas": [0.1] {},"topic":"tp_re_02"."partition":2."replicas": [1.0]]}}Copy the code
kafka-reassign-partitions.sh --zookeeper
node1:2181/myKafka --reassignment-json-file increase-replication-
factor.json --execute
Copy the code

Physical storage

configuration The default value instructions
log.index.interval.bytes 4096(4K) Increasing the interval density of index entries affects the interval density and query efficiency in index files
log.segment.bytes 1073741824(1G) Maximum log file size
log.roll.ms The maximum allowable difference between the maximum timestamp of a message in the current log segment and the timestamp of the current system, in milliseconds
log.roll.hours 168 (7 days) The maximum allowable difference, in hours, between the maximum timestamp of a message in the current log segment and the timestamp of the current system
log.index.size.max.bytes 10485760(10MB) Trigger offset index file or timestamp index file segment byte quota
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log | head

kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index --print-data-log | head
Copy the code

In an offset index file, the index data is sequentially recorded at offset, but each appended index timestamp in the timestamp index file must be greater than the previously appended index entry, otherwise it is not appended. After Kafka 0.11.0.0, there are several timestamp information in the message message. If the broker side parameter the message. The timestamp. The type is set to LogAppendTIme, so the timestamp will be bound to keep the monotone increasing. Whereas CreateTime does not guarantee the order.

Kafka provides two log cleaning strategies:

  • Log deletion: Deletes the data that does not meet the specified deletion policy

    • Based on the time

      Log will delete task according to the log. The retention. Hours/log retention. Minutes/the retention. Ms set the log retention time node. If this value is exceeded, deletion is required. The default is 7 days. Log.retention. Ms has the highest priority.

      Kafka locates based on the maximum timestamp in the log segment.

      Query the timestamp index file corresponding to the log segment and search for the last index entry in the timestamp index file. If the timestamp field value of the last index entry is greater than 0, the value is used; otherwise, the last modified time is used.

      Removal process

      • Remove log segments to be deleted from the skip list of log segments maintained in the log object to ensure that no thread can read these log segments.
      • These log segments add the.delete suffix to all files.
      • Commit a deferred task named “delete-file” to delete the.delete files. The delay time can be set in file.delete.delay.ms
      • Kafka first splits a new log fragment as an active log fragment. This log fragment is not deleted, but the original log fragment is deleted
    • Log size based

      The log deletion task checks whether the size of the current log exceeds the preset value. This parameter is set to log.retention. Bytes. The size of a single log segment is set by log.segment.bytes

      Removal process

      • Calculate the total log size that needs to be deleted (current log file size (all segments) minus retention value)
      • Find the collection of files that can delete the LogSegment starting with the first LogSegment of the log file.
      • Perform the deletion operation.
    • Based on offset

      Determine whether the start offset of the next log segment is greater than or equal to the start offset of the log file. If yes, delete the log segment.

      Removal process

      • Each log segment is iterated from the beginning. The start offset of the next log segment of log segment 1 is 21, which is less than logStartOffset. Then, log segment 1 is added to the deletion queue
      • If the offset of the next log segment of log segment 4 is 71 and greater than logStartOffset, it is not deleted.
  • Log compression: Consolidates the Key of each message. For different values with the same Key, only the last version is retained.

    Kafka provides the log.cleanup.policy parameter for configuration. The default value is delete, and compact is optional. The topics-level configuration item is cleanup.policy.

    Log compression is all about keys, ensuring that the key for each message is not null. Compression is done behind the scenes in Kafka by periodically reopening segments

    Kafka’s background thread periodically traverses the Topic twice:

    • Records the last hash offset of each key
    • The second time to check whether the Key corresponding to each offset appears in the subsequent log, if so, delete the corresponding log.

    Log.cleanup. policy is set to compact, the configuration of the Broker, which affects all topics in the cluster.

    Log.cleaner.min.com paction. Lag. Ms, used to prevent compression to update news more than the minimum, if not set, in addition to the last Segment, all Segment is compressed

    Log.cleaner.max.com paction. Lag. Ms, used to prevent low production rate of the log in the unlimited time without compression.

Disk storage

Kafka’s two processes:

1. Network data is persisting from Producer to Broker.

Data drops are usually non-real-time, and Kafka’s data is not written to disk in real time. It takes advantage of modern operating systems’ paged storage to improve I/O efficiency with memory.

2. Disk files are sent over the network (Broker to Consumer)

Zero copy

Disk data is copied to the kernel-state Buffer through Direct Memory Access (DMA). Copy to NIC Buffer(socket Buffer) directly through DMA, without CPU copy. In addition to reducing data copying, the entire read file ==> network send is done by a single sendFile call with only two context switches, thus greatly improving performance. fileChannel.transferTo( position, count, socketChannel);

Page caching

The data on the disk is cached to the memory, and the access to the disk is changed to the access to the memory. When Kafka receives network data from the socket buffer, the application process does not need intermediate processing, directly persistent. Mmap memory file mapping can be used. By mapping disk files to memory, users can modify disk files by modifying memory.

Mmap also has an obvious drawback: it is unreliable. Data written to Mmap is not actually written to disk, and the operating system does not write data to disk until the program initiates a flush call. Kafka provides a parameter, producer.type, to control whether it is active flush; If Kafka writes to the Mmap, it flush immediately and returns the Producer as sync. Immediately after writing to mmap, return Producer without calling Flush. Async

Java NIO provides a MappedByteBuffer class that can be used to implement memory mapping. The MappedByteBuffer can only be retrieved by calling FileChannel map(). Mmap file mappings are released only during full GC. When close, the memory mapped files need to be cleaned manually, and the Sun.misc.cleaner method can be called reflectively.

Sequential writes

Operating systems can be optimized for linear reads and writes at deep levels, such as prefetch and postwrite

The stability of

The transaction

To create consumer code, you need:

  • Turn off the autocommit property (auto.mit) in configuration
  • And you cannot use manual commitSync() or commitAsync() in your code.
  • Set the isolation level

To create a generator, use the following code:

  • Configure the transactional. Id attribute
  • Configure the enable.idempotence attribute

broker config

Configuration items instructions
transactional.id.timeout.ms The maximum time that the transaction coordinator waits before the producer TransactionalId expires prematurely without receiving any transaction status updates from that producer TransactionalId. The default is 604800000(7 days). This allows weekly producer jobs to maintain their ids
max.transaction.timeout.ms Maximum timeout allowed for a transaction. The default value is 900000(15 minutes).
transaction.state.log.replication.factor Number of replicas of the transaction state topic. Default value: 3
transaction.state.log.num.partitions Number of partitions for the transaction status topic. Default value: 50
transaction.state.log.min.isr Minimum number of ISRs per partition for a transaction status topic. Default: 2
transaction.state.log.segment.bytes Segment size for the transaction state topic. Default value :104857600 bytes

producer configs

Configuration items instructions
enable.idempotence Open power etc.
transaction.timeout.ms Transaction timeout. The maximum time that the transaction coordinator waits for a producer to update the state of a transaction before actively aborting an ongoing transaction. If the value is greater than max-transaction.timeout. In setting ms broke, the request will fail, and InvalidTransactionTimeout mistakes. The default is 60000.
transactional.id TransactionalId for transactional delivery. This supports reliability semantics across multiple producer sessions because it allows clients to ensure that transactions using the same TransactionalId have completed before starting any new transactions.

consumer configs

Configuration items instructions
isolation.level – read_UNcommitted: Uses committed and uncommitted messages in offset order. – READ_COMMITTED: Only non-transactional or committed transactional messages are used in offset order. To maintain offset sorting, this setting means that we must buffer messages in the consumer until we see all the messages in a given transaction.

To achieve idempotency, Kafka introduces ProducerID and SequenceNumber in its underlying design architecture.

  • ProducerID: When each new Producer is initialized, it is assigned a unique ProducerID that is not visible to the client user
  • SequenceNumber: For each ProducerID, each Topic and Partition that Producer sends data has a SequenceNumber value that increases monotonically from 0.

The controller

Kafka elects the cluster controller through Zookeeper’s distributed locking feature and notifies the controller when a node joins or leaves the cluster. The controller elects the zone Leader when a node joins or leaves the cluster. The controller uses epoch to avoid “splitting the brain”. “Split-brain” is when two nodes simultaneously consider themselves to be the current controller.

Consistency assurance

LEO and HW

LEO: Log end offset, which records the displacement of the next message in the replica log.

HW: the water level value mentioned above. For the same replica object, the HW value cannot be greater than the LEO value. All messages with HW values less than or equal to are considered “replicated”. The HW updates of the Leader and Follower copies are different.

  • When the Follower copy updates the LEO

    Kafka has two sets of Follower replica LEO:

    A set of LEOs is stored in the copy manager of the Broker where the Follower copy resides.

    Another SET of LEOs is kept in the replica manager machine of the Broker where the Leader replica resides. The Leader copy holds all follower copies of the LEO on the machine

    Kafka uses the former to help the Follower copy update its HW value. Use the latter to help the Leader replica update its HW.

    When is the local LEO of the Follower copy updated? The Follower copy’s LEO is the log’s LEO, which is updated every time a new message is written. After the Follower sends the FETCH request, the Leader returns the data to the Follower. At this point, the Follower starts to Log the data and automatically updates the LEO value.

    When does the LEO of the Follower on the Leader end update? The Follower LEO update on the Leader side occurs when the Leader processes the Follower FETCH request. Once the Leader receives the FETCH request sent by the Follower, it first reads the corresponding data from the Log and updates the Follower’S LEO before returning the data to the Follower.

  • When does the Follower copy update the HW?

    The Follower updates his HW after he updates his LEO. Once the Follower has written to the Log, he tries to update his HW. Compare the current LEO value with the Leader’s HW value in the FETCH response, and take the smaller of the two as the new HW value.

  • When does the Leader copy update LEO?

    The Leader automatically updates its LEO value when writing logs.

  • When the Leader copy updates the HW value

    • When the Follower copy becomes the Leader copy: Kafka tries to update the partition HW.

    • When a Broker crash causes a copy to be kicked out of the ISR: It is necessary to check whether the partition HW value needs to be updated

    • When the producer writes messages to the Leader copy: Because writing messages updates the Leader LEO, it is necessary to check whether the HW value needs to be updated

    • When the Leader processes the Follower FETCH request, it first reads data from the Log and then tries to update the partition HW value

The message to repeat

  • Producer stage

    No correct broker response was received and a retry was performed

    Solution:

    • Enable idempotent ACK =all retries>1
  • The broker stage

    • Unclearn elections are disabled
    • min.insync.replicas>1
  • Consumer stage

    • Cancel automatic submission
    • Downstream do idempotent, record offset

_consumer_offsets

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node1:9092 --topic tp_test_01 --time -1
kafka-console-consumer.sh --bootstrap-server node1:9092 -- topic tp_test_01 --from-beginning
kafka-consumer-groups.sh --bootstrap-server node1:9092 --list
#To query _consumer_offsets, set exclude.internal. Topics = in consumer.propertiesfalse
kafka-console-consumer.sh --topic __consumer_offsets -- bootstrap-server node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" -- consumer.config config/consumer.properties --from-beginning

 kafka-simple-consumer-shell.sh --topic __consumer_offsets -- partition 19 --broker-list node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
Copy the code

Delay queue

TimingWheel is an implementation of the Kafka time wheel, which contains an array of TimerTaskLists. Each array contains a list of TimerTaskEntry events. Each TimerTaskList represents a cell of the time wheel, and the time span of this cell is tickMs. Events in the same TimerTaskList are all within a tickMs span, and the time span of the entire time wheel is interval = tickMs * wheelSize, The time wheel can process events between cuurentTime and currentTime + Interval.

Expiration >= currentTime + Interval When adding a time whose timeout is greater than the span of the entire time round, expiration >= currentTime + Interval, the event will be transmitted to the upper layer whose tickMs is the interval of the lower layer. Pass until a time round satisfies expiration <currentTime + Interval, then calculate which compartment it is in, then put the event in, reset the timeout, and then put it into the JDK delay queue

The SystemTimer will fetch the TimerTaskList from the queue, advance currentTime forward according to expiration, and put all events back into the time wheel. The task is then submitted to the Java thread pool for processing.

The server may not immediately return the response result to the client for different requests. When these requests are processed, the server creates deferred action objects for these requests and places them in a deferred cache queue. The data structure of the deferred cache is similar to that of MAP. The deferred operation object is completed and removed from the deferred cache queue in two ways: 1. When the external event corresponding to the deferred operation occurs, the external event tries to complete the deferred operation in the deferred cache. 2. If the external event still does not complete the delayed operation, the delayed operation will be forced to complete after the timeout period is reached.