Kafka Cluster Deployment (Kafka + Raft)


Kafka + KRaft mode

1. Plan the cluster

Host name   IP address        node.id   process.roles
kafka1      192.168.56.107    1         broker,controller
kafka2      192.168.56.108    2         broker,controller
kafka3      192.168.56.109    3         broker,controller

2. Raft configuration files

vi config/kraft/server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in KRaft mode, where
# Apache ZooKeeper is not present. See config/kraft/README.md for details.
#

############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
# Identifies the role that this node plays. This value is required in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
# The node ID, associated with the roles this node plays; it must be different on each server
node.id=1

# The connect string for the controller quorum
# The cluster address string for the controller quorum connection. Similar in purpose to
# configuring the ZooKeeper connection, but in a different format. Identical on every server.
controller.quorum.voters=1@192.168.56.107:9093,2@192.168.56.108:9093,3@192.168.56.109:9093

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
# Local IP + port; different on each server
listeners=PLAINTEXT://192.168.56.107:9092,CONTROLLER://192.168.56.107:9093
inter.broker.listener.name=PLAINTEXT

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
# Local IP + port; different on each server
advertised.listeners=PLAINTEXT://192.168.56.107:9092

# Listener, host name, and port for the controller to advertise to the brokers. If
# this server is a controller, this listener must be configured.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
# Data log directory
log.dirs=/tmp/kraft-combined-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

process.roles:

  • If process.roles=broker, the server acts as a broker in KRaft mode.
  • If process.roles=controller, the server acts as a controller in KRaft mode.
  • If process.roles=broker,controller, the server acts as both a broker and a controller in KRaft mode.
  • If process.roles is not set, the cluster is assumed to be running in ZooKeeper mode.

A node that acts as both broker and controller is called a "combined" node.

controller.quorum.voters:

  • The controller.quorum.voters configuration must be set on every node in the cluster.
  • This configuration identifies which nodes are the quorum's voter nodes. Every node that wants to be able to act as a controller must be included here. This is similar to ZooKeeper, where all ZooKeeper servers must be listed in the zookeeper.connect configuration.
  • Unlike the ZooKeeper configuration, however, controller.quorum.voters also includes each node's ID. The format is id1@host1:port1,id2@host2:port2, as shown in the per-node sketch below.
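
Based on the cluster plan in step 1, the node-specific lines of server.properties would look roughly like this (a sketch; everything else in the file stays identical across the three servers):

# kafka1 (192.168.56.107)
node.id=1
listeners=PLAINTEXT://192.168.56.107:9092,CONTROLLER://192.168.56.107:9093
advertised.listeners=PLAINTEXT://192.168.56.107:9092

# kafka2 (192.168.56.108)
node.id=2
listeners=PLAINTEXT://192.168.56.108:9092,CONTROLLER://192.168.56.108:9093
advertised.listeners=PLAINTEXT://192.168.56.108:9092

# kafka3 (192.168.56.109)
node.id=3
listeners=PLAINTEXT://192.168.56.109:9092,CONTROLLER://192.168.56.109:9093
advertised.listeners=PLAINTEXT://192.168.56.109:9092

# identical on all three nodes
process.roles=broker,controller
controller.quorum.voters=1@192.168.56.107:9093,2@192.168.56.108:9093,3@192.168.56.109:9093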

3. Generate a unique cluster ID

In versions 1.0 and 2.0, the cluster ID was generated automatically, and the data directory was created automatically as well. So why do we have to do this ourselves in 3.0?

The thinking in the community was that automatic formatting can sometimes mask failure conditions. For example, on Unix, if a data directory fails to mount it may appear blank, and automatically formatting it in that state would be a problem. This matters especially for the controller servers that maintain the metadata log: if two out of three controller nodes were able to start with blank logs, a leader might be elected with nothing in its log, and all metadata would be lost (the KRaft quorum would then truncate to it). Once that happens, the failure is irreversible.

Start by using the kafka-storage.sh tool in the bin directory to generate a unique ID for your new cluster

[root@localhost kafka_2.13-3.0.0]# bin/kafka-storage.sh random-uuid
PYfQjCKRQZWpOAa_SkxHNA

4. Format the directory for storing data

Next comes formatting the storage directory. If you are running in single-node mode, run the following command on that machine. If you have multiple nodes, run the format command separately on each node, making sure to use the same cluster ID on every one of them.

[root@localhost kafka_2.13-3.0.0]# bin/kafka-storage.sh format -t PYfQjCKRQZWpOAa_SkxHNA -c ./config/kraft/server.properties
Formatting /tmp/kraft-combined-logs

Note: You cannot currently convert back and forth between ZooKeeper mode and KRaft mode without reformatting the directory.

Contents of the generated meta.properties file:

[root@localhost kafka_2.13-3.0.0]# cat /tmp/kraft-combined-logs/meta.properties 
#
#Wed Oct 20 09:33:33 UTC 2021
cluster.id=PYfQjCKRQZWpOAa_SkxHNA
version=1
node.id=2
[root@localhost kafka_2.13-3.0.0]# 

5. Start

Finally, you are ready to start the Kafka server on each node.

bin/kafka-server-start.sh ./config/kraft/server.properties 
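
If you prefer to start each broker in the background, kafka-server-start.sh also accepts a -daemon flag; run the same command on each of kafka1, kafka2, and kafka3:

bin/kafka-server-start.sh -daemon ./config/kraft/server.properties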

Note: Just like a ZooKeeper-based cluster, a cluster in KRaft mode accepts client connections on port 9092 (or whatever port is configured) for the usual operations, such as creating and deleting topics, as shown below.
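
For example, a topic can be created and listed against any broker's client listener (the topic name first-kraft matches the log directory referenced later in this article; a replication factor of 3 assumes all three brokers are up):

bin/kafka-topics.sh --create --topic first-kraft --partitions 1 --replication-factor 3 --bootstrap-server 192.168.56.107:9092
bin/kafka-topics.sh --list --bootstrap-server 192.168.56.107:9092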

6. Shut down the cluster

Stop Kafka on kafka1, kafka2, and kafka3 in sequence:

bin/kafka-server-stop.sh

Viewing log Files

In KRaft mode, all the data that used to live in ZooKeeper is moved into an internal metadata topic (stored on disk under __cluster_metadata-0), including broker information, topic information, and so on. So we need a tool for inspecting its contents.
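
A quick way to confirm this on disk is to list the log directory configured earlier; the metadata log lives in the __cluster_metadata-0 directory used by the commands below:

ls /tmp/kraft-combined-logs/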

kafka-dump-log.sh is a pre-existing tool for viewing the contents of topic log files; its --cluster-metadata-decoder option is used to view the metadata log. The supported parameters are listed below:

Parameter                               Description (and example where given)
--deep-iteration
--files <String: file1, file2, ...>     Required. The log file(s) to read, e.g. --files 0000009000.log
--key-decoder-class                     If set, used to deserialize the keys. This class should implement the kafka.serializer.Decoder trait. Custom jars should be placed in the kafka/libs directory.
--max-message-size                      Maximum message size to read. Default: 5242880
--offsets-decoder                       If set, log data will be parsed as offset data from the __consumer_offsets topic.
--print-data-log                        Print the message contents.
--transaction-log-decoder               If set, log data will be parsed as transaction metadata from the __transaction_state topic.
--value-decoder-class [String]          If set, used to deserialize the messages. This class should implement the kafka.serializer.Decoder trait. Custom jars should be available in the kafka/libs directory. (default: kafka.serializer.StringDecoder)
--verify-index-only                     If set, just verify the index log without printing its contents.
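
Building on the --cluster-metadata-decoder option mentioned above, the metadata log itself can be dumped like this (the file path is the same one used with the metadata shell later in this article):

bin/kafka-dump-log.sh --cluster-metadata-decoder --files /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log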

Querying Log Files

bin/kafka-dump-log.sh --files /tmp/kraft-combined-logs/first-kraft-0/00000000000000000000.log	

Query the details of a log file

bin/kafka-dump-log.sh --files /tmp/kraft-combined-logs/first-kraft-0/00000000000000000000.log --print-data-log

Query the index file

bin/kafka-dump-log.sh --files /tmp/kraft-combined-logs/first-kraft-0/00000000000000000000.index

The configuration item log.index.size.max.bytes controls the size of the index files that are created.
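
For reference, it is a broker-level setting in server.properties; the value shown here is the documented default of 10 MB:

log.index.size.max.bytes=10485760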

Query the timeindex file

bin/kafka-dump-log.sh --files /tmp/kraft-combined-logs/first-kraft-0/00000000000000000000.timeindex

Viewing metadata

bin/kafka-metadata-shell.sh  --snapshot /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log
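
kafka-metadata-shell.sh opens an interactive prompt in which the metadata can be browsed like a small filesystem: ls lists nodes, cat prints a node's contents, and exit quits. A minimal sketch of a session (node names such as /topics are illustrative; the exact tree layout depends on the Kafka version):

>> ls /
>> ls /topics
>> exit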

Kafka Raft metadata mode

Apache Kafka running without its Apache ZooKeeper dependency is what the community calls Kafka Raft metadata mode (KRaft mode for short). This mode was released as an early-access preview in version 2.8: you could get an initial taste of KRaft, but it was not recommended for production environments, with a stable release to follow in 3.0.

Kafka clusters running in KRaft mode do not store their metadata in Apache ZooKeeper. When deploying a new cluster, there is no need to deploy a ZooKeeper cluster, because Kafka stores its metadata in a KRaft quorum of controller nodes. KRaft brings many benefits: it supports more partitions, controllers fail over faster, and it avoids a whole class of problems caused by inconsistency between the controller's cached metadata and the data stored in ZooKeeper.

Note: Kafka officially began moving away from ZooKeeper and embracing Raft in version 2.8.

Why kill ZK?

  • As a message queue, Kafka surprisingly depends on a heavyweight coordination system, ZooKeeper, whereas RabbitMQ, also a message queue, has been self-managing from early on.
  • ZooKeeper is cumbersome: a cluster requires an odd number of nodes, which makes expanding or shrinking the cluster difficult. ZooKeeper is also configured in a completely different way from Kafka, so tuning Kafka means taking a second system into account.
  • For Kafka to evolve into a lightweight, out-of-the-box system, it has to drop ZooKeeper.
  • Since ZooKeeper and Kafka are, after all, separate storage systems, data synchronization becomes a significant issue as the number of topics and partitions grows. ZooKeeper is reliable but slow, and nowhere near as performant as Kafka's own log storage, so Kafka, which prides itself on speed, has to work around it.

What will change

Deployment is simpler. First of all, deployment becomes much simpler. For systems with lower availability requirements, even a single process is enough to get Kafka up and running. We no longer need to requisition SSDs for ZooKeeper, and we no longer need to worry about whether ZooKeeper has enough capacity.

Monitoring is easier.

Second, because metadata is now centralized in Kafka itself, monitoring information can be obtained directly from Kafka without also having to pull it from ZooKeeper. Integration with systems such as Grafana, Kibana, and Prometheus becomes straightforward.

Faster.

Most important of all is speed. Raft is easier to understand and more efficient than ZooKeeper's ZAB protocol, partition leader election will be faster, and controller scheduling will improve.

Conclusion

Kafka is often considered heavyweight infrastructure because of the complexity of managing Apache ZooKeeper. This typically leads projects to start with a more lightweight message queue, such as ActiveMQ, or a traditional one like RabbitMQ, and migrate to Kafka only as they grow.

That has now changed. KRaft mode provides a great, lightweight way to get started with Kafka, or to use it as an alternative to message queues such as ActiveMQ or RabbitMQ. Lightweight single-process deployments are also better suited to edge scenarios and to lightweight hardware.

Kafka series

  • Kafka (1): Kafka cluster deployment (Kafka + ZK)
  • The series continues to be updated…