This is the sixth installment of Learn Kafka from Me: Learn Kafka from the Ground Up. Follow me on my journey from novice to expert.

This article gives a brief introduction to the Kafka consumer, serving as a window into further study of the Kafka consumer. It covers three aspects:

  • Core parameters

  • Core components

  • Core API

1. Kafka Consumer Core Parameters

In my opinion, the way to understand the core working mechanism of the Kafka consumer is to start from its core parameters. This lays the foundation for a further understanding of its partition assignment mechanism, message pull model, consumption model, offset commit mechanism, and so on.

The core attributes of Kafka Consumer are defined in ConsumerConfig; a configuration sketch follows the parameter list below.

1.1 Basic Function Parameters

  • group.id The name of the consumer group.

  • client.id The client ID. Defaults to consumer- followed by a sequence number; in practice, it is recommended to include the client IP address.

  • bootstrap.servers The list of broker server addresses.

  • client.dns.lookup

    The client can resolve the bootstrap addresses in either of the following ways:

  • resolve_canonical_bootstrap_servers_only Resolves each hostname provided in bootstrap.servers to an array of IP addresses (InetAddress.getAllByName), then obtains InetAddress.getCanonicalHostName() for each one and establishes TCP connections to them. A host can be configured with multiple network interfaces; enabling this option can take advantage of all of them and reduce the network load on the Broker side.

  • use_all_dns_ips Establishes TCP connections to all IP addresses that the hostnames and ports provided in bootstrap.servers resolve to.

  • enable.auto.commit Whether to enable automatic offset commits. Default is true.

  • auto.commit.interval.ms The offset commit frequency when auto-commit is enabled. Default is 5s.

  • partition.assignment.strategy The partition assignment algorithm used to balance load across consumers. Default is range assignment (RangeAssignor); an optional value is round-robin (RoundRobinAssignor).

  • auto.offset.reset The offset reset policy, applied when no committed offset exists or the committed offset has been deleted from Kafka. Default is latest; optional values are earliest and none (which throws an exception).

  • key.deserializer The deserializer class for message keys.

  • value.deserializer The deserializer class for message bodies.

  • interceptor.classes Consumer interceptor classes; multiple interceptors can be configured.

  • check.crcs Whether the consumer performs CRC checks on records. Default is true.
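
Putting the basic parameters together, here is a minimal configuration sketch, assuming string keys and values; the broker addresses, group ID, and client ID are hypothetical placeholders:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BasicConsumerConfig {

    // A minimal sketch of the basic function parameters; all values are illustrative.
    public static KafkaConsumer<String, String> newConsumer() {
        Properties props = new Properties();
        // broker list and identity (hypothetical addresses; include the client IP in client.id)
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "demo-consumer-192.168.1.10");
        // offset management: auto-commit every 5s, reset to earliest if no offset exists
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // key/value deserializers
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }
}
```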

1.2 Network Parameters

  • send.buffer.bytes The size of the TCP send buffer. Default is 128 KB.

  • receive.buffer.bytes The size of the TCP receive buffer. Default is 32 KB.

  • reconnect.backoff.ms The wait time before re-establishing a connection. Default is 50ms; this is a low-level network parameter that rarely needs attention.

  • reconnect.backoff.max.ms The maximum wait time before reconnecting. Default is 1s. If the same connection fails twice in a row, the wait time grows exponentially from the initial reconnect.backoff.ms value.

  • retry.backoff.ms The retry interval. Default is 100ms.

  • connections.max.idle.ms The maximum idle time for a connection. Default is 9 minutes.

  • request.timeout.ms The timeout for requests and network communication with the Broker.
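
These network parameters are set the same way as the basic ones; a short sketch with illustrative values follows (the defaults are usually fine, so treat these as placeholders rather than recommendations):

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class NetworkParams {

    // Illustrative overrides of the network parameters; values mirror the defaults above.
    public static Properties networkProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.SEND_BUFFER_CONFIG, 131072);        // send.buffer.bytes, 128 KB
        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 32768);      // receive.buffer.bytes, 32 KB
        props.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 50L);  // reconnect.backoff.ms
        props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 100L);     // retry.backoff.ms
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);  // request.timeout.ms
        return props;
    }
}
```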

1.3 Core Working Parameters

  • max.poll.records The maximum number of messages returned per poll() call. Default is 500.

  • max.poll.interval.ms

    The maximum interval between two poll() calls, in milliseconds. Default is 5 minutes. If the consumer does not call poll within this interval, it is removed from the group, triggering a rebalance that reassigns its partitions to other consumers.

  • session.timeout.ms The heartbeat timeout between the consumer and the broker. If the broker receives no heartbeat request within this period, it removes the consumer and triggers a rebalance.

  • heartbeat.interval.ms The interval at which the consumer sends heartbeats to the broker. Default is 3s, which keeps the session from expiring.

  • fetch.min.bytes The minimum number of bytes returned by a fetch request. Default is 1 byte.

  • fetch.max.bytes

    The maximum number of bytes returned by a fetch request. Default is 50 MB. This is not an absolute limit: if the first batch of messages in a partition is larger than this value, the batch is still returned so that the consumer can make progress.

  • max.partition.fetch.bytes The maximum number of bytes fetched from each partition in a single pull. Default is 1 MB.

  • fetch.max.wait.ms The maximum time a fetch request waits for data to become available.

  • metadata.max.age.ms The expiration time of metadata on the client; once expired, the client pulls the latest metadata from the broker. Default is 5 minutes.

  • internal.leave.group.on.close Whether the consumer leaves the group immediately when it is closed. Default is true, meaning a client disconnect immediately triggers a rebalance. If set to false, a rebalance is not triggered immediately but only after the session expires.
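
To see how these working parameters interact at runtime, here is a minimal poll-loop sketch; the topic name is hypothetical, and the consumer is assumed to come from the configuration sketch in section 1.1:

```java
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoop {

    public static void run(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("demo-topic")); // hypothetical topic
        try {
            while (true) {
                // Each poll() returns at most max.poll.records messages; the broker holds
                // the fetch up to fetch.max.wait.ms until fetch.min.bytes are available.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Processing the whole batch must finish within max.poll.interval.ms,
                    // otherwise the coordinator evicts this consumer and rebalances.
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```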

2. KafkaConsumer Core Components and APIs

The core parameters of KafkaConsumer already point at the core mechanisms inside Kafka. Next, we introduce KafkaConsumer's core components, laying a foundation for the subsequent in-depth study of the Kafka consumption model.

2.1 Core Components

KafkaConsumer consists of the following core components:

  • ConsumerNetworkClient The network client underlying all network communication, responsible for RPC between the client and the server.

  • ConsumerCoordinator In Kafka's design, each consumer group elects one broker node in the cluster to act as the group's coordinator, responsible for managing consumer group state, especially consumer group rebalancing (consumers joining and leaving). This class is where the consumer interacts with the broker-side coordinator.

  • Fetcher The message pull component.

Tip: This article does not explain each component in detail. Instead, it is recommended that you revisit the meaning of each parameter in Part 1, think about how the parameters relate to one another, and consider which component each parameter is ultimately passed to.

2.2 Core API Overview

Finally, let’s look at the consumer’s core API.

  • Set<TopicPartition> assignment() Gets the set of partitions assigned to this consumer.

  • Set<String> subscription() Gets the subscription information of this consumer.

  • void subscribe(Collection<String> topics) Subscribes to topics.

  • void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) Subscribes to topics and registers a rebalance listener for the assigned partitions.

  • void assign(Collection<TopicPartition> partitions) Bypasses subscription and manually specifies the partitions to consume.

  • void unsubscribe() Unsubscribes.

  • ConsumerRecords<K, V> poll(Duration timeout) Pulls messages; this is the core method of KafkaConsumer and is described in more detail below.

  • void commitSync() Synchronously commits the consumption progress of the current batch; detailed in a subsequent article.

  • void commitSync(Duration timeout) Synchronously commits consumption progress, with a configurable timeout.

  • void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) Explicitly and synchronously commits consumption progress; offsets specifies the progress to commit.

  • void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) Explicitly and synchronously commits consumption progress, with a timeout.

  • void seek(TopicPartition partition, long offset) Resets the offset from which the next poll() call will pull for the given partition.

  • void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) Overload of the seek method.

  • void seekToBeginning(Collection<TopicPartition> partitions) Sets the next pull offset of the poll method to each partition's beginning offset.

  • void seekToEnd(Collection<TopicPartition> partitions) Sets the next pull offset of the poll method to each partition's maximum offset.

  • long position(TopicPartition partition) Gets the offset that will be pulled next.

  • long position(TopicPartition partition, final Duration timeout) Same as above, with a timeout.

  • OffsetAndMetadata committed(TopicPartition partition) Gets the committed offset of the given partition.

  • OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) Same as above, with a timeout.

  • Map<MetricName, ? extends Metric> metrics() Gets the consumer's statistical metrics.

  • List<PartitionInfo> partitionsFor(String topic) Gets the routing information for the topic.

  • List<PartitionInfo> partitionsFor(String topic, Duration timeout) Same as above, with a timeout.

  • Map<String, List<PartitionInfo>> listTopics() Gets routing information for all topics.

  • Map<String, List<PartitionInfo>> listTopics(Duration timeout) Same as above, with a timeout.

  • Set<TopicPartition> paused() Gets the set of paused partitions.

  • void pause(Collection<TopicPartition> partitions) Pauses the given partitions; subsequent poll() calls will not return messages from them.

  • void resume(Collection<TopicPartition> partitions) Resumes the given paused partitions.

  • Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) Looks up, for each partition, the offset of the earliest message whose timestamp is greater than or equal to the given timestamp.

  • Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) Same as above, with a timeout.

  • Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) Queries the current minimum offset of the given partitions.

  • Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) Same as above, with a timeout.

  • Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) Queries the current maximum offset of the given partitions.

  • Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) Same as above, with a timeout.

  • void close() Closes the consumer.

  • void close(Duration timeout) Closes the consumer, waiting at most the given timeout.

  • void wakeup() Wakes up the consumer; a blocking poll() call will exit by throwing a WakeupException.

Unlike RocketMQ, the consumer provided by Kafka has no push mode that pulls messages automatically; the application itself must orchestrate these APIs to pull messages, as the sketch below illustrates.
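
As one illustration of such orchestration, here is a sketch that replays a topic from a given timestamp by combining partitionsFor, assign, offsetsForTimes, and seek; the topic name and timestamp are supplied by the caller, and this is one possible composition rather than the canonical one:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class ReplayFromTimestamp {

    public static void replay(KafkaConsumer<String, String> consumer, String topic, long timestampMs) {
        // discover the topic's partitions via its routing information
        List<PartitionInfo> partitions = consumer.partitionsFor(topic);
        Map<TopicPartition, Long> query = new HashMap<>();
        for (PartitionInfo p : partitions) {
            query.put(new TopicPartition(topic, p.partition()), timestampMs);
        }

        // manually assign the partitions instead of subscribing (no group rebalance)
        consumer.assign(query.keySet());

        // find, per partition, the first offset whose timestamp is >= timestampMs
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(query);
        offsets.forEach((tp, oat) -> {
            if (oat != null) {
                consumer.seek(tp, oat.offset()); // the next poll() starts here
            }
        });
        consumer.poll(Duration.ofSeconds(1)).forEach(record -> System.out.println(record.value()));
    }
}
```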

It is worth noting that the Kafka consumer supports an auto-commit mechanism for offsets, and that KafkaConsumer objects are not thread-safe.
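
A corollary of the thread-safety rule: wakeup() is the one method that may safely be called from another thread, which makes it a common way to break a blocking poll() during shutdown. A sketch, with a hypothetical topic and error handling elided:

```java
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulShutdown {

    public static void run(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("demo-topic")); // hypothetical topic

        // wakeup() is the only thread-safe KafkaConsumer method: calling it from a
        // shutdown hook forces a blocking poll() to throw WakeupException.
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

        try {
            while (true) {
                consumer.poll(Duration.ofSeconds(1))
                        .forEach(record -> System.out.println(record.value()));
                consumer.commitSync(); // synchronously commit this batch's progress
            }
        } catch (WakeupException e) {
            // expected during shutdown; fall through to close()
        } finally {
            consumer.close();
        }
    }
}
```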

Based on KafkaConsumer's pause (pause consumption of certain partitions) and resume (resume consumption of certain partitions), a consumer-side rate-limiting mechanism can be implemented easily, as sketched below.
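
A minimal sketch of such a rate limiter, assuming a hypothetical in-memory backlog whose size gates fetching; the watermark values are arbitrary:

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ThrottledConsumer {

    private static final int HIGH_WATERMARK = 1000; // hypothetical backlog thresholds
    private static final int LOW_WATERMARK = 100;

    public static void run(KafkaConsumer<String, String> consumer,
                           BlockingQueue<ConsumerRecord<String, String>> backlog) {
        while (true) {
            // poll() must keep being called even when paused, so the consumer is not
            // evicted from the group after max.poll.interval.ms.
            consumer.poll(Duration.ofMillis(500)).forEach(backlog::offer);

            if (backlog.size() > HIGH_WATERMARK) {
                // stop fetching from all assigned partitions until the backlog drains
                consumer.pause(consumer.assignment());
            } else if (backlog.size() < LOW_WATERMARK && !consumer.paused().isEmpty()) {
                consumer.resume(consumer.paused());
            }
        }
    }
}
```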

This article has provided a general overview of the Kafka consumer; subsequent articles will continue to unravel its core operating mechanisms, so stay tuned.

Well, that's all for this article. A follow, a like, or a comment is the biggest encouragement to me.