Kafka basic overview

What is Kafka?

Kafka is a distributed streaming platform with three key capabilities:

  1. Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system
  2. Store streams of records in a fault-tolerant, durable way
  3. Process streams of records as they occur

Applications of Kafka

  1. As a messaging system
  2. As a storage system
  3. As a stream processor

Kafka can be used to build streaming data pipelines that reliably move data between systems or applications.

It can also be used to build streaming applications that transform or react to streams of data.

Kafka as a messaging system

As a messaging system, Kafka has three basic components:

  • Producer: the client that publishes messages
  • Broker: the server that receives and stores messages from producers
  • Consumer: the client that reads messages from the broker

In a large system with many subsystems that need to interact via messaging, you will find both source systems (the senders of messages) and destination systems (the receivers of messages). To move data between them, you need the right data pipelines.

This kind of point-to-point data interaction can get confusing; with a messaging system in between, the architecture becomes simpler and cleaner.

  • Kafka runs as a cluster on servers in one or more data centers
  • The categories under which a Kafka cluster stores message records are called topics
  • Each message record contains three elements: a key, a value, and a timestamp

Core API

Kafka has four core APIs:

  • The Producer API, which allows an application to publish a stream of records to one or more topics
  • The Consumer API, which allows an application to subscribe to one or more topics and process the stream of records produced to them
  • The Streams API, which allows an application to act as a stream processor, consuming input streams from one or more topics and producing output streams to one or more topics, effectively transforming input streams into output streams
  • The Connector API, which allows building and running reusable producers and consumers that connect Kafka topics to existing applications or data systems. For example, a connector for a relational database might capture every change to a table

Basic Kafka concepts

As a highly scalable, fault-tolerant messaging system, Kafka has many basic concepts. Let’s take a look at the ones that are unique to Kafka.

topic

In Kafka, messages are classified by a class attribute called a topic. A topic is a logical concept, essentially a label assigned to messages. Topics are like tables in a database, or folders in a file system.

partition

A topic is divided into one or more partitions. A partition is a physical concept that corresponds to one or more directories on disk. A partition is a commit log: messages are appended to it and read in order.

Note: Since a topic usually contains multiple partitions, there is no ordering guarantee across the entire topic, but a single partition does guarantee order. Messages are always written to the end of a partition. Kafka achieves data redundancy and scalability through partitioning.

Partitions can be distributed on different servers, that is, a topic can span multiple servers to provide greater performance than a single server.
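
As a toy illustration of the commit-log model above, here is a minimal in-memory sketch (purely hypothetical code, not Kafka's actual implementation): messages are appended to the end of a partition, offsets are per-partition, and order holds only within a partition.

```python
# Toy model of a topic as a list of append-only partitions.
# Illustrative only: real Kafka partitions are on-disk commit logs.

class ToyTopic:
    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, partition, message):
        """Messages are always written to the end of a partition."""
        self.partitions[partition].append(message)
        return len(self.partitions[partition]) - 1  # the message's offset

    def read(self, partition, offset):
        """Reads within one partition are sequential and ordered."""
        return self.partitions[partition][offset:]

topic = ToyTopic(num_partitions=2)
topic.append(0, "a")
topic.append(1, "b")
topic.append(0, "c")

# Order holds within a partition ("a" before "c"),
# but no order is defined across partitions.
print(topic.read(0, 0))  # ['a', 'c']
```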

segment

A partition is further subdivided into several segments. Each segment file has the same maximum size.

broker

A Kafka cluster consists of one or more servers, each of which is called a broker. The broker receives messages from the producer, sets offsets for the message, and commits the message to disk for saving. The broker serves consumers and responds to requests to read partitions by returning messages that have been committed to disk.

A broker is part of a cluster. In every cluster, one broker also acts as the cluster controller, elected automatically from the live members of the cluster. The controller is responsible for administrative work, including assigning partitions to brokers and monitoring brokers for failures. Within a cluster, each partition is owned by one broker, called the leader of that partition, but the partition can be replicated to additional brokers (followers). This replication mechanism provides message redundancy for partitions: if a broker fails, one of the other replicas takes over as leader.

producer

Producers, or message publishers, publish messages to the appropriate partition of a topic. By default, a producer distributes messages evenly across all partitions of a topic and does not care which partition a particular message lands in. In some cases, however, a producer writes messages directly to a specified partition.

consumer

A consumer reads messages. A consumer can consume messages from multiple topics. Within a consumer group, each partition of a topic is consumed by only one consumer in the group.

Now that we understand the basic concepts of Kafka, let’s build a Kafka cluster to explore it further.

Prepare the installation environment

Install the Java environment

Run java -version to check the Java version; JDK 1.8 is recommended. If Java is not installed, you can follow this article to install it (www.cnblogs.com/zs-notes/p/…

Install the Zookeeper environment

Kafka uses Zookeeper to store metadata and ensure consistency, so Zookeeper must be installed before Kafka. Kafka distributions ship with Zookeeper and can start it via the bundled scripts, but installing a standalone Zookeeper takes little effort.

Set up a single Zookeeper server

Setting up a standalone Zookeeper is simple: download a stable release from the official site (www.apache.org/dyn/closer….); I used 3.4.10 in this example. After downloading, create a zookeeper folder under /usr/local on Linux, then use an SFTP tool such as XFTP (XFTP and Xshell offer a free home edition at www.netsarang.com/zh/xshell/) to upload the downloaded Zookeeper package into the /usr/local/zookeeper directory.

For a .tar.gz package, run tar -zxvf zookeeper-3.4.10.tar.gz to decompress it.

If you downloaded a .zip package instead, install the unzip tool with yum install unzip, then run unzip zookeeper-3.4.10.zip.

After decompression, cd into /usr/local/zookeeper/zookeeper-3.4.10 and create a data folder. Then enter the conf folder and rename the sample configuration with mv zoo_sample.cfg zoo.cfg.

Then open zoo.cfg with vi, change dataDir to /usr/local/zookeeper/zookeeper-3.4.10/data, and save.

Go to the bin directory and start the service with ./zkServer.sh start. If the startup message is displayed, the setup succeeded.

To stop the service, run ./zkServer.sh stop.

Run ./zkServer.sh status to view status information.

Set up a Zookeeper cluster

Preparation conditions

Preparation: I used CentOS 7 and set up three VMs with 1 GB of memory each. On each machine, create a zookeeper folder under /usr/local/, move the Zookeeper package there and unpack it; this produces the zookeeper-3.4.10 folder. Enter that folder and create two new folders: data and log.

Note: the data folder was already created during the single-machine setup, so on that machine only the log folder needs to be created; on the two newly added servers, both folders must be created.

Set up the cluster

After creating the zoo.cfg file, edit conf/zoo.cfg:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/zookeeper-3.4.10/data
dataLogDir=/usr/local/zookeeper/zookeeper-3.4.10/log
clientPort=12181
server.1=192.168.1.7:12888:13888
server.2=192.168.1.8:12888:13888
server.3=192.168.1.9:12888:13888

The 1 in server.1 is the server’s identifier (it can be any number used to number the server). This identifier must match the myid configured below.

192.168.1.7:12888:13888 specifies a cluster member’s IP address and two ports: the first port is used for communication between the leader and followers (default 2888); the second is used to elect a new leader when the cluster starts or the current leader fails (default 3888).

Now explain the configuration file above

tickTime: the heartbeat interval, in milliseconds, between Zookeeper servers or between clients and servers; a heartbeat is sent every tickTime.

initLimit: the maximum number of heartbeat intervals that followers in the Zookeeper cluster may take to connect and sync with the Leader during the initial connection (this concerns followers connecting to the Leader, not clients connecting to the server). If the Leader has received no response after initLimit heartbeats, the connection attempt fails. With initLimit=10 and tickTime=2000, the total is 10 × 2000 ms = 20 seconds.

syncLimit: the maximum request/response delay between the Leader and a Follower, measured in tickTime units. With syncLimit=5 and tickTime=2000, the total is 5 × 2000 ms = 10 seconds.

dataDir: the directory where snapshots are stored.

dataLogDir: the directory where transaction logs are stored. If not specified, transaction logs are written to the dataDir directory by default, which severely affects ZK performance.

clientPort: the port clients use to connect to the Zookeeper server; Zookeeper listens on this port for client access requests.

Create the myid file

Now that we understand the configuration file, let’s create the myid file for each cluster node. As mentioned above, this myid is the 1 in server.1: each server in the cluster needs its own identifier, which we write with the echo command

# server.1
echo "1" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
# server.2
echo "2" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
# server.3
echo "3" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid

Start the service and test

With the configuration complete, each ZK service is started and tested; my test results (working from a Windows machine) are as follows

Start the service (required on each node)

cd /usr/local/zookeeper/zookeeper-3.4.10/bin
./zkServer.sh start

Check the service status

Run ./zkServer.sh status to check the service status

192.168.1.7 – follower

192.168.1.8 – leader

192.168.1.9 – follower

A ZK cluster usually has one leader and multiple followers. The leader handles clients’ write requests, while followers synchronize data from the leader; when the leader goes down, a new leader is elected from among the followers.

Kafka cluster setup

Preparation conditions

  • A working Zookeeper cluster
  • The Kafka package (www.apache.org/dyn/closer….

Create a kafka folder under /usr/local, move the tar.gz package into /usr/local/kafka, and decompress it with tar -zxvf. Enter kafka_2.12-2.3.0, create a log folder, and go to the config directory.

We can see many .properties configuration files there, but we will focus on the server.properties file.

Kafka can be started in two ways: one uses Kafka’s bundled Zookeeper configuration (you can follow the official quickstart and simulate a cluster with multiple broker instances: kafka.apache.org/quickstart#…); the other uses an external ZK cluster, which is what we do here.

Modify configuration items

For each service, the following entries in server.properties need to be updated or added

broker.id=0  # starts at 0; each server's broker.id must differ, just as myid was set to 1, 2 and 3 for my three services
log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log
log.retention.hours=168
message.max.bytes=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
# set the zookeeper connection addresses
zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181

Meaning of the configuration items

broker.id=0  # uniquely identifies this machine in the cluster, similar to Zookeeper's myid
port=9092  # the default port Kafka exposes to clients
host.name=192.168.1.7  # off by default; version 0.8.1 had a bug here involving DNS resolution failures
num.network.threads=3  # number of broker threads for network processing
num.io.threads=8  # number of broker threads for disk I/O; should be no less than the number of directories in log.dirs
log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log  # directory where message logs are stored
socket.send.buffer.bytes=102400  # send buffer size; data is buffered before being sent rather than sent all at once
socket.receive.buffer.bytes=102400  # receive buffer size; data is flushed to disk once a threshold is reached
socket.request.max.bytes=104857600  # maximum size of a request to Kafka; must not exceed the Java heap size
num.partitions=1  # default number of partitions per topic
log.retention.hours=168  # default maximum retention time for messages: 168 hours, i.e. 7 days
message.max.bytes=5242880  # maximum size of a single message: 5 MB
default.replication.factor=2  # number of replicas Kafka keeps per partition; if one replica fails, another can continue to serve
replica.fetch.max.bytes=5242880  # maximum number of bytes fetched per replica request
log.segment.bytes=1073741824  # Kafka appends messages to segment files; when a file exceeds this size, a new one is started
log.retention.check.interval.ms=300000  # every 300000 ms, check whether logs have expired per log.retention.hours=168 and delete them if so
log.cleaner.enable=false  # whether to enable log compaction; enabling it improves performance
zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181  # zookeeper connection addresses

Start the Kafka cluster and test it

  • Start the service: enter the /usr/local/kafka/kafka_2.12-2.3.0/bin directory
# start Kafka as a background process
./kafka-server-start.sh -daemon ../config/server.properties
  • Check whether the service is started
Run the jps command
6201 QuorumPeerMain
7035 Jps
6972 Kafka
  • Kafka is up and running
  • Create a topic and verify that it was created successfully
# cd ..  (go back up one level to the /usr/local/kafka/kafka_2.12-2.3.0 directory)
bin/kafka-topics.sh --create --zookeeper 192.168.1.7:2181 --replication-factor 2 --partitions 1 --topic cxuan

Explanation of the options above

--replication-factor 2: create two replicas

--partitions 1: create one partition

--topic: the topic name

Check whether our topic was created successfully

bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181

Once the service is started on each node, the cluster is up.

Create a producer on one machine

# create a producer
./kafka-console-producer.sh --broker-list 192.168.1.7:9092 --topic cxuantopic

Create a consumer on another machine

# create a consumer
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning

Note: the --zookeeper option is no longer recognized here because our Kafka version is too new; use --bootstrap-server instead.

The test results

Publishing: the producer sends messages

Consuming: the consumer receives them

Other commands

List topics

bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181
# output
cxuantopic

View topic status

bin/kafka-topics.sh --describe --zookeeper 192.168.1.7:2181 --topic cxuantopic
# the detailed information displayed:
Topic:cxuantopic PartitionCount:1 ReplicationFactor:2 Configs:
Topic: cxuantopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
# cxuantopic has one partition (0) with a replication factor of 2; Replicas lists the brokers holding copies

The Leader is the node responsible for all reads and writes for the given partition; each node becomes the leader for a randomly selected portion of the partitions.

Replicas is the list of nodes that replicate the log for this partition, regardless of whether they are the leader or even currently alive.

Isr is the set of in-sync replicas: the subset of the replica list that is currently alive and caught up with the Leader.

At this point, the Kafka cluster is built.

Verify that multiple nodes receive data

So far we have used the same node’s IP in every command. Let’s consume from the other nodes in the cluster to verify that they also receive messages.

Run on the other two nodes:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning

Then use the producer to send messages; all three nodes receive them.

Configuration details

The server.properties file we edited while setting up Kafka is the broker configuration; let’s go through its main options in more detail.

General configuration

These parameters are the most basic configuration in Kafka

  • broker.id

Each broker needs an identifier, set with broker.id. It defaults to 0 and can be any other integer, as long as the broker.id of every node in the cluster is unique.

  • port

If you start Kafka with the sample configuration, it listens on port 9092; you can change the port parameter to any other available port.

  • zookeeper.connect

The location used to store broker metadata is specified with zookeeper.connect. localhost:2181 means a Zookeeper running on the local machine on port 2181. The parameter is a comma-separated list of hostname:port/path entries, with the following meaning for each part:

hostname is the host name or IP address of the Zookeeper server

port is the Zookeeper client connection port

/path is an optional Zookeeper path that serves as the chroot environment for the Kafka cluster; if not specified, the root path is used by default
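
For example, a three-node ensemble with an optional chroot path could be configured as follows (the hostnames and the /kafka path here are hypothetical placeholders):

```properties
# comma-separated hostname:port entries; the trailing /kafka is an optional
# chroot path under which this cluster's metadata is kept in Zookeeper
zookeeper.connect=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181/kafka
```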

  • log.dirs

Kafka stores messages on disk, and the directories holding these log segments are specified with log.dirs, a comma-separated set of local file-system paths. If multiple paths are specified, the broker stores all log segments of the same partition under one path, chosen according to a “fewest partitions” rule. Note that the broker adds partitions to the path holding the fewest partitions, not the path with the most free disk space.

  • num.recovery.threads.per.data.dir

Kafka uses a configurable thread pool to handle log segments in three situations:

when the server starts normally, to open each partition’s log segments;

when the server starts after a crash, to check and truncate each partition’s log segments;

when the server shuts down cleanly, to close log segments

By default, only one thread per log directory is used. Since these threads only run during server startup and shutdown, it is reasonable to configure a larger number to parallelize the work. For servers with many partitions in particular, parallel recovery can save hours after a crash. Note that the configured number applies per log directory specified in log.dirs: if num.recovery.threads.per.data.dir is set to 8 and log.dirs specifies three paths, there will be 24 threads in total.

  • auto.create.topics.enable

By default, Kafka automatically creates a topic in three situations

When a producer starts writing messages to the topic

When a consumer starts reading messages from the topic

When any client requests metadata for the topic

  • delete.topic.enable

Topics can be deleted with the topic management tool, but by default deletion is disabled: delete.topic.enable defaults to false, so topics cannot be deleted at will. This is sensible protection for production environments; in development and test environments, where deleting topics is fine, set delete.topic.enable to true if you want to delete them.

Topic default configuration

Kafka provides a number of default configuration parameters for newly created topics. Let’s take a look at them.

  • num.partitions

The num.partitions parameter specifies how many partitions a newly created topic contains. If automatic topic creation is enabled (it is by default), the partition count of an auto-created topic is this value, which defaults to 1. Note that a topic’s partition count can be increased but never decreased.

  • default.replication.factor

This parameter specifies the number of replicas Kafka keeps for each partition of a new topic; if one replica fails, another can continue to serve. default.replication.factor defaults to 1 and takes effect for automatically created topics.

  • log.retention.ms

Kafka usually decides how long data is retained based on time, configured by default with log.retention.hours (default 168 hours, i.e. one week). There are two other parameters, log.retention.minutes and log.retention.ms; all three control how long messages are kept before deletion, and if more than one is set, the one with the smallest unit takes precedence. log.retention.ms is the recommended one.

  • log.retention.bytes

Another way to expire messages is by the number of bytes retained. The value, specified with log.retention.bytes, applies to each partition. That is, for a topic with 8 partitions and log.retention.bytes set to 1 GB, the topic can retain at most 8 GB of data. So as a topic’s partition count grows, so does the amount of data the whole topic can retain.
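
The arithmetic is simple but worth making explicit; a quick sketch:

```python
# log.retention.bytes applies per partition, so the whole topic can
# retain (partitions x log.retention.bytes) bytes of data.
partitions = 8
retention_bytes = 1 * 1024**3  # log.retention.bytes = 1 GB per partition

topic_max_bytes = partitions * retention_bytes
print(topic_max_bytes)             # 8589934592 bytes
print(topic_max_bytes // 1024**3)  # 8 GB across the topic
```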

  • log.segment.bytes

The retention settings above operate on log segments, not on individual messages. As messages arrive at the broker, they are appended to the partition’s current log segment. When the segment reaches the size specified by log.segment.bytes (default 1 GB), it is closed and a new segment is opened; only once a segment is closed does it start waiting to expire. The smaller this value, the more frequently files are closed and allocated, which reduces overall disk-write efficiency.
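
Size-based segment rolling can be sketched with a small in-memory toy (hypothetical code, not Kafka's on-disk format; real Kafka also rolls on time via log.segment.ms):

```python
# Toy sketch of size-based log segment rolling: when the active segment
# would exceed the limit, it is closed and a new one is started.
def append_messages(message_sizes, segment_bytes):
    segments = [[]]       # list of segments, each a list of message sizes
    current_size = 0
    for size in message_sizes:
        if current_size + size > segment_bytes and segments[-1]:
            segments.append([])  # roll: close current segment, open a new one
            current_size = 0
        segments[-1].append(size)
        current_size += size
    return segments

# Six 400-byte messages with a 1000-byte segment limit -> three segments
segs = append_messages([400] * 6, segment_bytes=1000)
print([len(s) for s in segs])  # [2, 2, 2]
```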

  • log.segment.ms

The log.segment.ms parameter specifies how long before a log segment is closed. log.segment.ms and log.segment.bytes are not mutually exclusive: a segment is closed when either the size limit or the time limit is reached, whichever comes first.

  • message.max.bytes

The broker limits the size of a single message with the message.max.bytes parameter, whose default is 1000000, i.e. about 1 MB. As with other byte-related configuration parameters, this refers to the compressed message size: the actual message can be larger than this value as long as it is smaller than message.max.bytes after compression.

This value has a significant impact on performance: the larger it is, the more time the threads handling network connections and requests spend processing each request. It also increases the disk write block size, which affects I/O throughput.
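
Because the limit applies to the compressed size, a payload nominally larger than message.max.bytes can still be accepted if it compresses well. A sketch of that check, using Python's gzip purely for illustration (gzip is one of the codecs Kafka supports):

```python
import gzip

MESSAGE_MAX_BYTES = 1_000_000  # the broker's default limit

payload = b"repetitive log line\n" * 100_000  # 2 MB uncompressed
compressed = gzip.compress(payload)

print(len(payload))                         # 2000000 bytes: over the limit raw
print(len(compressed) < MESSAGE_MAX_BYTES)  # True: fits once compressed
```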

If you found this article helpful, please like and comment; your support motivates me to keep writing!

In addition, add me on WeChat (becomecXuan) to join my question-of-the-day group, with one interview question shared per day. For more content, see my GitHub: Be the best Javaer.

I have put together six PDFs myself. Follow my WeChat official account, cXuan the programmer, and reply “cXuan” in the background to get all of them.
