Hello everyone, I'm Xiao Cai, a developer in the Internet industry who aspires to become more than a rookie. Remember to give me a like, a favorite, and a follow!

This article mainly provides an introduction to Kafka

Refer to it if necessary

If it helps, don't forget to like and share!

The WeChat public account "Xiao Cai Liang" is up and running; if you haven't followed it yet, remember to follow!

Getting to know Kafka

1. Introduction

Kafka was originally developed by LinkedIn in Scala as a multi-partition, multi-replica distributed messaging system coordinated by ZooKeeper, and was later donated to the Apache Foundation. Today Kafka is positioned as a distributed stream processing platform and is widely used for its high throughput, persistence, horizontal scalability, support for stream processing, and other features.

2. Application scenarios

  • Messaging system: Kafka, like traditional messaging systems (message middleware), provides system decoupling, redundant storage, traffic peak shaving, buffering, asynchronous communication, scalability, recoverability, and other capabilities. In addition, Kafka provides message ordering guarantees and the ability to backtrack and re-consume messages, which most messaging systems find difficult to offer.
  • Storage system: Kafka persists messages to disk, which effectively reduces the risk of data loss compared with memory-based storage systems. Thanks to its message persistence and multi-replica mechanism, Kafka can be used as a long-term data storage system by setting the data retention policy to "permanent" or enabling log compaction for topics.
  • Stream processing platform: Kafka not only provides a reliable data source for every popular stream processing framework, but also ships with a complete stream processing library for operations such as windowing, joins, transformations, and aggregations.

3. Basic concepts

The Kafka architecture consists of producers, brokers, consumers, and a ZooKeeper cluster.

  • ZooKeeper: Kafka uses ZooKeeper to manage cluster metadata, elect the controller, and so on.
  • Producer: producer, the party that sends the message. Responsible for creating messages and then delivering them to Kafka.
  • Consumer: consumer, the party receiving the message. After connecting to Kafka, messages are received and the corresponding business logic is processed.
  • Broker: a service proxy node. For Kafka, a broker can simply be viewed as an independent Kafka service node or Kafka service instance. In most cases a broker can also be thought of as a Kafka server, provided that only one Kafka instance is deployed on that server. One or more brokers form a Kafka cluster.

The overall Kafka system is roughly composed of the above parts. In addition, there are two concepts that are particularly important: Topic and Partition

  • Topic: Messages in Kafka are organized by topic. Producers are responsible for sending messages to specific topics (every message sent to the Kafka cluster specifies a topic), and consumers subscribe to topics and consume them.
  • Partition: A topic is a logical concept that can be subdivided into multiple partitions; each partition belongs to a single topic and is often called a topic-partition. Different partitions under the same topic contain different messages. At the storage level, a partition can be viewed as an appendable log file; a message is assigned a specific offset when it is appended to the partition's log file. The offset is the unique identifier of a message within a partition, and Kafka uses it to guarantee ordering within a partition. The offset does not span partitions, that is, Kafka guarantees partition ordering rather than topic ordering.

Kafka uses a multi-replica mechanism for partitions to improve disaster recovery capability.

Different replicas of the same partition store the same messages (although at any given moment the replicas may not be exactly identical). The replicas have a "one leader, many followers" relationship: the leader replica handles read and write requests, while the follower replicas only synchronize messages from the leader. The replicas reside on different brokers, and if the leader replica fails, a new leader is elected from the follower replicas so that the partition can continue serving requests.

Kafka implements automatic failover through multiple replicas, ensuring that the service is available when a broker in a Kafka cluster fails.

Before we move on to Kafka, there are a few key words to understand:

  • AR (Assigned Replicas): all the replicas in a partition are collectively called the AR.
  • ISR (In-Sync Replicas): all replicas (including the leader replica) that keep some degree of synchronization with the leader replica form the ISR. The ISR set is a subset of the AR set. Messages are first sent to the leader replica, and follower replicas then pull messages from the leader for synchronization; during synchronization the follower replicas lag behind the leader to some extent.
  • OSR (Out-of-Sync Replicas): the replicas (excluding the leader replica) that lag too far behind the leader replica form the OSR.

According to the above relationship, we can get a formula: AR=ISR+OSR

  • HW (High Watermark): the high watermark, which identifies a particular message offset; consumers can only pull messages before this offset.
  • LEO (Log End Offset): the offset of the next message to be written to the log.

I'm sure many readers are getting a little impatient by now: why is Kafka so complicated? Still, it is worth studying carefully.

Don't rush. We still need to go through the theory first; this is not the start of a retreat, it is the start of your growth! Below, Xiao Cai will do his best to use the simplest language to take you into the deepest pit!

The Kafka production brigade

As we all know, Kafka, to put it grandly, is a distributed message queue; to put it simply, it is just a message queue. A message queue, simply put, pushes data in and pulls data out. Yes, advanced knowledge often benefits from a simple understanding.

So where does the data come from? The data comes from the production brigade! From a programming point of view, the production brigade is a group of producers (or just one), and a producer is an application that sends messages to Kafka.

Client development

The production process generally involves the following steps:

  • Configure the producer client parameters and create the corresponding producer instance
  • Build the message to be sent
  • Send the message
  • Close the producer instance

Four steps to solve a production problem
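
The original sample code is not reproduced here, so below is a minimal sketch of what such a producer client might look like; the broker address localhost:9092, the topic name kafka-demo, and the client id are assumptions for illustration:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {
    public static void main(String[] args) {
        // 1. Configure the producer client parameters and create the producer instance
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        properties.put("client.id", "producer.client.demo");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 2. Build the message to be sent
        ProducerRecord<String, String> record = new ProducerRecord<>("kafka-demo", "hello, Kafka!");

        // 3. Send the message
        producer.send(record);

        // 4. Close the producer instance
        producer.close();
    }
}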

As you can see from the code above, we put four parameters into the Properties object:

  • bootstrap.servers: specifies the list of broker addresses the producer client uses to connect to the Kafka cluster, in the format host1:port1,host2:port2. One or more addresses can be set, separated by commas. The default value is "".
  • key.serializer and value.serializer: specify the serializers used to serialize the key and the value respectively. These two parameters have no default values and must be set to the fully qualified class name of the serializer.
  • client.id: sets the client id of the KafkaProducer. The default value is "". If it is not set, KafkaProducer automatically generates a non-empty string such as "producer-1" or "producer-2", i.e. the string "producer-" concatenated with a number.

ProducerRecord is defined as follows:
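
The definition block itself did not survive the translation; simplified, the core fields of ProducerRecord look roughly like this (constructors and getters omitted):

public class ProducerRecord<K, V> {
    private final String topic;       // The topic the message is sent to
    private final Integer partition;  // The partition number
    private final Headers headers;    // The message headers
    private final K key;              // The key
    private final V value;            // The message body
    private final Long timestamp;     // The message timestamp
    // Constructors and getters omitted
}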

  • topic and partition: the topic and the partition number the message is to be sent to.

  • headers: the message headers. This field is optional.

  • key: the key of the message. It is not just extra information attached to the message; it is also used to compute the partition number so that the message can be sent to a specific partition.

  • value: the body of the message. It is usually non-empty; an empty value indicates a special message, a tombstone.

  • timestamp: the timestamp of the message. It has two types, CreateTime (when the message was created) and LogAppendTime (when the message was appended to the log file).

The above operations create producer instances and build messages. There are three main modes of sending messages:

  • Fire-and-forget
  • Sync
  • Async

The fire-and-forget approach used above simply sends a message to Kafka without caring whether it arrives correctly. In most cases this is fine, but in some cases the message may be lost. This mode gives the highest performance but the lowest reliability.
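
As a minimal illustration (reusing the producer and record from the sketch above), a fire-and-forget send is just a call to send() with no handling of the result:

try {
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}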

public Future<RecordMetadata> send(ProducerRecord<K,V> record) {}

The send method returns a Future object

Future<RecordMetadata> res = producer.send(record);

send() returns a Future object, which allows the caller to retrieve the result of the send later. To send synchronously, we can simply call the Future's get() method:

try {
    producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}

The get() method blocks waiting for Kafka's response until the message is sent successfully or an exception occurs.

Can production be asynchronous?

The send() method has another overload in Kafka:

public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) {}

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (Objects.isNull(e)) {
            System.out.println("topic: " + recordMetadata.topic());
        } else {
            System.out.println(e.getMessage());
        }
    }
});

Using a Callback is very straightforward: Kafka invokes the callback when it receives the response, whether the send succeeded or an exception was thrown.

The two parameters of the onCompletion() method are mutually exclusive: if the message was sent successfully, recordMetadata is non-null and the exception is null; if the send failed, the opposite is true.

Have trouble producing?

There are two types of exceptions that generally occur in KafkaProducer:

  • Retriable exceptions, for example:

NetworkException, LeaderNotAvailableException, UnknownTopicOrPartitionException, NotEnoughReplicasException, NotCoordinatorException, etc.

  • Non-retriable exceptions, for example:

RecordTooLargeException etc.

For retriable exceptions, you can configure the retries parameter. If the exception recovers within the specified number of retries, it will not be thrown. The default value of retries is 0, and it can be configured like this:

properties.put(ProducerConfig.RETRIES_CONFIG, 10);

In the preceding example, the number of retries is 10. If the number of retries exceeds 10, an exception will be thrown.

Non-retriable exceptions, such as RecordTooLargeException, mean for example that the message being sent is too large; such exceptions are not retried and are thrown directly.

Serialization to the rescue

Producers need to use Serializer to convert objects into byte arrays before sending them to Kafka over the network. Consumers also need to use Deserializer to convert the byte arrays received in Kafka into corresponding objects.

The StringSerializer used in the code above implements the Serializer interface.

The configure() method configures the current class, and the serialize() method performs the serialization.

There needs to be a one-to-one correspondence between the serializers used by producers and the deserializers used by consumers

In addition to the serializers provided by Kafka, we can also customize the serializers:

Student.class:

@Data
public class Student {

    private String name;

    private String remark;
}

MySerializer:
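
The MySerializer code is not shown in the original; a sketch that serializes the Student's name and remark fields into a length-prefixed byte array might look like this (the field layout is an assumption for illustration):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

public class MySerializer implements Serializer<Student> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed for this simple serializer
    }

    @Override
    public byte[] serialize(String topic, Student data) {
        if (data == null) {
            return null;
        }
        byte[] name = data.getName() == null ? new byte[0] : data.getName().getBytes(StandardCharsets.UTF_8);
        byte[] remark = data.getRemark() == null ? new byte[0] : data.getRemark().getBytes(StandardCharsets.UTF_8);
        // Layout: 4-byte name length + name bytes + 4-byte remark length + remark bytes
        ByteBuffer buffer = ByteBuffer.allocate(4 + name.length + 4 + remark.length);
        buffer.putInt(name.length);
        buffer.put(name);
        buffer.putInt(remark.length);
        buffer.put(remark);
        return buffer.array();
    }

    @Override
    public void close() {
        // Nothing to release
    }
}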

Use:

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MySerializer.class.getName());

Just put our own serializer in Properties, which is pretty easy!

What is a partitioner?

Messages may need to pass through Interceptor, Serializer, and Partitioner on their way to the broker via send().

The interceptor is not required, but the serializer is. After passing through the serializer, the message needs to have its target partition determined. If the partition field is specified in the ProducerRecord, the partitioner is not needed, because partition already represents the target partition number.

package org.apache.kafka.clients.producer;

public interface Partitioner extends Configurable, Closeable {
    int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
    void close();
}

The partition() method is used to compute the partition number and returns an int. Its parameters are as follows:

  1. topic: the topic

  2. key: the message key

  3. keyBytes: the serialized key

  4. value: the message value

  5. valueBytes: the serialized value

  6. cluster: the metadata of the cluster

The main partition assignment logic is defined in the partition() method. If the key is not null, the default partitioner hashes the key (using MurmurHash2) and computes the partition number from the resulting hash value, so messages with the same key are written to the same partition. If the key is null, the message is distributed in a round-robin fashion among the available partitions of the topic.

Note: if the key is not null, the computed partition number can be any of the topic's partitions; if the key is null, it can only be one of the currently available partitions.

Of course, the partitioner can also be customized, as follows:

MyPartitioner.class:
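
The MyPartitioner code is not shown in the original; a sketch that hashes non-null keys and round-robins null-key messages across all partitions might look like this (the logic is illustrative, not the exact original):

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class MyPartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            // Round-robin across all partitions when there is no key
            return Utils.toPositive(counter.getAndIncrement()) % numPartitions;
        }
        // Hash the serialized key to pick a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}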

Use:

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());

Custom partitioners are also simple to use and only need to implement the Partitioner interface

Interceptors coming?

For those of you who do web development, interceptors are familiar. In Kafka, interceptors play a similar role and come in two kinds: producer interceptors and consumer interceptors.

The producer interceptor can be used to do some preparatory work before a message is sent, such as filtering out messages that do not meet certain criteria or modifying message contents; it can also be used to run custom logic before the send callback.

To customize an interceptor, we only need to implement the ProducerInterceptor interface:

package org.apache.kafka.clients.producer;

public interface ProducerInterceptor<K, V> extends Configurable {
    ProducerRecord<K,V> onSend(ProducerRecord<K,V> producerRecord);

    void onAcknowledgement(RecordMetadata recordMetadata, Exception e);

    void close();
}

The onSend() method can perform custom operations on the message, while the onAcknowledgement() method is called when the message is acknowledged or fails to send, before the user-set Callback.

A custom interceptor looks like this. MyProducerInterceptor.class:
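
The MyProducerInterceptor code is not shown in the original; a sketch matching the description below (prefixing the message body, counting successes and failures, printing the result on close) might look like this; the "prefix-" string is an assumption:

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Modify the message body before it is sent
        String modifiedValue = "prefix-" + record.value();
        return new ProducerRecord<>(record.topic(), record.partition(),
                record.timestamp(), record.key(), modifiedValue, record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception e) {
        // Count successful and failed sends
        if (e == null) {
            sendSuccess++;
        } else {
            sendFailure++;
        }
    }

    @Override
    public void close() {
        // Output the success / failure statistics
        System.out.println("send success: " + sendSuccess + ", send failure: " + sendFailure);
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}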

In the onSend() method we modify the messages about to be sent; in the onAcknowledgement() method we count the numbers of successful and failed sends; and in the close() method we output those counts.

It is registered the same way:

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());

Having one interceptor naturally creates a chain of interceptors. We can customize multiple interceptors and declare them in the Properties file:

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor1.class.getName() + "," + MyProducerInterceptor2.class.getName());

In this way, the next interceptor will depend on the output of the previous interceptor

The important parameters

In addition to the parameters already present above, there are some important parameters:

1. acks

This parameter specifies how many replicas in the partition must receive the message before the producer considers it successfully written. acks has three possible values (all of type String):

  1. acks = 1: the default value. After the producer sends a message, it receives a success response from the server as long as the partition's leader replica writes the message successfully. If the message is written to the leader replica and a success response is returned to the producer, but the leader crashes before any follower replica has pulled the message, the message is still lost.

  2. acks = 0: the producer does not wait for any response from the server after sending a message. If something goes wrong between sending the message and its being written to Kafka, Kafka never receives the message, the producer does not know, and the message is lost. With other configurations equal, setting acks to 0 achieves the maximum throughput.

  3. acks = -1 or acks = all: the producer must wait for all replicas in the ISR to successfully write the message before it receives a success response from the server. With other configurations equal, setting acks to -1 (or all) achieves the maximum reliability.

Settings:

properties.put(ProducerConfig.ACKS_CONFIG, "0"); // Note that the value is a string

2. max.request.size

Limits the maximum size of a message that the producer client can send. The default value is 1048576 B, i.e. 1 MB.

3. retries

Set the number of producer retries. The default value is 0, that is, no retries are performed when an exception occurs.

4. retry.backoff.ms

This parameter sets the interval between two retries to avoid pointless frequent retries. The default value is 100 (ms).

5. connections.max.idle.ms

This parameter specifies how long a connection may stay idle before it is closed. The default is 540000 ms, i.e. 9 minutes.

6.buffer.memory

Sets the size of the buffer the producer uses to cache messages (the default is 33554432 B, i.e. 32 MB).

7.batch.size

Sets the size of the memory region a ProducerBatch can reuse when batching messages (the default is 16384 B, i.e. 16 KB).
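
For reference, these parameters can be set on the producer's Properties like any other; the values below are merely illustrative:

properties.put(ProducerConfig.ACKS_CONFIG, "-1");                       // acks (note: a string)
properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);        // max.request.size
properties.put(ProducerConfig.RETRIES_CONFIG, 10);                      // retries
properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100L);           // retry.backoff.ms
properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 540000L); // connections.max.idle.ms
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);         // buffer.memory
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);                // batch.size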

Kafka consumer basics

Where there is production, there is consumption! The counterpart of the producer is the consumer: an application can subscribe to topics via KafkaConsumer and pull messages from the subscribed topics.

Individuals and groups?

Each consumer belongs to a consumer group. The consumer subscribes to topics in Kafka and pulls messages from them. When a message is published to a topic, it is delivered to only one consumer within each consumer group that subscribes to that topic.

When there is only one consumer in the consumer group, that consumer is assigned all of the topic's partitions:

When there are two consumers in the consumer group, the partitions are divided between them:

As you can see from the distribution above, the overall consumption capacity scales horizontally as the number of consumers increases: we can add (or remove) consumers to increase (or decrease) the overall consumption capacity. However, when the number of partitions is fixed, blindly adding consumers will not keep improving consumption capacity; if there are more consumers than partitions, some consumers will not be assigned any partition.

The distribution logic above is based on the default partition assignment strategy. The consumer client parameter partition.assignment.strategy can be used to configure how the partitions of subscribed topics are assigned among consumers.

Delivery mode

There are two modes of message delivery in Kafka:

  • Point-to-point mode

Queue-based, message producers send messages to queues and message consumers receive messages from queues

  • Publish/subscribe (Pub/Sub)

Topic-based, a topic can be thought of as a mediator for message delivery, with a message publisher publishing messages to a topic from which message subscribers subscribe. Topics enable subscribers and publishers of messages to remain independent from each other, and do not need to contact each other to ensure the transmission of messages. Publish/subscribe mode is adopted in one-to-many broadcast of messages.

Client development

Consumption can be achieved through the following steps (a sketch follows the list):

  • Configure the consumer client parameters and create the corresponding consumer instance
  • Subscribe to the topic
  • Pull the message and consume it
  • Commit consumption shift
  • Close the consumer instance
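
The sample consumer code is not reproduced here; a minimal sketch following these steps might look like this (the broker address, group id, and topic name are assumptions):

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerDemo {
    public static void main(String[] args) {
        // 1. Configure the consumer client parameters and create the consumer instance
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-demo");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.demo");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 2. Subscribe to the topic
        consumer.subscribe(Arrays.asList("kafka-demo"));

        try {
            while (true) {
                // 3. Pull messages and consume them
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic = " + record.topic() + ", partition = " + record.partition()
                            + ", offset = " + record.offset() + ", value = " + record.value());
                }
                // 4. Offsets are committed automatically here (enable.auto.commit defaults to true);
                //    consumer.commitSync() could be called instead if auto commit were disabled
            }
        } finally {
            // 5. Close the consumer instance
            consumer.close();
        }
    }
}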

As you can see, when configuring consumer parameters, we see several familiar parameters:

  • bootstrap.servers: to avoid spelling mistakes, the constant ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG can be used. It specifies the list of broker addresses used to connect to the Kafka cluster; one or more addresses can be set, separated by commas. The default value is "".
  • group.id: to avoid spelling mistakes, the constant ConsumerConfig.GROUP_ID_CONFIG can be used. It is the name of the consumer group the consumer belongs to. The default value is ""; if it is set to empty, an exception will be thrown.
  • key.deserializer / value.deserializer: to avoid spelling mistakes, the constants ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG and ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG can be used. The deserializers on the consumer side must correspond to the serializers on the producer side.
  • client.id: to avoid spelling mistakes, the constant ConsumerConfig.CLIENT_ID_CONFIG can be used. It specifies the client id of the KafkaConsumer. The default value is "".

Subscribing to topics

Consumers consume messages, so subscribing to the corresponding topics is essential. In the example above we call consumer.subscribe(Arrays.asList(topic)) to subscribe to a topic; as you can see, a consumer can subscribe to one or more topics. Let's look at the overloads of subscribe():

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) { /* compiled code */ }

public void subscribe(Collection<String> topics) { /* compiled code */ }

public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { /* compiled code */ }

public void subscribe(Pattern pattern) { /* compiled code */ }

If the following happens when we subscribe to a topic:

consumer.subscribe(Arrays.asList(topic1));
consumer.subscribe(Arrays.asList(topic2));

So you end up subscribed only to topic2, the topic in the most recent subscribe() call, not topic1, and not topic1 and topic2 together.

subscribe() also has overloads that accept a regular expression:

consumer.subscribe(Pattern.compile("topic.*"));

After this configuration, if someone creates a new topic and the name of the topic matches the regular expression, the consumer can consume messages in the newly added topic.

Besides a topic collection or a regular expression, two of the subscribe() overloads also accept a ConsumerRebalanceListener parameter, which is used to set the corresponding rebalance listener.

In addition to subscribing to topics through the subscribe() method, consumers can subscribe directly to specific partitions of certain topics through the assign() method.

public void assign(Collection<TopicPartition> partitions)

TopicPartition is defined as follows:
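
The definition is not shown in the original; in essence it is just a topic name plus a partition number, roughly:

public final class TopicPartition implements Serializable {
    private final int partition;  // The partition number
    private final String topic;   // The topic name

    public TopicPartition(String topic, int partition) {
        this.partition = partition;
        this.topic = topic;
    }
    // hashCode(), equals(), toString(), and getters omitted
}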

The constructor needs to pass in the topic and partition number of the subscription as follows:

consumer.assign(Arrays.asList(new TopicPartition("kafka-demo", 0)));

This way we subscribe only to partition 0 of the kafka-demo topic.

What if we don’t know in advance how many partitions there are in the topic? The partitionsFor() method in KafkaConsumer can be used to query metadata information for a specific topic. The partitionsFor() method is defined as follows:

public List<PartitionInfo> partitionsFor(String topic);

The PartitionInfo object is defined as follows:

public class PartitionInfo {
    private final String topic;             // The topic name
    private final int partition;            // Partition number
    private final Node leader;              // The location of the leader copy of the partition
    private final Node[] replicas;          // The AR collection of the partition
    private final Node[] inSyncReplicas;    // Partition ISR collection
    private final Node[] offlineReplicas;   // The offline replicas of the partition
}

A subscription is not a forced bundle; it can also be cancelled. We can use the unsubscribe() method in KafkaConsumer to cancel a topic subscription. This method can cancel subscriptions made through subscribe(Collection), subscribe(Pattern), and assign(Collection).

consumer.unsubscribe();

If the Collection argument in subscribe(Collection) or assign(Collection) is set to an empty Collection, it is the same as the unsubscribe() method, and the three lines in the following example have the same effect:

consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>());
consumer.assign(new ArrayList<TopicPartition>());

Consumption patterns

There are generally two modes of message consumption: push mode and pull mode. Consumption in Kafka is based on a pull model

  • Push mode: The server actively pushes messages to consumers
  • Pull mode: The consumer initiates a pull request to the server

In Kafka, message consumption is a polling process. All consumers need to do is call the poll() method repeatedly. If there are no messages available for consumption in a partition, the result of the pull of the corresponding message is empty. If there are no messages available for consumption in any of the subscribed partitions, the poll() method returns an empty set of messages.

public ConsumerRecords<K, V> poll(final Duration timeout)

A timeout can be passed to the poll() method to control how long the poll() method blocks when no data is available in the consumer’s buffer.

The messages pulled by the poll() method are ConsumerRecord objects, defined as follows:
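
The definition is not shown in the original; simplified, the main fields of ConsumerRecord look roughly like this (constructors and getters omitted):

public class ConsumerRecord<K, V> {
    private final String topic;                // The topic name
    private final int partition;               // The partition number
    private final long offset;                 // The offset of the message within its partition
    private final long timestamp;              // The timestamp
    private final TimestampType timestampType; // CreateTime or LogAppendTime
    private final int serializedKeySize;       // The size of the serialized key
    private final int serializedValueSize;     // The size of the serialized value
    private final Headers headers;             // The message headers
    private final K key;                       // The key
    private final V value;                     // The message body
    // Constructors and getters omitted
}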

We can directly perform specific business logic processing on the fields of interest in the ConsumerRecord when consuming messages.

Consumer interceptor

We discussed the use of producer interceptors above; consumers, of course, have corresponding interceptors as well. Consumer interceptors typically perform custom operations when consuming messages or committing consumption offsets.

A producer defines an interceptor by implementing the ProducerInterceptor interface; a consumer defines one by implementing the ConsumerInterceptor interface, which is defined as follows:

package org.apache.kafka.clients.consumer;

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
    ConsumerRecords<K,V> onConsume(ConsumerRecords<K,V> consumerRecords);

    void onCommit(Map<TopicPartition,OffsetAndMetadata> map);

    void close();
}
  • onConsume(): before the poll() method returns, KafkaConsumer calls the interceptor's onConsume() method to customize the messages, for example modifying the returned message contents or filtering messages according to some rule (which may reduce the number of messages poll() returns). If an exception is thrown in onConsume(), it is caught and logged, but it is not propagated further.

  • onCommit(): KafkaConsumer calls the interceptor's onCommit() method after committing offsets. This method can be used to track the committed offsets; for example, when a consumer uses the no-argument commitSync() method, we do not otherwise know the details of the committed offsets, and the onCommit() method makes this possible.
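
The MyConsumerInterceptor referenced below is not shown in the original; a common sketch that filters out messages older than a certain age in onConsume() and prints the committed offsets in onCommit() might look like this (the 10-second expiry is an arbitrary assumption):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {

    private static final long EXPIRE_INTERVAL = 10 * 1000; // Treat messages older than 10s as expired

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        long now = System.currentTimeMillis();
        Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records.records(tp)) {
                // Keep only messages that have not yet "expired"
                if (now - record.timestamp() < EXPIRE_INTERVAL) {
                    newTpRecords.add(record);
                }
            }
            if (!newTpRecords.isEmpty()) {
                newRecords.put(tp, newTpRecords);
            }
        }
        return new ConsumerRecords<>(newRecords);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Track the committed offsets
        offsets.forEach((tp, offsetAndMetadata) -> System.out.println(tp + " : " + offsetAndMetadata.offset()));
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}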

After customizing the interceptor, it is registered in the same way as before:

properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG ,MyConsumerInterceptor.class.getName());

The important parameters

In addition to the parameters already present above, there are some important parameters:

1. fetch.min.bytes

This parameter configures the minimum amount of data the Consumer pulls from Kafka in one pull request (a call to poll()). The default value is 1 B. If the amount of data available is less than this value, Kafka waits until enough data has accumulated before responding.

2. fetch.max.bytes

This parameter configures the maximum amount of data the Consumer can pull from Kafka in one pull request. The default value is 52428800 B (50 MB).

3. fetch.max.wait.ms

This parameter specifies how long Kafka will wait before responding when there is not enough data to satisfy fetch.min.bytes. The default value is 500 ms.

4. max.partition.fetch.bytes

This parameter sets the maximum amount of data to be returned to Consumer from each partition. The default value is 1048576 B (1MB).

5. max.poll.records

This parameter is used to set the maximum number of messages that can be pulled from a Consumer pull request. The default value is 500

6. request.timeout.ms

This parameter is used to set the maximum time for a Consumer to wait for a response. The default value is 30000 ms
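
For reference, these parameters can likewise be set through the consumer's Properties; the values below are merely illustrative:

properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);                 // fetch.min.bytes
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800);          // fetch.max.bytes
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);             // fetch.max.wait.ms
properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // max.partition.fetch.bytes
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);              // max.poll.records
properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);          // request.timeout.ms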

Kafka topic management

We have seen the concept of topics on both the producer side and the consumer side; topics are at the heart of Kafka.

A topic, as a way of categorizing messages, can be subdivided into one or more partitions, which can be viewed as a second-level categorization of messages. Partitioning not only gives Kafka scalability and the ability to scale horizontally, but also provides data redundancy through the multi-replica mechanism, improving data reliability.

1. Create a topic

The broker has a configuration parameter auto.create.topics.enable (default true). When it is true and a producer sends a message to a topic that does not yet exist, a topic is automatically created with num.partitions partitions (default 1) and a replication factor of default.replication.factor (default 1).

Create using a script:

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic kafka-demo --partitions 4 --replication-factor 2

Create a topic using TopicCommand:

Import the Maven dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
public static void createTopic(String topicName) {
    String[] options = new String[]{
        "--zookeeper"."localhost:2181/kafka"."--create"."--replication-factor"."2"."--partitions"."4"."--topic", topicName
    };
    kafka.admin.TopicCommand.main(options);
}

In the example above, a topic is created with a partition number of 4 and a replica factor of 2

2. View topics

  • --list:

The --list option lists all currently available topics:

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --list
  • --describe:

The --describe option shows detailed information for individual topics. If --topic is not used to specify topics, details for all topics are displayed; --topic also supports specifying multiple topics:

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic kafka-demo1,kafka-demo2

3. Modify a topic

After a topic is created, we can still modify it, for example changing the number of partitions or its configuration, with the help of the --alter option:

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic kafka-demo --partitions 3

When modifying a partition, we need to note the following:

When the topic kafka-demo has only 1 partition, messages are sent to that partition regardless of their key. When the number of partitions is increased to 3, partition numbers are computed from the message key, so a message that used to go to partition 0 may now go to partition 1 or partition 2. It is therefore advisable to set an appropriate number of partitions from the start.

Kafka currently only supports increasing the number of partitions, not decreasing it. If we try to change the number of partitions of the kafka-demo topic down to 1, an InvalidPartitionsException is thrown.

4. Delete a topic

If a topic is definitely no longer needed, it is best to delete it, which frees up resources such as disk space and file handles. We can use the --delete option to delete the topic:

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --delete --topic kafka-demo

Note that the broker parameter delete.topic.enable must be set to true in order to delete topics. It defaults to true; if it is false, topic deletion requests are ignored.

If the topic to be deleted is one of Kafka's internal topics, such as __consumer_offsets or __transaction_state, the deletion fails with an error.

Common parameters

  • alter: Modify a topic, including its number of partitions and topic configuration
  • config <key=value>: Set topic-level parameters when creating or modifying a topic
  • create: Create a topic
  • delete: Delete a topic
  • delete-config <config name>: Remove a configuration overridden at the topic level
  • describe: View details about topics
  • disable-rack-aware: Ignore rack information when creating a topic
  • help: Print help information
  • if-exists: When modifying or deleting a topic, perform the operation only if the topic exists
  • if-not-exists: When creating a topic, perform the operation only if the topic does not exist
  • list: List all available topics
  • partitions <number of partitions>: Specify the number of partitions when creating a topic or adding partitions
  • replica-assignment <allocation scheme>: Manually specify the partition replica allocation scheme
  • replication-factor <number of replicas>: Specify the replication factor when creating a topic
  • topic <topic name>: Specify the topic name
  • topics-with-overrides: When using describe to view topic information, show only topics with overridden configurations
  • zookeeper <address>: Specify the ZooKeeper connection address

That is roughly the introductory content of Kafka, and today's knowledge ends here. Although the content is not very deep, the word count is considerable; if you made it all the way through, Xiao Cai gives you a thumbs-up!

Work a little harder today, and you will have fewer favors to ask tomorrow!

I am Xiao Cai, someone who learns alongside you. 💋

The WeChat public account "Xiao Cai Liang" is up and running; if you haven't followed it yet, remember to follow!