An application uses KafkaConsumer to subscribe to Kafka topics and receive messages from them. Reading data from Kafka differs from reading data from other messaging systems and involves some unique concepts. It is difficult to use the consumer API well without understanding these concepts first, so we explain them before showing how different applications can be built on the consumer API.

Consumers and consumer groups

Suppose we have an application that needs to read messages from a Kafka topic, validate them, and save the results. The application creates a consumer object, subscribes to the topic, and starts receiving messages, validating them and saving the results. What happens when, after a while, producers write messages to the topic faster than the application can validate them? With only a single consumer, the application falls further and further behind. Clearly, consumption needs to scale horizontally. Just as multiple producers can write to the same topic, multiple consumers can read from the same topic, splitting the messages among them.

Kafka consumers belong to a consumer group. Consumers in a group subscribe to the same topics, and each consumer receives messages from a different subset of the topics' partitions.

Assume topic T1 has four partitions. We create consumer C1, which is the only consumer in group G1, and use it to subscribe to topic T1. Consumer C1 receives messages from all four partitions of topic T1, as shown in Figure 4-1.

If group G1 has four consumers, each consumer can be assigned a partition, as shown in Figure 4-3.

Adding consumers to a group is the main way to scale consumption horizontally. Kafka consumers often perform high-latency operations, such as writing data to a database or HDFS, or running time-consuming computations on the data. In these cases a single consumer cannot keep up with the rate at which data arrives, so you add more consumers and let them share the load, with each consumer processing only a subset of the partitions. This is a good reason to create topics with a large number of partitions: it lets you add more consumers as the load grows. However, there is no point in having more consumers in a group than the topic has partitions; the extra consumers will simply sit idle.

In addition to scaling a single application horizontally by adding consumers, it is common for multiple applications to read data from the same topic. In fact, one of the main goals of Kafka's design is to make the data in Kafka topics available to many different use cases across the organization. In these scenarios, each application should get all of the messages, not just some of them. As long as each application has its own consumer group, each one receives all the messages in the topic. Unlike many traditional messaging systems, Kafka scales to a large number of consumers and consumer groups without reducing performance.

In the example above, if you add a new group G2 that contains only one consumer, that consumer receives all messages from topic T1, independently of what group G1 is doing. Group G2 can have more than one consumer, in which case each consumer gets a subset of the partitions, as shown in Figure 4-5. Either way, group G2 as a whole receives all messages regardless of whether other groups exist.
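The effect of group size on how partitions are shared can be sketched in plain Java. This is a simplified model rather than Kafka's own assignor code: the class and method names are hypothetical, and it assumes partitions are spread as evenly as possible across the group.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Kafka API: models how many partitions
// each consumer in a group owns when partitions are spread evenly.
class GroupScalingSketch {
    static List<Integer> partitionsPerConsumer(int partitions, int consumers) {
        List<Integer> counts = new ArrayList<>();
        int base = partitions / consumers;   // every consumer gets at least this many
        int extra = partitions % consumers;  // the first `extra` consumers get one more
        for (int i = 0; i < consumers; i++) {
            counts.add(base + (i < extra ? 1 : 0));
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(partitionsPerConsumer(4, 1)); // [4]
        System.out.println(partitionsPerConsumer(4, 2)); // [2, 2]
        System.out.println(partitionsPerConsumer(4, 4)); // [1, 1, 1, 1]
        System.out.println(partitionsPerConsumer(4, 5)); // [1, 1, 1, 1, 0]
    }
}
```

The last call shows the idle consumer: with four partitions and five consumers, one consumer owns nothing.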

In short, create a consumer group for each application that needs to get all the messages for one or more topics, and then add consumers to the group to scale the reading power and processing power so that each consumer in the group processes only a portion of the messages.

Rebalancing consumer groups and partitions

As we saw in the previous section, the consumers in a group share ownership of the partitions of the topics they subscribe to. When a new consumer joins the group, it starts reading from partitions that were previously read by other consumers. When a consumer shuts down or crashes, it leaves the group, and the partitions it was reading are taken over by the remaining consumers. Partitions are also reassigned when the topics themselves change, for example when an administrator adds new partitions.

Moving ownership of a partition from one consumer to another is called a rebalance. Rebalances are important because they give the consumer group high availability and scalability (we can add and remove consumers safely), but under normal circumstances we would rather they did not happen. During a rebalance, consumers cannot read messages, so the whole group is unavailable for a short time. In addition, when a partition moves to another consumer, the previous consumer's state for that partition is lost; the new owner may need to rebuild its caches, which slows the application down until it recovers. In this chapter we discuss how to handle rebalances safely and how to avoid unnecessary ones.

Consumers maintain their membership in a group and their ownership of partitions by sending heartbeats to the broker designated as the group coordinator (different groups can have different coordinators). As long as a consumer sends heartbeats at regular intervals, it is considered alive and still processing messages from its partitions. The consumer sends heartbeats when it polls (to fetch messages) and when it commits offsets. If the consumer stops sending heartbeats for long enough, its session expires, the group coordinator considers it dead, and a rebalance is triggered.

If a consumer crashes and stops processing messages, the group coordinator waits a few seconds without heartbeats before deciding that the consumer is dead and triggering a rebalance. During those seconds, no messages are read from the partitions owned by the dead consumer. When a consumer is closed cleanly, it notifies the coordinator that it is leaving the group, and the coordinator triggers a rebalance immediately, which minimizes the gap in processing. Later in this chapter we discuss the configuration parameters that control the heartbeat frequency and the session timeout, and how to set them for your requirements.

How partitions are assigned

When a consumer wants to join a group, it sends a JoinGroup request to the group coordinator. The first consumer to join the group becomes the group leader. The leader receives the list of group members from the coordinator (the list contains all consumers that have recently sent a heartbeat and are therefore considered alive) and is responsible for assigning partitions to each consumer. It uses a class that implements the PartitionAssignor interface to decide which partitions go to which consumer.

Kafka has two built-in assignment strategies, which we discuss in more detail in the configuration section. Once the assignment is decided, the group leader sends it to the group coordinator, which forwards the information to the consumers. Each consumer sees only its own assignment; only the group leader knows the assignments of all consumers in the group. This process repeats on every rebalance.

Create a Kafka consumer

Before you can read messages, you need to create a KafkaConsumer object. Creating a KafkaConsumer is very similar to creating a KafkaProducer: put the properties you want to pass to the consumer into a Properties object. All of these properties are discussed in depth later in this chapter. Here we only need the three mandatory ones: bootstrap.servers, key.deserializer, and value.deserializer.

The following code demonstrates how to create a KafkaConsumer object:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
// Brokers used for the initial connection to the cluster
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
// Classes that turn the raw key and value bytes back into Java objects
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

key.deserializer and value.deserializer name the classes used to convert the key and value byte arrays into Java objects.

group.id specifies which consumer group the KafkaConsumer belongs to. It is not strictly required, but for now we will assume it is. It is possible, though less common, to create a consumer that does not belong to any group.

Subscribe to the topic

With the consumer created, the next step is to subscribe to one or more topics. The subscribe() method takes a list of topics as a parameter:

consumer.subscribe(Collections.singletonList("customerCountries"));

Here we create a list with a single element, the topic name "customerCountries". We can also pass a regular expression to subscribe(), and the expression can match multiple topics. If someone creates a new topic whose name matches the expression, a rebalance is triggered almost immediately and the consumers start reading from the new topic. This kind of subscription works well for applications that need to read from multiple topics and can handle the different types of data they contain. Subscribing to multiple topics with a regular expression is most common in applications that replicate data between Kafka and other systems.

To subscribe to all test topics, do this: consumer.subscribe(Pattern.compile("test.*"));
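To see which topic names such an expression would select, the matching can be tried out with java.util.regex alone. This is a sketch under assumptions: the topic names are made up, the helper is hypothetical, and it assumes the pattern has to match the whole topic name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the Kafka API: filters topic names with a
// regular expression, assuming the whole name has to match.
class TopicPatternSketch {
    static List<String> matchingTopics(Pattern pattern, List<String> topicNames) {
        List<String> matched = new ArrayList<>();
        for (String name : topicNames) {
            if (pattern.matcher(name).matches()) {
                matched.add(name);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("test.*");
        // Made-up topic names, for illustration only
        List<String> topics = List.of("test", "test.orders", "testing", "customerCountries", "my.test");
        System.out.println(matchingTopics(pattern, topics)); // [test, test.orders, testing]
    }
}
```

Note that the dot in "test.*" is a regex wildcard, so "testing" matches too. To select only names that start with "test." the dot would have to be escaped, as in Pattern.compile("test\\..*").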

Polling

Polling is at the heart of the consumer API: the consumer requests data from the server with a simple poll() call. Once the consumer has subscribed to topics, the poll loop takes care of all the details, including group coordination, partition rebalancing, sending heartbeats, and fetching data, so the developer only needs a small, simple API to process the data returned from the assigned partitions. The main part of the consumer code is a loop that calls poll() repeatedly and processes the records each call returns.

Thread safety

You cannot run multiple consumers that belong to the same group in one thread, and you cannot have multiple threads safely share one consumer. The rule is one consumer per thread. To run multiple consumers of the same group in one application, run each consumer in its own thread. It helps to wrap the consumer logic in its own object and then use Java's ExecutorService to start multiple threads, each with its own consumer.
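The one-consumer-per-thread rule can be sketched with ExecutorService using only the standard library. The class below is a hypothetical illustration: the worker body only records which thread it runs on, standing in for the place where a real worker would create and poll its own KafkaConsumer.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one worker per thread, with a placeholder where the
// consumer loop would go.
class ConsumerThreadsSketch {
    // Starts `workers` tasks on a pool with one thread per task and returns
    // the names of the threads that ran them.
    static Set<String> runWorkers(int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch allStarted = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                // A real worker would create its own KafkaConsumer here, subscribe,
                // and poll in a loop; the consumer would never leave this thread.
                threadNames.add(Thread.currentThread().getName());
                allStarted.countDown();
                try {
                    // Wait until every worker is running, so no thread is reused.
                    allStarted.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames;
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(3).size()); // 3
    }
}
```

Sizing the pool to the number of workers is the design choice that matters here: each worker gets a dedicated thread for its whole lifetime, so a consumer created inside the task is never touched by another thread.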

Consumer configuration

So far we have learned how to use the consumer API, but we have only looked at a few configuration properties: bootstrap.servers, key.deserializer, value.deserializer, and group.id. Kafka's documentation lists all of the consumer configuration properties. Most of them have reasonable defaults and rarely need to be changed, but some have a large effect on consumer performance and availability. These important properties are described next.

1. fetch.min.bytes

This property specifies the minimum amount of data that the consumer wants to receive from the broker when fetching records. When the broker receives a fetch request from a consumer and the amount of available data is smaller than fetch.min.bytes, it waits until enough data is available before responding. This reduces the load on both the consumer and the broker, because they do not have to pass messages back and forth when the topic is not very active (or during quieter hours of the day). If consumer CPU usage is high even though little data is available, set this property higher than the default. If the number of consumers is large, a higher value also reduces the load on the brokers.

2. fetch.max.wait.ms

With fetch.min.bytes we tell Kafka to wait until it has enough data before responding to the consumer. fetch.max.wait.ms specifies how long the broker will wait, which is 500ms by default. If not enough data is flowing into Kafka to satisfy the minimum, the consumer sees up to 500ms of extra latency. To limit this latency (for example, to meet an SLA), set this parameter to a smaller value. If fetch.max.wait.ms is set to 100ms and fetch.min.bytes is set to 1MB, Kafka responds to a fetch request either when it has 1MB of data to return or after 100ms, whichever happens first.

3. max.partition.fetch.bytes
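The interaction between the two settings can be written down as a tiny decision rule. This is a simplified model with hypothetical names, not broker code; it only captures the "whichever happens first" logic described above.

```java
// Hypothetical model of the rule described in the text: the broker answers a
// fetch request once enough data is available or the wait limit is reached.
class FetchWaitSketch {
    static boolean shouldRespond(long availableBytes, long waitedMs,
                                 long fetchMinBytes, long fetchMaxWaitMs) {
        return availableBytes >= fetchMinBytes || waitedMs >= fetchMaxWaitMs;
    }

    public static void main(String[] args) {
        long minBytes = 1024 * 1024; // fetch.min.bytes = 1MB
        long maxWaitMs = 100;        // fetch.max.wait.ms = 100ms

        System.out.println(shouldRespond(200_000, 40, minBytes, maxWaitMs));   // false: keep waiting
        System.out.println(shouldRespond(1_048_576, 40, minBytes, maxWaitMs)); // true: enough data
        System.out.println(shouldRespond(200_000, 100, minBytes, maxWaitMs));  // true: wait limit reached
    }
}
```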

This property specifies the maximum number of bytes the server returns to the consumer per partition. The default is 1MB, which means that KafkaConsumer.poll() returns no more than max.partition.fetch.bytes from each partition. If a topic has 20 partitions and five consumers, each consumer handles four partitions and needs at least 4MB of memory available for records. In practice you should allocate more, because if a consumer in the group crashes, the remaining consumers have to handle more partitions. max.partition.fetch.bytes must be larger than the largest message the broker will accept (set by message.max.bytes in the broker configuration), or the consumer may be unable to read such messages, causing it to hang and retry. Another factor to consider when setting this property is how long the consumer takes to process data. The consumer needs to call poll() frequently enough to avoid session expiration and the rebalance that follows. If a single poll() returns too much data, processing takes longer and the consumer may not get to the next poll in time. If this happens, either lower max.partition.fetch.bytes or increase the session timeout.
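The memory arithmetic in the example above can be checked with a few lines of Java. The helper is hypothetical and assumes partitions are spread evenly, so that the busiest consumer owns the rounded-up share.

```java
// Hypothetical helper: worst-case bytes one poll may return to the busiest
// consumer, assuming an even spread of partitions across the group.
class FetchMemorySketch {
    static long worstCaseFetchBytes(int partitions, int consumers, long maxPartitionFetchBytes) {
        int partitionsPerConsumer = (partitions + consumers - 1) / consumers; // round up
        return partitionsPerConsumer * maxPartitionFetchBytes;
    }

    public static void main(String[] args) {
        long oneMb = 1024 * 1024; // default max.partition.fetch.bytes

        // 20 partitions, 5 consumers: 4 partitions each
        System.out.println(worstCaseFetchBytes(20, 5, oneMb) / oneMb); // 4

        // One consumer crashes, 4 remain: 5 partitions each
        System.out.println(worstCaseFetchBytes(20, 4, oneMb) / oneMb); // 5
    }
}
```

The second call is the reason to leave headroom: losing one consumer raises the per-consumer requirement from 4MB to 5MB.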

4. session.timeout.ms

This property specifies how long a consumer can be out of contact with the brokers before it is presumed dead. The default depends on the client version (for example, 10s in Kafka 2.x and 45s since Kafka 3.0). If the consumer does not send a heartbeat to the group coordinator within session.timeout.ms, it is considered dead, and the coordinator triggers a rebalance to assign its partitions to other consumers in the group. This property is closely related to heartbeat.interval.ms. heartbeat.interval.ms controls how often the poll() method sends heartbeats to the coordinator, whereas session.timeout.ms controls how long the consumer can go without sending one. heartbeat.interval.ms must therefore be smaller than session.timeout.ms and is usually set to one third of it: if session.timeout.ms is 3s, heartbeat.interval.ms should be 1s. Setting session.timeout.ms lower than the default lets the group detect and recover from crashed consumers sooner, but long polling pauses or garbage collection may then cause unwanted rebalances. Setting it higher reduces accidental rebalances, but it takes longer to detect a real failure.
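The one-third rule can be expressed as a small configuration helper. The class and method names are hypothetical; only the two property names come from Kafka, and the values are examples rather than recommendations.

```java
import java.util.Properties;

// Hypothetical helper: derives heartbeat.interval.ms from session.timeout.ms
// using the one-third rule of thumb described in the text.
class HeartbeatConfigSketch {
    static Properties timeouts(int sessionTimeoutMs) {
        Properties props = new Properties();
        props.put("session.timeout.ms", String.valueOf(sessionTimeoutMs));
        props.put("heartbeat.interval.ms", String.valueOf(sessionTimeoutMs / 3));
        return props;
    }

    public static void main(String[] args) {
        Properties props = timeouts(3000);
        System.out.println(props.getProperty("session.timeout.ms"));    // 3000
        System.out.println(props.getProperty("heartbeat.interval.ms")); // 1000
    }
}
```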

5. auto.offset.reset

This property specifies what the consumer does when it reads a partition for which it has no committed offset, or when the committed offset is invalid (usually because the consumer was inactive for so long that the record at that offset has already been deleted from the broker). The default is "latest", which means that without a valid offset the consumer starts reading from the newest records (records written after the consumer started). The alternative is "earliest", which means that without a valid offset the consumer reads the partition from the very beginning.

6. enable.auto.commit

We will look at several different ways to commit offsets later. This property specifies whether the consumer commits offsets automatically. The default is true. To minimize duplicates and avoid missing data, set it to false and control when offsets are committed yourself. If you leave it set to true, you can control how often offsets are committed with the auto.commit.interval.ms property.

7. partition.assignment.strategy

As we know, partitions are assigned to the consumers in a group. A PartitionAssignor is a class that, given the consumers and the topics they subscribe to, decides which partitions are assigned to which consumer. Kafka has two built-in assignment strategies.

– Range

This strategy assigns each consumer a consecutive range of partitions from each topic it subscribes to. Assume that consumers C1 and C2 both subscribe to topics T1 and T2, and each topic has three partitions. Then C1 is assigned partitions 0 and 1 of both topics, and C2 is assigned partition 2 of both topics. Because each topic has an odd number of partitions and the assignment is done for each topic independently, the first consumer ends up with more partitions than the second. This happens whenever the Range strategy is used and the number of partitions is not evenly divisible by the number of consumers.

– RoundRobin

This strategy takes all partitions of all subscribed topics and assigns them to consumers one by one in turn. If C1 and C2 from the previous example used the RoundRobin strategy, C1 would be assigned partitions 0 and 2 of topic T1 and partition 1 of topic T2, while C2 would be assigned partition 1 of topic T1 and partitions 0 and 2 of topic T2. In general, if all consumers subscribe to the same topics (which is common), RoundRobin gives every consumer the same number of partitions (or at most one partition fewer).

The partition.assignment.strategy property selects the strategy. The default is org.apache.kafka.clients.consumer.RangeAssignor, which implements the Range strategy, but you can change it to org.apache.kafka.clients.consumer.RoundRobinAssignor. You can also use a custom strategy, in which case the value of partition.assignment.strategy is the name of your custom class.
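Both strategies can be reproduced in a short stand-alone sketch that follows the descriptions above. This is an illustration with hypothetical names, not Kafka's RangeAssignor or RoundRobinAssignor code, and it assumes every consumer subscribes to every topic and every topic has the same number of partitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the two strategies as described in the text.
class AssignmentSketch {
    // Range: each topic is handled on its own; consumers get consecutive
    // blocks of partitions, and the first consumers absorb any remainder.
    static Map<String, List<String>> range(List<String> consumers, List<String> topics,
                                           int partitionsPerTopic) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        int base = partitionsPerTopic / consumers.size();
        int extra = partitionsPerTopic % consumers.size();
        for (String topic : topics) {
            int next = 0;
            for (int i = 0; i < consumers.size(); i++) {
                int count = base + (i < extra ? 1 : 0);
                for (int p = next; p < next + count; p++) {
                    result.get(consumers.get(i)).add(topic + "-" + p);
                }
                next += count;
            }
        }
        return result;
    }

    // RoundRobin: all partitions of all topics are lined up and dealt out
    // to the consumers one at a time.
    static Map<String, List<String>> roundRobin(List<String> consumers, List<String> topics,
                                                int partitionsPerTopic) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        int turn = 0;
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                result.get(consumers.get(turn % consumers.size())).add(topic + "-" + p);
                turn++;
            }
        }
        return result;
    }

    private static Map<String, List<String>> emptyAssignment(List<String> consumers) {
        Map<String, List<String>> result = new TreeMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("C1", "C2");
        List<String> topics = List.of("T1", "T2");
        System.out.println(range(consumers, topics, 3));
        // {C1=[T1-0, T1-1, T2-0, T2-1], C2=[T1-2, T2-2]}
        System.out.println(roundRobin(consumers, topics, 3));
        // {C1=[T1-0, T1-2, T2-1], C2=[T1-1, T2-0, T2-2]}
    }
}
```

With Range, C1 ends up with four partitions and C2 with two; with RoundRobin each consumer gets three, which is the imbalance the text describes.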

8. client.id

This property, which can be any string, is used by the broker to identify messages sent from the client and is commonly used in logging, metrics, and quotas.

9. max.poll.records

This property controls the maximum number of records that a single call to the poll() method can return, which helps you control the amount of data the application has to process in each iteration of the poll loop.

10. receive.buffer.bytes and send.buffer.bytes

You can also set the size of the TCP buffer used by the socket to read and write data. If they are set to -1, the operating system default is used. These values can be appropriately increased if the producer or consumer and broker are in different data centers, since networks across data centers tend to have higher latency and lower bandwidth.
