All commands in this article have been verified by the author for accuracy; this is not a pieced-together copy-paste article. If you would like more tool commands covered, leave a comment and they will be added later.

The author is writing a series of articles on Kafka source code, Kafka operations, and Kafka practice, with accompanying recorded videos. This article is the first in the column; follow "Shi Zhenzhen's store" so you don't get lost!

Most of the O&M operations below can also be performed visually on the LogI-Kafka-Manager platform.


1.TopicCommand

1.1 Create a Topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test


Optional parameters:

| Parameter | Description | Example |
| --- | --- | --- |
| --bootstrap-server | Specifies the Kafka service to connect to; when this parameter is present, --zookeeper is not needed | --bootstrap-server localhost:9092 |
| --zookeeper | Deprecated; connects to the Kafka cluster through ZooKeeper | --zookeeper localhost:2181 or localhost:2181/kafka |
| --replication-factor | Number of replicas; must not exceed the number of brokers. If omitted, the cluster default is used | --replication-factor 3 |
| --partitions | Number of partitions, used when creating or altering a topic. If omitted at creation time, the cluster default is used. Note that specifying fewer partitions than a topic already has causes an error | --partitions 3 |
| --replica-assignment | Manual replica assignment when creating a topic. Commas separate partitions; colons separate the replicas of one partition | --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 (three partitions with three replicas each, assigned to the listed brokers) |
| --config <String: name=value> | Sets topic-level configuration, overriding the defaults; only effective when used together with --create and --bootstrap-server. See the attachment at the end for the configurable parameters | --config retention.bytes=123455 --config retention.ms=600001 (overrides two configurations) |
| --command-config <String: config file path> | Admin client startup configuration; only effective when used together with --bootstrap-server | --command-config config/producer.properties, with e.g. request.timeout.ms=300000 set in that file |
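
For example, a minimal sketch (the topic name and broker IDs 0/1/2 are illustrative assumptions) that creates a three-partition topic with an explicit replica layout and a retention override; note that when --replica-assignment is given, --partitions and --replication-factor are derived from it and must not be passed:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_assign --replica-assignment 0:1:2,1:2:0,2:0:1 --config retention.ms=600001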

1.2 Delete a Topic

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test


Regular-expression matching is supported for deletion; simply wrap the topic pattern in double quotes. For example, to delete all topics whose names start with create_topic_byhand_zk:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic "create_topic_byhand_zk.*"

. matches any single character except the newline \n; to match a literal ., use \.

* matches the preceding subexpression zero or more times; to match a literal *, use \*. .* therefore matches any sequence of characters.

Delete all topics (use with caution):

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic ".*?"

See a regular-expression reference for more usage.

1.3 Expand Topic Partitions

Via ZooKeeper (not recommended):

>bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2

Kafka >= 2.2 supports the following method (recommended):

Expand a single topic:

bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4

Batch expansion (increase the partition count of all topics matched by the regular expression to 4):

sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server 172.23.248.85:9092 --alter --partitions 4

The regular expression ".*?" matches all topics; adjust the pattern to match what you need.

PS: If a matched topic already has at least the specified number of partitions, an exception is thrown for that topic; the other matched topics are still processed normally.


Optional parameters:

| Parameter | Description | Example |
| --- | --- | --- |
| --replica-assignment | Manual replica assignment; when altering a topic you can specify the replica layout. Commas separate partitions; colons separate the replicas of one partition | --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 (three partitions with three replicas each, assigned to the listed brokers) |

PS: Although this option supplies a replica assignment for all partitions, only the newly added partitions actually take effect. For example, suppose the previous layout of three partitions with one replica each was:

| Broker-1 | Broker-2 | Broker-3 | Broker-4 |
| --- | --- | --- | --- |
| 0 | 1 | 2 | |

Passing --replica-assignment 2,1,3,4 when expanding to four partitions looks as if it would also move partition 0 to Broker-2 and partition 1 to Broker-1:

| Broker-1 | Broker-2 | Broker-3 | Broker-4 |
| --- | --- | --- | --- |
| 1 | 0 | 2 | 3 |

But that is not what actually happens: the Controller discards the first three entries while processing the request. Only the new partition follows the given assignment; the existing ones stay where they were:

| Broker-1 | Broker-2 | Broker-3 | Broker-4 |
| --- | --- | --- | --- |
| 0 | 1 | 2 | 3 |
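
A minimal sketch of such an expansion (topic name and broker IDs are assumptions); as described above, the assignment must still list an entry for every partition even though only the fourth one takes effect:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic topic1 --partitions 4 --replica-assignment 2,1,3,4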

1.4 Query Topic Description

1. Query a single Topic

sh bin/kafka-topics.sh --topic test --bootstrap-server xxxx:9092 --describe --exclude-internal

2. Batch-query topics (regular-expression match; the command below queries all topics):

sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server xxxx:9092 --describe --exclude-internal

Regular-expression matching is supported; simply wrap the topic pattern in double quotes.


Optional parameters:

| Parameter | Description | Example |
| --- | --- | --- |
| --bootstrap-server | Specifies the Kafka service to connect to; when present, --zookeeper is not needed | --bootstrap-server localhost:9092 |
| --at-min-isr-partitions | When describing, only show partitions whose ISR count has shrunk to the configured minimum (min.insync.replicas) | --at-min-isr-partitions |
| --exclude-internal | Exclude Kafka internal topics such as __consumer_offsets | --exclude-internal |
| --topics-with-overrides | Only show topics with overridden configurations, i.e. topic-level settings that override the defaults; partition information is not displayed | --topics-with-overrides |

1.5 Query the Topic List

1. Query the list of all topics

sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal

2. Query the list of matching topics (regular expression)

sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal --topic "test_create_.*"


Optional parameters:

| Parameter | Description | Example |
| --- | --- | --- |
| --exclude-internal | Exclude Kafka internal topics such as __consumer_offsets | --exclude-internal |
| --topic | Regular expression matching the topic names to list | --topic "test_create_.*" |

2.ConfigCommand

Config-related operations; dynamic configurations override the default static configurations.

2.1 Querying configuration information

Topic Configuration Query

Shows dynamic and static configurations for Topic

1. Querying the configuration of a single Topic (dynamic configuration only)

sh bin/kafka-configs.sh --describe --bootstrap-server xxxxx:9092 --topic test_create_topic
or
sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics --entity-name test_create_topic

2. Query all Topic configurations (including internal topics)(only dynamic configurations are listed)

sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics

3. Query detailed Topic configuration (dynamic + static)

You just need to add the --all argument.
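
For example, a sketch under the same placeholders (host and topic name assumed):

sh bin/kafka-configs.sh --describe --bootstrap-server xxxxx:9092 --entity-type topics --entity-name test_create_topic --all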

Other queries: clients/users/brokers/broker-loggers

The usage is the same; just change --entity-type to the corresponding type (topics/clients/users/brokers/broker-loggers).
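
For instance, a minimal sketch querying the dynamic configuration of broker 0 (the host is a placeholder):

sh bin/kafka-configs.sh --describe --bootstrap-server xxxx:9092 --entity-type brokers --entity-name 0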

Example: query the Kafka version

sh bin/kafka-configs.sh --describe --bootstrap-server xxxx:9092 --version

See the attachment at the end of this article for all configurable dynamic configurations.

2.2 Add, Delete, and Modify Configurations: --alter

Use --alter together with --add-config k1=v1,k2=v2 to add or modify configurations, or with --delete-config k1,k2 to delete them; --entity-type selects the type (topics/clients/users/brokers/broker-loggers) and --entity-name the name.

Add or modify a topic's dynamic configuration

--add-config

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --add-config file.delete.delay.ms=222222,retention.ms=999999

Delete a topic's dynamic configuration

--delete-config

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --delete-config file.delete.delay.ms,retention.ms

The same applies to other entity types; you only need to change --entity-type.

Types: topics/clients/users/brokers/broker-loggers

See the attachment at the end for which configurations can be modified: Some optional configurations for ConfigCommand.

3. Replica expansion, partition migration, and cross-path migration: kafka-reassign-partitions

[Kafka O&M] Replica expansion, data migration, replica reassignment, cross-path replica migration (if the link does not open, the article has not been published yet; please wait)

4. Send messages to a Topic: kafka-console-producer.sh

4.1 Produce messages without a key

## producers
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties


4.2 Produce messages with a key: --property parse.key=true

## producers
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties  --property parse.key=true


By default, the message key and value are separated by a tab character, so do not include the tab escape (\t) inside the key or value themselves.
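
The separator can also be changed; a sketch using a colon instead of the tab default (the separator choice is illustrative):

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --property parse.key=true --property key.separator=:

Typing key1:value1 then produces a message with key key1 and value value1.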


Optional parameters:

| Parameter | Value type | Description | Valid values |
| --- | --- | --- | --- |
| --bootstrap-server | String | The server to connect to; required (unless --broker-list is specified) | e.g. host1:port1,host2:port2 |
| --topic | String | The topic that receives the messages; required | |
| --batch-size | Integer | Number of messages sent in a single batch | 200 (default) |
| --compression-codec | String | Compression codec | none, gzip (default), snappy, lz4, zstd |
| --max-block-ms | Long | Maximum time the producer blocks while sending a request | 60000 (default) |
| --max-memory-bytes | Long | Total memory the producer uses to buffer records waiting to be sent | 33554432 (default) |
| --max-partition-memory-bytes | Long | Buffer size allocated per partition | 16384 |
| --message-send-max-retries | Integer | Maximum number of retries | 3 |
| --metadata-expiry-ms | Long | Time threshold (ms) after which metadata is forcibly refreshed | 300000 |
| --producer-property | String | Mechanism for passing custom properties to the producer | e.g. key=value |
| --producer.config | String | Full path of the producer configuration file; note that --producer-property takes precedence over it | |
| --property | String | Custom message-reader properties | parse.key=true/false, key.separator=<key.separator>, ignore.error=true/false |
| --request-required-acks | String | Acknowledgement mode of producer requests | 0, 1 (default), all |
| --request-timeout-ms | Integer | Acknowledgement timeout of producer requests | 1500 (default) |
| --retry-backoff-ms | Integer | Wait time before metadata is refreshed on retry | 100 (default) |
| --socket-buffer-size | Integer | TCP receive buffer size | 102400 (default) |
| --timeout | Integer | Time threshold for the asynchronous message queue | 1000 (default) |
| --sync | | Send messages synchronously | |
| --version | | Display the Kafka version; without other arguments, the local Kafka version is shown | |
| --help | | Print help information | |

5. Consume a Topic: kafka-console-consumer.sh

1. No group name is specified below, so each execution starts a new client and, with --from-beginning, consumes from the start:

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2. Consume topics matched by a regular expression: --whitelist. Consume all topics:

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist '.*'

Consume all topics from the beginning:

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist '.*' --from-beginning

3. Display the key while consuming: --property print.key=true

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.key=true

4. Specify the partition to consume (--partition) and the starting offset (--offset)

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 100

5. Name the consumer group: --group

Note that once a group name is set, --from-beginning no longer consumes from the beginning if the group has consumed before:

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group

6. Add client properties: --consumer-property

This parameter also adds properties to the client, but do not configure the same property in more than one place; they are mutually exclusive. For example, additionally passing --group test-group on top of the command below does not work:

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-consumer-group

7. Add client properties via a file: --consumer.config

--consumer-property takes precedence over --consumer.config; otherwise this works the same way as --consumer-property:

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties


| Parameter | Description | Example |
| --- | --- | --- |
| --group | Specifies the consumer group ID | |
| --topic | The topic to consume | |
| --partition | Specifies a partition; unless --offset is given, consumption starts from the partition end (latest) | --partition 0 |
| --offset | The starting offset; default latest; also accepts earliest or a numeric offset | --offset 10 |
| --whitelist | Regular expression matching topics; --topic is then not needed; all matched topics are consumed; note that --partition and --offset cannot be combined with this parameter | |
| --consumer-property | Passes user-defined properties to the consumer as key=value | --consumer-property group.id=test-consumer-group |
| --consumer.config | Consumer configuration properties file; note that --consumer-property takes precedence over this file | --consumer.config config/consumer.properties |
| --property | Initializes the message formatter's properties | print.timestamp=true/false, print.key=true/false, print.value=true/false, key.separator=<key.separator>, line.separator=<line.separator>, key.deserializer=<key.deserializer>, value.deserializer=<value.deserializer> |
| --from-beginning | Start from the earliest existing message instead of the latest; note that if a group name is configured and has consumed before, it will not start from the beginning | |
| --max-messages | Maximum number of messages to consume; if unspecified, consumption continues indefinitely | --max-messages 100 |
| --skip-message-on-error | If an error occurs while processing a message, skip it instead of halting | |
| --isolation-level | Set to read_committed to filter out uncommitted transactional messages, or read_uncommitted to read all messages; default read_uncommitted | |
| --formatter | Message formatter class | kafka.tools.DefaultMessageFormatter, kafka.tools.LoggingMessageFormatter, kafka.tools.NoOpMessageFormatter, kafka.tools.ChecksumMessageFormatter |
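
As a combined sketch (placeholders as above), printing timestamp, key, and value with a custom separator while reading from the beginning:

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --property print.timestamp=true --property print.key=true --property key.separator=: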

6. Leader re-election: kafka-leader-election.sh

6.1 Re-elect the leader for a specified partition of a specified topic using the PREFERRED (preferred replica) strategy


> sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic test_create_topic4 --election-type PREFERRED --partition 0


6.2 Re-elect leaders for all partitions of all topics using the PREFERRED (preferred replica) strategy

sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --election-type preferred  --all-topic-partitions


6.3 Batch leader election for specific topics and partitions from a configuration file

Configure the leader-election.json file:


{
  "partitions": [
    {"topic": "test_create_topic4", "partition": 1},
    {"topic": "test_create_topic4", "partition": 2}
  ]
}

 sh bin/kafka-leader-election.sh --bootstrap-server xxx:9090 --election-type preferred  --path-to-json-file config/leader-election.json
 

Optional parameters:

| Parameter | Description | Example |
| --- | --- | --- |
| --bootstrap-server | Specifies the Kafka service to connect to | --bootstrap-server localhost:9092 |
| --topic | Specifies the topic; mutually exclusive with --all-topic-partitions and --path-to-json-file | |
| --partition | Specifies the partition; used together with --topic | |
| --election-type | Election strategy: PREFERRED (preferred replica; the election fails if the preferred replica is not online) or UNCLEAN | |
| --all-topic-partitions | Re-elect leaders for all partitions of all topics; mutually exclusive with --topic and --path-to-json-file | |
| --path-to-json-file | Batch election from a configuration file; mutually exclusive with --topic and --all-topic-partitions | |

7. Continuously push messages: kafka-verifiable-producer.sh

Send a fixed number of messages at once: --max-messages 100

The default is -1, meaning messages are pushed continuously until the process is stopped:

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --max-messages 100

Cap the send throughput: --throughput 100

Throughput is measured in messages/sec; the default is -1, meaning no limit:

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --throughput 100

Prefix the message body: --value-prefix

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --value-prefix 666

Note that the value of --value-prefix must be an integer; the message body is then formatted as the prefix, a dot, and the message index, e.g. 666.0, 666.1, 666.2.

Other options: --producer.config CONFIG_FILE specifies the producer configuration file; --acks sets the acks value of each pushed message, default -1.
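
Putting it together, a sketch (topic, broker, and all flag values are illustrative) that pushes 100 messages at up to 10 per second, prefixed with 666, using acks=1:

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --max-messages 100 --throughput 10 --value-prefix 666 --acks 1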

8. Continuously pull messages in batches: kafka-verifiable-consumer.sh

Consume continuously:

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4

Consume at most 10 messages: --max-messages 10

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4 --max-messages 10


Optional parameters:

| Parameter | Description | Example |
| --- | --- | --- |
| --bootstrap-server | Specifies the Kafka service to connect to | --bootstrap-server localhost:9092 |
| --topic | The topic to consume | |
| --group-id | Consumer group ID; if unspecified, a new group ID is used each time | |
| --group-instance-id | Consumer group instance ID; must be unique | |
| --max-messages | Maximum number of messages consumed in a single run | |
| --enable-autocommit | Whether automatic offset commit is enabled; default false | |
| --reset-policy | Policy for the starting offset when there is no previous consumption record: earliest, latest, or none; default earliest | |
| --assignment-strategy | Consumer partition assignment strategy; default org.apache.kafka.clients.consumer.RangeAssignor | |
| --consumer.config | Specifies the consumer configuration file | |
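
A combined sketch (topic and group names are placeholders): consume at most 10 messages from the earliest offset with auto-commit enabled:

sh bin/kafka-verifiable-consumer.sh --bootstrap-server localhost:9092 --topic test_create_topic4 --group-id test_consumer --reset-policy earliest --enable-autocommit --max-messages 10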

9. Producer stress test: kafka-producer-perf-test.sh

1. Send 100 messages (--num-records 100), each 1 KB in size (--record-size 1024), with throughput capped at 100000 messages per second (--throughput 100000):

sh bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9092 --record-size 1024

You can use LogIKM to check whether the partition data size has grown accordingly; for example, sending 1024 such messages would add 1024 messages totaling 1024 * 1024 bytes = 1 MB.

2. Send 100 messages drawn from a specified message file (--payload-file), capped at 100 messages per second (--throughput 100):

  1. First prepare the message file batchmessage.txt.

  2. The messages sent by the command below are randomly selected from batchmessage.txt. Note that no delimiter is specified with --payload-delimiter, so the default \n newline is used, i.e. each line of the file is one message:

    bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100 --producer-props bootstrap.servers=localhost:9090 --payload-file config/batchmessage.txt

  3. Verify the messages, e.g. through LogIKM.


Optional parameters:

| Parameter | Description | Example |
| --- | --- | --- |
| --topic | Specifies the topic to send to | |
| --num-records | Number of messages to send | |
| --throughput | Maximum messages per second | |
| --producer-props | Producer configuration as k1=v1,k2=v2 | --producer-props bootstrap.servers=localhost:9092,client.id=test_client |
| --producer.config | Producer configuration file | --producer.config config/producer.properties |
| --print-metrics | Print metrics at the end of the test; default false | --print-metrics |
| --transactional-id | Transaction ID used when testing concurrent transaction performance; only effective when --transaction-duration-ms > 0; default performance-producer-default-transactional-id | |
| --transaction-duration-ms | Maximum age of each transaction before commitTransaction is called; transactions are only enabled when a value > 0 is set; default 0 | |
| --record-size | Size of each message in bytes; exactly one of --record-size and --payload-file must be specified | |
| --payload-file | File used as the source of messages; only UTF-8 encoded text files are supported, split by the delimiter given with --payload-delimiter. Exactly one of --record-size and --payload-file must be specified | |
| --payload-delimiter | Delimiter used when reading messages from --payload-file; default \n, meaning each line of the file is treated as one message; has no effect unless --payload-file is given. Messages are randomly selected from the file when sending | |

10. Consumer stress test kafka-consumer-perf-test.sh

Consume 100 messages --messages 100

sh bin/kafka-consumer-perf-test.sh --topic test_create_topic4 --bootstrap-server localhost:9090 --messages 100


Optional parameters:

| Parameter | Description | Example |
| --- | --- | --- |
| --bootstrap-server | Specifies the Kafka service to connect to | |
| --consumer.config | Consumer configuration file | |
| --date-format | Time format for printed results | default yyyy-MM-dd HH:mm:ss:SSS |
| --fetch-size | Amount of data per fetch request | default 1048576 |
| --topic | Specifies the topic to consume | |
| --from-latest | | |
| --group | Consumer group ID | |
| --hide-header | If set, the header line is not printed | |
| --messages | Number of messages to consume | |
| --num-fetch-threads | Number of fetch threads | default 1 |
| --print-metrics | Print metrics when finished | |
| --show-detailed-stats | | |
| --threads | Number of consumer threads | default 10 |
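
A slightly fuller sketch (placeholders as above) consuming 1000 messages with a single thread and detailed statistics:

sh bin/kafka-consumer-perf-test.sh --topic test_create_topic4 --bootstrap-server localhost:9090 --messages 1000 --threads 1 --show-detailed-stats --print-metrics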

11. Delete messages from specified partitions: kafka-delete-records.sh

Delete messages from a partition of a specified topic, up to offset 1024.

First configure the JSON file offset-json-file.json:

{"partitions":
[{"topic": "test1"."partition": 0."offset": 1024}]."version":1
}
Copy the code

Then run the command:

sh bin/kafka-delete-records.sh --bootstrap-server 172.23.250.249:9090 --offset-json-file config/offset-json-file.json

Verify through LogIKM that the messages were deleted.

As configured with "offset": 1024, messages from the beginning up to offset 1024 are deleted; deletion always proceeds from the front.

12. View Broker disk information: kafka-log-dirs.sh

Query disk information for specific topics: --topic-list topic1,topic2

sh bin/kafka-log-dirs.sh --bootstrap-server xxxx:9090 --describe --topic-list test2

Query disk information on specified brokers: --broker-list 0 (comma-separated broker IDs, e.g. 0,1,2)

sh bin/kafka-log-dirs.sh --bootstrap-server xxxxx:9090 --describe --topic-list test2 --broker-list 0

For example, for a topic with 3 partitions and 3 replicas, the query shows each broker's configured log.dir (logDir) and the partitions stored there:

{
  "version": 1,
  "brokers": [{
    "broker": 0,
    "logDirs": [{
      "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-0",
      "error": null,
      "partitions": [
        {"partition": "test2-1", "size": 0, "offsetLag": 0, "isFuture": false},
        {"partition": "test2-0", "size": 0, "offsetLag": 0, "isFuture": false},
        {"partition": "test2-2", "size": 0, "offsetLag": 0, "isFuture": false}
      ]
    }]
  }, {
    "broker": 1,
    "logDirs": [{
      "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-1",
      "error": null,
      "partitions": [
        {"partition": "test2-1", "size": 0, "offsetLag": 0, "isFuture": false},
        {"partition": "test2-0", "size": 0, "offsetLag": 0, "isFuture": false},
        {"partition": "test2-2", "size": 0, "offsetLag": 0, "isFuture": false}
      ]
    }]
  }, {
    "broker": 2,
    "logDirs": [{
      "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-2",
      "error": null,
      "partitions": [
        {"partition": "test2-1", "size": 0, "offsetLag": 0, "isFuture": false},
        {"partition": "test2-0", "size": 0, "offsetLag": 0, "isFuture": false},
        {"partition": "test2-2", "size": 0, "offsetLag": 0, "isFuture": false}
      ]
    }]
  }, {
    "broker": 3,
    "logDirs": [{
      "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-3",
      "error": null,
      "partitions": []
    }]
  }]
}

If querying disk information by command feels cumbersome, you can also view it through LogIKM.

13. Consumer group management: kafka-consumer-groups.sh

1. View the consumer group list: --list

sh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list

This issues a MetadataRequest to fetch the list of all online brokers, then sends a ListGroupsRequest to every broker to request its consumer group data.

2. View consumer group details: --describe

This issues a DescribeGroupsRequest.

View consumer group details: --group or --all-groups

Query a specified consumer group:
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --group test2_consumer_group


Query all consumer groups:
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups

This shows all consumed topics, the partitions involved, the latest consumed offset, the log end offset, the not-yet-consumed lag, the consumer ID, and so on.

Query consumer member information: --members

Member information of all groups:
sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090

Member information of a specified group:
sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090

Query consumer state information: --state

State of all consumer groups:
sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9090

State of a specified group:
sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server xxxxx:9090

3. Delete consumer groups: --delete

This issues a DeleteGroupsRequest.

Delete a specified consumer group:
sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server xxxx:9090

Delete all consumer groups:
sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server xxxx:9090

PS: A consumer group can only be deleted after all of its clients have stopped consuming / gone offline; otherwise the following error is reported:

Error: Deletion of some consumer groups failed:
* Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.


4. Reset consumer group offsets: --reset-offsets

A precondition for a successful reset is that the consumer group is inactive (no members are consuming).

The examples below use --dry-run, which only pre-executes and prints what would be done; use --execute when you want to actually apply the reset.

The examples below all reset with --to-earliest; refer to the reset-mode table further down to use a different mode as required.

Reset the offsets of a specified consumer group: --group

Reset all topics of the specified group (--all-topics):
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --all-topics

Reset a specified topic of the specified group (--topic):
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --topic test2

Reset offsets for all consumer groups: --all-groups

Reset all topics for all groups (--all-topics):
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-groups --bootstrap-server xxxx:9090 --dry-run --all-topics

Reset a specified topic for all groups (--topic):
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-groups --bootstrap-server xxxx:9090 --dry-run --topic test2

--reset-offsets must be followed by one of the reset modes below.

Related reset-offset modes:

| Parameter | Description | Example |
| --- | --- | --- |
| --to-earliest | Reset the offset to the earliest offset that has not been deleted | |
| --to-current | Reset the offset to the current (already consumed) offset | |
| --to-latest | Reset to the latest offset | |
| --to-datetime | Reset to the offset in effect at the specified time; format YYYY-MM-DDTHH:mm:SS.sss | --to-datetime "2021-06-26T00:00:00.000" |
| --to-offset | Reset to the specified offset. If multiple partitions are matched, all of them are reset toward this value: 1. if a partition's largest offset is smaller than --to-offset, it is reset to that largest offset; 2. if its smallest offset is larger than --to-offset, it is reset to that smallest offset; 3. otherwise it is reset to --to-offset itself. Rarely used | --to-offset 3465 |
| --shift-by | Shift the current offset by the given amount: positive moves forward, negative backward; applies to all matched partitions | --shift-by 100 or --shift-by -100 |
| --from-file | Reset according to a CSV file; discussed separately below | |

--from-file deserves a closer look.

The other modes above reset all matched partitions to the same target; they cannot set a different offset per partition. --from-file is more flexible:

1. First prepare the CSV file, one line per partition in the format topic,partition,target offset, e.g. reset-offset.csv:

test2,0,100
test2,1,200
test2,2,300

2. Then execute:

sh bin/kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv
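
A sketch of the typical two-step flow (placeholders as above): preview with --dry-run first, then apply the same command with --execute:

sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --topic test2 --bootstrap-server xxxx:9090 --dry-run
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --topic test2 --bootstrap-server xxxx:9090 --execute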

5. Delete offsets: --delete-offsets

A precondition for successful deletion is that the consumer group is inactive.

After the offset is deleted, the Consumer Group will consume from the beginning the next time it starts.

sh bin/kafka-consumer-groups.sh --delete-offsets --group test2_consumer_group2 --bootstrap-server XXXX:9090 --topic test2


Optional parameters:

| Parameter | Description | Example |
| --- | --- | --- |
| --bootstrap-server | Specifies the Kafka service to connect to | --bootstrap-server localhost:9092 |
| --list | List all consumer group names | --list |
| --describe | Query consumer group details | --describe |
| --group | Specify a consumer group | |
| --all-groups | Apply to all consumer groups | |
| --members | Query consumer group member information | |
| --state | Query consumer state information | |
| --offsets | When describing a group, also list the message offset information; this is the default | |
| --dry-run | When resetting offsets, preview the result without applying it (as opposed to --execute); this is the default | |
| --execute | Actually perform the offset reset | |
| --to-earliest | Reset offsets to the earliest | |
| --to-latest | Reset offsets to the latest | |

Attachment

Some optional configurations for ConfigCommand


Topic optional configuration:

| Key | Description | Example |
| --- | --- | --- |
| cleanup.policy | Cleanup policy | |
| compression.type | Compression type (usually recommended on the producer side) | |
| delete.retention.ms | Retention period for compacted logs | |
| file.delete.delay.ms | | |
| flush.messages | Number of messages written before forcing a flush to disk | |
| flush.ms | Interval for forcing a flush to disk | |
| follower.replication.throttled.replicas | Follower replica throttling; format: partitionId:followerBrokerId,partitionId:followerBrokerId | 0:1,1:1 |
| index.interval.bytes | | |
| leader.replication.throttled.replicas | Leader replica throttling; format: partitionId:leaderBrokerId | 0:0 |
| max.compaction.lag.ms | | |
| max.message.bytes | Maximum record batch size | |
| message.downconversion.enable | Whether down-conversion of message formats for old consumers is enabled | |
| message.format.version | Message format version | |
| message.timestamp.difference.max.ms | | |
| message.timestamp.type | | |
| min.cleanable.dirty.ratio | | |
| min.compaction.lag.ms | | |
| min.insync.replicas | Minimum ISR size | |
| preallocate | | |
| retention.bytes | Log retention size (retention is usually limited by time instead) | |
| retention.ms | Log retention time | |
| segment.bytes | Segment size limit | |
| segment.index.bytes | | |
| segment.jitter.ms | | |
| segment.ms | Segment roll time | |
| unclean.leader.election.enable | Whether a replica that is not in the ISR may be elected leader | |
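
For instance, a sketch applying two of these keys to a topic via ConfigCommand, in the same form as in section 2.2 (host and topic name are placeholders):

sh bin/kafka-configs.sh --bootstrap-server xxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --add-config cleanup.policy=compact,min.insync.replicas=2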

Broker-related optional configuration (keys only):
advertised.listeners
background.threads
compression.type
follower.replication.throttled.rate
leader.replication.throttled.rate
listener.security.protocol.map
listeners
log.cleaner.backoff.ms
log.cleaner.dedupe.buffer.size
log.cleaner.delete.retention.ms
log.cleaner.io.buffer.load.factor
log.cleaner.io.buffer.size
log.cleaner.io.max.bytes.per.second
log.cleaner.max.compaction.lag.ms
log.cleaner.min.cleanable.ratio
log.cleaner.min.compaction.lag.ms
log.cleaner.threads
log.cleanup.policy
log.flush.interval.messages
log.flush.interval.ms
log.index.interval.bytes
log.index.size.max.bytes
log.message.downconversion.enable
log.message.timestamp.difference.max.ms
log.message.timestamp.type
log.preallocate
log.retention.bytes
log.retention.ms
log.roll.jitter.ms
log.roll.ms
log.segment.bytes
log.segment.delete.delay.ms
max.connections
max.connections.per.ip
max.connections.per.ip.overrides
message.max.bytes
metric.reporters
min.insync.replicas
num.io.threads
num.network.threads
num.recovery.threads.per.data.dir
num.replica.fetchers
principal.builder.class
replica.alter.log.dirs.io.max.bytes.per.second
sasl.enabled.mechanisms
sasl.jaas.config
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.principal.to.local.rules
sasl.kerberos.service.name
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.ticket.renew.window.factor
sasl.login.refresh.buffer.seconds
sasl.login.refresh.min.period.seconds
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.mechanism.inter.broker.protocol
ssl.cipher.suites
ssl.client.auth
ssl.enabled.protocols
ssl.endpoint.identification.algorithm
ssl.key.password
ssl.keymanager.algorithm
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol
ssl.provider
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
unclean.leader.election.enable

Users optional configuration:

| Key | Description | Example |
| --- | --- | --- |
| SCRAM-SHA-256 | | |
| SCRAM-SHA-512 | | |
| consumer_byte_rate | Throttles the user's consumption rate | |
| producer_byte_rate | Throttles the user's production rate | |
| request_percentage | Request percentage quota | |
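
For example, a sketch setting producer and consumer byte-rate quotas for a user (the user name, host, and values are illustrative; recent Kafka versions support this via --bootstrap-server):

sh bin/kafka-configs.sh --bootstrap-server xxxx:9092 --alter --entity-type users --entity-name user1 --add-config producer_byte_rate=1048576,consumer_byte_rate=2097152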

Clients optional configuration (the same quota keys as for users):

| Key | Description | Example |
| --- | --- | --- |
| consumer_byte_rate | | |
| producer_byte_rate | | |
| request_percentage | | |

Most of the above operations can be performed visually on the LogI-Kafka-Manager platform.







Didi has open-sourced LogI-KafkaManager: a one-stop Kafka monitoring and control platform.

Feel free to add the author on WeChat to be pulled into the technical exchange group, which is dedicated to answering technical questions (please note: technology). WX: JJDlmn_ or WX: mike_zhangliang