1. Foreword

This article follows up on the previous one, which introduced the overall model of the Producer from a macro point of view. There is a lot of detail in how the waitOnMetadata() method obtains topic metadata, so it gets a separate article.

2. Metadata

2.1 What is Metadata

Metadata is the metadata of a Kafka cluster, i.e. the information that describes the cluster itself: its nodes, topics, and partitions.

public class Metadata implements Closeable {
    private final Logger log;
    // retry.backoff.ms: the default value is 100ms. It is used to set the interval between two retries to avoid frequent and invalid retries.
    private final long refreshBackoffMs;
    // metadata.max.age.ms: The default value is 300000, and metadata will be forcibly updated if it is not updated within this time.
    private final long metadataExpireMs;
    // Update version number. For each update, version increases by 1 to determine whether metadata is updated
    private int updateVersion;
    // Request version number, which increases by 1 each time a request is sent
    private int requestVersion;
    // Time of the last update attempt (including failed updates)
    private long lastRefreshMs;
    // Time of the last successful update
    private long lastSuccessfulRefreshMs;
    private KafkaException fatalException;
    // Invalid topics
    private Set<String> invalidTopics;
    // Unauthorized topics
    private Set<String> unauthorizedTopics;
    // Cache of metadata information
    private MetadataCache cache = MetadataCache.empty();
    // Whether a full metadata update (for all known topics) is needed
    private boolean needFullUpdate;
    // Whether a partial metadata update (only for newly requested topics) is needed
    private boolean needPartialUpdate;
    // Listeners notified when cluster metadata is updated
    private final ClusterResourceListeners clusterResourceListeners;
    private boolean isClosed;
    // The last seen leaderEpoch of each partition
    private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
}

MetadataCache: a read-only cache of information about the nodes, topics, and partitions in a Kafka cluster.

public class MetadataCache {
    private final String clusterId;
    private final Map<Integer, Node> nodes;
    private final Set<String> unauthorizedTopics;
    private final Set<String> invalidTopics;
    private final Set<String> internalTopics;
    private final Node controller;
    private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
    private Cluster clusterInstance;
}

Detailed topic information (leader node, replica nodes, ISR list) is stored in the Cluster instance.

// Saves information about nodes, Topics, and partitions in the Kafka cluster
public final class Cluster {
    private final boolean isBootstrapConfigured;
    // the node list
    private final List<Node> nodes;
    // Unauthorized topics
    private final Set<String> unauthorizedTopics;
    // Invalid topics
    private final Set<String> invalidTopics;
    // Kafka built-in topics
    private final Set<String> internalTopics;
    private final Node controller;
    // Partition information, e.g. the node hosting the leader, all replicas, the in-sync replicas (ISR), and offline replicas
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
    // The mapping between topic and partition information
    private final Map<String, List<PartitionInfo>> partitionsByTopic;
    // Mapping between topic and available partition(leader is not null)
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
    // Mapping between node and partition information
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
    // Mapping between node ids and nodes
    private final Map<Integer, Node> nodesById;
    // Cluster information, which contains only one clusterId
    private final ClusterResource clusterResource;
}
// topic-partition: contains topic, partition, leader, replicas, and ISR
public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    private final Node[] offlineReplicas;
}
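To make the relationship between PartitionInfo and the derived Cluster maps concrete, here is a minimal sketch. The `Partition` record, the node ids, and the topic names are all hypothetical stand-ins; only the grouping logic mirrors what Cluster does when it builds partitionsByTopic and availablePartitionsByTopic.

```java
import java.util.*;
import java.util.stream.Collectors;

// Hedged sketch: how Cluster-style lookup maps can be derived from a flat
// list of partition metadata. Not the real Kafka implementation.
public class ClusterMaps {
    // Minimal stand-in for PartitionInfo: topic, partition id, leader node id (null = no leader).
    public record Partition(String topic, int id, Integer leaderNodeId) {}

    // topic -> all partitions of that topic
    public static Map<String, List<Partition>> partitionsByTopic(List<Partition> all) {
        return all.stream().collect(Collectors.groupingBy(Partition::topic));
    }

    // topic -> partitions whose leader is known; a partition with no leader
    // (e.g. mid leader-election) is not available for producing
    public static Map<String, List<Partition>> availablePartitionsByTopic(List<Partition> all) {
        return all.stream()
                  .filter(p -> p.leaderNodeId() != null)
                  .collect(Collectors.groupingBy(Partition::topic));
    }

    public static void main(String[] args) {
        List<Partition> all = List.of(
            new Partition("orders", 0, 1),
            new Partition("orders", 1, null),   // leader election in progress
            new Partition("clicks", 0, 2));
        System.out.println(partitionsByTopic(all).get("orders").size());          // 2
        System.out.println(availablePartitionsByTopic(all).get("orders").size()); // 1
    }
}
```

This is why availablePartitionsByTopic can be a strict subset of partitionsByTopic: both are views over the same per-partition metadata.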

Reading the source, the main data structures of Metadata are not hard to understand. To summarize, they tell us:

  • which nodes are in the cluster;
  • which topics exist in the cluster, and which partitions each topic has;
  • for each partition, which node hosts the leader replica and which nodes host the follower replicas;
  • for each partition, which replicas make up the AR and which make up the ISR.

2.2 Application Scenarios of Metadata

Metadata plays an important role in Kafka; many scenarios need to read or update it. For example:

  • When KafkaProducer sends a message to a topic, it needs to know the number of partitions, the target partition to send, the leader of the target partition, and the address of the node where the leader is located. This information is obtained from Metadata.
  • When a leader election occurs in the Kafka cluster, or the partitions or replicas on a node change, the data in Metadata must be updated.

3. The Producer's Metadata Update Process

When Producer calls the doSend() method, the first step is to retrieve the metadata information for the topic through the waitOnMetadata method.

To summarize the waitOnMetadata() code:

  • First, the cluster information and the partition information are fetched from the cache. If the required partition information is present, the current cluster information is returned; otherwise the metadata needs to be updated.
  • The metadata update is performed in a do…while loop that runs until the metadata contains the required partition information:
    • call Metadata.requestUpdateForTopic() to get the current updateVersion (the version of the last update) and set needUpdate to true, forcing an update;
    • call sender.wakeup() to wake up the sender thread, which in turn wakes up the NetworkClient thread to handle the MetadataRequest (more on that later);
    • call metadata.awaitUpdate(version, remainingWaitMs) to wait for the metadata update; it compares the current updateVersion with the version obtained in step 1 to decide whether the update succeeded.
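The loop above can be sketched as follows. Everything here is a toy stand-in (ToyMetadata, simulateSenderRefresh, the partition count of 3 are all invented for illustration); in the real client the sender thread refreshes the metadata asynchronously and awaitUpdate() blocks until the version advances or the wait times out.

```java
import java.util.*;

// Hedged sketch of the waitOnMetadata() retry loop, not the real implementation.
public class WaitOnMetadataSketch {
    public static class ToyMetadata {
        public int updateVersion = 0;
        public Map<String, Integer> partitionCounts = new HashMap<>();
        public boolean needFullUpdate = false;

        // Stand-in for requestUpdateForTopic(): mark an update as needed, return current version.
        public int requestUpdate() { needFullUpdate = true; return updateVersion; }

        // Stand-in for the sender/NetworkClient performing the actual refresh.
        void simulateSenderRefresh(String topic) {
            partitionCounts.put(topic, 3); // pretend the broker reports 3 partitions
            updateVersion++;
            needFullUpdate = false;
        }

        // Stand-in for awaitUpdate(): block until updateVersion > lastVersion.
        public void awaitUpdate(int lastVersion, String topic) {
            while (updateVersion <= lastVersion) simulateSenderRefresh(topic);
        }
    }

    // Returns the topic's partition count once the metadata contains it.
    public static int waitOnMetadata(ToyMetadata metadata, String topic) {
        Integer partitions = metadata.partitionCounts.get(topic);
        while (partitions == null) {
            int version = metadata.requestUpdate();  // step 1: force an update, remember the version
            // step 2: sender.wakeup() would run here; step 3: wait for a newer version
            metadata.awaitUpdate(version, topic);
            partitions = metadata.partitionCounts.get(topic);
        }
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(waitOnMetadata(new ToyMetadata(), "orders")); // 3
    }
}
```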

3.1 org.apache.kafka.clients.NetworkClient#poll

Calling sender.wakeup() wakes up the sender thread, which in turn wakes up the NetworkClient thread, where the MetadataRequest is actually handled. The real work in NetworkClient happens in its poll() method. Let's look at the source to see how NetworkClient handles the request.

3.2 org.apache.kafka.clients.NetworkClient.DefaultMetadataUpdater#maybeUpdate(long)

Let's see how metadata is updated. At this point you may ask: what is the least-loaded node?

Don’t worry, let’s look at the picture below and you’ll get the idea.

leastLoadedNode refers to the node with the smallest load in the Kafka cluster. It is determined by the number of in-flight requests (sent but not yet answered) each node has in InFlightRequests: the fewer in-flight requests a node has, the smaller its load. As shown in the figure above, Node1 is the leastLoadedNode.
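The core of the selection can be sketched in a few lines. The node names and counts are made up, and the real NetworkClient additionally skips nodes that are not connectable or are in a reconnect-backoff window; this only shows the "fewest in-flight requests wins" rule.

```java
import java.util.*;

// Hedged sketch of the leastLoadedNode selection rule.
public class LeastLoaded {
    // Pick the node with the fewest in-flight requests; null if the map is empty.
    public static String leastLoaded(Map<String, Integer> inFlightByNode) {
        return inFlightByNode.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElse(null);
    }

    public static void main(String[] args) {
        Map<String, Integer> load = Map.of("node1", 1, "node2", 4, "node3", 7);
        System.out.println(leastLoaded(load)); // node1
    }
}
```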

3.3 org.apache.kafka.clients.Metadata#updateRequested

The next metadata update time is the maximum of timeToExpire (how long until the current metadata expires) and timeToAllowUpdate (how long until a metadata update is allowed again).

timeToExpire: if needUpdate is true, a forced update is pending and the value is 0; otherwise it is determined by the periodic update interval, i.e. the metadata expiry time (default 300,000 ms, or 5 minutes).

timeToAllowUpdate: defaults to refreshBackoffMs, i.e. 100 ms.
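Putting the two values together, the computation can be sketched like this. The method shape and parameter list are illustrative, not the exact Kafka signature; the constants are the defaults quoted in the text.

```java
// Hedged sketch: the next metadata update happens at
// max(timeToExpire, timeToAllowUpdate).
public class MetadataTiming {
    static final long METADATA_EXPIRE_MS = 300_000; // metadata.max.age.ms default
    static final long REFRESH_BACKOFF_MS = 100;     // retry.backoff.ms default

    public static long timeToNextUpdate(boolean needUpdate, long lastSuccessfulRefreshMs,
                                        long lastRefreshMs, long nowMs) {
        // 0 if a forced update is pending, otherwise time until the periodic expiry
        long timeToExpire = needUpdate
                ? 0
                : Math.max(lastSuccessfulRefreshMs + METADATA_EXPIRE_MS - nowMs, 0);
        // time we must still wait to respect the retry backoff
        long timeToAllowUpdate = Math.max(lastRefreshMs + REFRESH_BACKOFF_MS - nowMs, 0);
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    public static void main(String[] args) {
        // forced update, but the last attempt was 50 ms ago: still 50 ms of backoff left
        System.out.println(timeToNextUpdate(true, 0, 1_000, 1_050));  // 50
        // no forced update: wait out the rest of the 5-minute expiry window
        System.out.println(timeToNextUpdate(false, 0, 0, 100_000));   // 200000
    }
}
```

Note how the backoff dominates even a forced update: this is what prevents a storm of back-to-back metadata requests.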

3.4 org.apache.kafka.clients.NetworkClient.DefaultMetadataUpdater#maybeUpdate(long, org.apache.kafka.common.Node)

Let’s continue with the maybeUpdate method:

Therefore, each time the producer requests a metadata update, one of the following happens:

  • the node's channel is ready, so the request is sent directly;
  • the node is in the middle of connecting, so the method simply returns;
  • the node is not yet connected, so a connection to the broker is initiated.
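The three cases can be sketched as a simple dispatch. ConnState and the returned action strings are invented stand-ins for the real NetworkClient/ClusterConnectionStates machinery; only the branching mirrors maybeUpdate(now, node).

```java
// Hedged sketch of the per-node branching in maybeUpdate(long, Node).
public class MaybeUpdateSketch {
    public enum ConnState { READY, CONNECTING, DISCONNECTED }

    // Returns a description of the action taken, for illustration only.
    public static String maybeUpdate(ConnState state) {
        switch (state) {
            case READY:        return "send MetadataRequest"; // channel ready: send now
            case CONNECTING:   return "wait for connection";  // in progress: just return
            case DISCONNECTED: return "initiate connection";  // not connected: connect first
            default:           throw new IllegalStateException("unreachable");
        }
    }

    public static void main(String[] args) {
        System.out.println(maybeUpdate(ConnState.DISCONNECTED)); // initiate connection
    }
}
```

This branching is also why the first few poll() calls after a cold start do not yet carry a metadata request, as described next.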

The KafkaProducer thread blocks in two while loops until the metadata is updated. Meanwhile:

  • the sender thread first calls poll to initialize the connection to the node;
  • the sender thread calls poll a second time to send the MetadataRequest;
  • the sender thread calls poll a third time to receive the MetadataResponse and update the metadata.

3.5 Receiving the Server's Response and Updating the Metadata

handleCompletedReceives processes each completed response as follows:

It then calls handleSuccessfulResponse.

4. Summary

Metadata is updated in two ways:

  • Forced update: call Metadata.requestUpdate() to set needFullUpdate to true, forcing an update.
  • Periodic update: the metadata is refreshed based on lastSuccessfulRefreshMs and metadataExpireMs; the default period (metadataExpireMs) is 5 minutes.

Both update mechanisms are checked whenever NetworkClient's poll() method is called; when either condition is met, an update is triggered.

Forcible Metadata updates occur in one of the following situations:

  • when initConnect is called to initialize a connection;
  • when handleDisconnections() is called in poll() to handle a disconnection, which triggers a forced update;
  • when handleTimedOutRequests() is called in poll() to handle timed-out requests;
  • when the leader of a partition cannot be found while sending a message;
  • when the producer response is handled (handleProduceResponse) and it carries an exception indicating that the Metadata is out of date, for example there is no metadata for a topic-partition, or the client is not authorized to fetch its metadata.

Forced updates are mainly used to handle various exceptions.

Well, that’s it for reading and updating Metadata. See you next time.