Kafka characteristics

Kafka was originally developed at LinkedIn as a distributed publish/subscribe messaging system and has since become a top-level Apache project. Its main features are as follows:

1. High throughput for both publish and subscribe

Kafka is designed to provide message persistence with O(1) time complexity, offering constant-time access performance even over terabytes of data. Even on very cheap commodity hardware, a single machine can support on the order of 100K messages per second.

2. Message persistence

Messages are persisted to disk, so they can be used for batch consumption such as ETL as well as by real-time applications. Data loss is prevented by persisting data to disk and by replication.

3. Distributed

Kafka supports message partitioning and distributed consumption across servers, and guarantees message ordering within each partition. It is easy to scale out: producers, brokers, and consumers can all be multiple and distributed, and machines can be added without stopping the cluster.

4. Consume messages in pull mode

The state of message processing is maintained on the consumer side, not on the server side: the broker is stateless and the consumer stores the offset itself.
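
As an illustration of the pull model, here is a minimal sketch (not from the original article, assuming the local broker and the test-topic created later in this article) of a consumer that manages offsets itself: auto-commit is disabled, a batch of records is pulled with poll(), and the offset is committed with commitSync().

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ManualOffsetSample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed local broker
        props.put("group.id", "manual-offset-group");        // hypothetical group name
        props.put("enable.auto.commit", "false");            // the broker stays stateless; the consumer decides when to commit
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test-topic"));
        ConsumerRecords<String, String> records = consumer.poll(1000);   // pull a batch of records
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.offset() + ": " + record.value());
        }
        consumer.commitSync();   // the consumer commits the offset itself
        consumer.close();
    }
}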

5. Support online and offline scenarios.

Both offline data processing and real-time data processing are supported.

Basic concepts in Kafka

1. Broker

The servers in a Kafka cluster are called brokers; a cluster consists of one or more brokers.

2. Topic

Every message published to Kafka belongs to a category called a Topic. Physically, messages of different Topics are stored separately. Logically, the messages of a Topic may be stored on one or more brokers, but users only need to specify the Topic to produce or consume data, regardless of where the data is physically stored.

3. Partition

A Partition is a physical concept: a Topic can be divided into multiple Partitions, each of which is an ordered queue. Each message in a Partition is assigned a sequential ID called its offset.

4. Producer

Producers of messages and data; they can be understood as clients that send messages to Kafka.

5. Consumer

Consumers of messages and data; they can be understood as clients that fetch messages from Kafka.

6. Consumer Group

Each Consumer belongs to a specific Consumer Group (you can specify a group name for each Consumer; if you do not, it falls into the default group). This is how Kafka supports both broadcasting a Topic's messages (to all Consumers) and unicasting them (to a single Consumer). A Topic can have multiple Consumer Groups. The Topic's messages are delivered to all Consumer Groups (conceptually copied, not physically), but within each Consumer Group a message is delivered to only one Consumer. To implement broadcasting, give every Consumer its own Consumer Group; to implement unicasting, put all the Consumers in the same Consumer Group. Consumer Groups also let Consumers be grouped freely without having to send messages multiple times to different Topics.
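
As a rough sketch of how group.id controls this behaviour (hypothetical group names, assuming the local broker and the test-topic used later in this article), the helper below builds consumers in a given group: consumers in different groups each receive every message (broadcast), while consumers sharing a group split the partitions between them (unicast).

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class GroupDemo {

    // Builds a consumer subscribed to test-topic in the given Consumer Group.
    static KafkaConsumer<String, String> consumerInGroup(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed local broker
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test-topic"));
        return consumer;
    }

    public static void main(String[] args) {
        // Broadcast: each consumer has its own group, so each one receives every message of the topic.
        KafkaConsumer<String, String> broadcastA = consumerInGroup("group-A");
        KafkaConsumer<String, String> broadcastB = consumerInGroup("group-B");

        // Unicast: these two consumers share one group, so each message goes to only one of them.
        KafkaConsumer<String, String> unicast1 = consumerInGroup("shared-group");
        KafkaConsumer<String, String> unicast2 = consumerInGroup("shared-group");
    }
}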

Kafka installation

Mac users can install Kafka with Homebrew. Update brew before installing:

brew update

Next, install Kafka

brew install kafka

After the installation is complete, you can view the Kafka configuration file

cd /usr/local/etc/kafka

Kafka requires ZooKeeper; Homebrew installs ZooKeeper along with Kafka. Start ZooKeeper:

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

Then start Kafka

cd /usr/local/Cellar/kafka/0.11.0.1
./bin/kafka-server-start /usr/local/etc/kafka/server.properties

Create a topic named test-topic with 2 partitions. The following examples will use this topic.

cd /usr/local/Cellar/kafka/0.11.0.1
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-topic

View the created topics

cd /usr/local/Cellar/kafka/0.11.0.1
./bin/kafka-topics --list --zookeeper localhost:2181

Kafka command line test

Send a message

cd /usr/local/Cellar/kafka/0.11.0.1/bin
./kafka-console-producer --broker-list localhost:9092 --topic test-topic

Consume messages

cd /usr/local/Cellar/kafka/0.11.0.1/bin
./kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning

Delete the topic

cd /usr/local/Cellar/kafka/0.11.0.1/bin
./kafka-topics --delete --zookeeper localhost:2181 --topic test-topic

If delete.topic.enable=true is not set in the server.properties file that Kafka loads at startup, the deletion is not a real deletion; the topic is only marked for deletion.
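
As a side note, topics can also be deleted programmatically; a minimal sketch (not from the original article) using the AdminClient that ships with kafka-clients 0.11, assuming a local broker and delete.topic.enable=true on the server:

import org.apache.kafka.clients.admin.AdminClient;

import java.util.Collections;
import java.util.Properties;

public class DeleteTopicSample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed local broker

        AdminClient admin = AdminClient.create(props);
        // Without delete.topic.enable=true on the broker, the topic is only marked for deletion.
        admin.deleteTopics(Collections.singletonList("test-topic")).all().get();
        admin.close();
    }
}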

View all topics

cd /usr/local/Cellar/kafka/0.11.0.1/bin
./kafka-topics --zookeeper localhost:2181 --list

Physically delete a topic

Log in to the ZooKeeper client:

/usr/local/Cellar/zookeeper/3.4.10/bin/zkCli

Find the directory where topics are stored:

ls /brokers/topics

Find the topic to delete and execute the command:

rmr /brokers/topics/test-topic

The topic is now deleted completely.

Java client access

1. Add the dependency to the Maven project's POM file

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.1</version>
</dependency>

2. Message producers

package org.study.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;

public class ProducerSample {

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put("zk.connect"."127.0.0.1:2181"); // Address of zookeeper props. Put ("bootstrap.servers"."localhost:9092"); // Set up the host/port group to connect to the Kafka cluster. props.put("key.serializer"."org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer"."org.apache.kafka.common.serialization.StringSerializer");
        props.put("key.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");

        String topic = "test-topic";
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>(topic, "idea-key2"."java-message 1"));
        producer.send(new ProducerRecord<String, String>(topic, "idea-key2"."java-message 2"));
        producer.send(new ProducerRecord<String, String>(topic, "idea-key2"."java-message 3")); producer.close(); }}Copy the code

3. Message consumers

package org.study.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerSample {

    public static void main(String[] args) {
        String topic = "test-topic"; // topic name Properties props = new Properties(); props.put("bootstrap.servers"."localhost:9092"); // Set up the host/port group to connect to the Kafka cluster. props.put("group.id"."testGroup1"); // Consumer Group Name props.put("enable.auto.commit"."true"); // Whether the Consumer offset automatically submits props. Put ("auto.commit.interval.ms"."1000"); // The interval for automatically submitting offset to ZooKeeper is milliseconds."key.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer(props);
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); }}}Copy the code

4. Start ZooKeeper

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

5. Start the Kafka server

kafka-server-start /usr/local/etc/kafka/server.properties

6. Run the Consumer

Run the Consumer first, so that when the Producer sends messages you can see them logged on the Consumer side.

7. Run the Producer

Run the Producer, publish a few messages, and watch them appear on the Consumer console.

Kafka cluster configuration

Kafka has three cluster configurations: single node - single broker, single node - multiple broker, and multiple node - multiple broker.

The first two are also covered on the official Kafka website.

single node – single broker

1. Start ZooKeeper

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

2. Start Kafka Broker

kafka-server-start /usr/local/etc/kafka/server.properties

3. Create a Kafka Topic

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic topic-singlenode-singlebroker

4. Start the producer to send messages

kafka-console-producer --broker-list localhost:9092 --topic topic-singlenode-singlebroker

The broker-list and topic parameters are required. broker-list specifies the addresses of the brokers to connect to, in the format node_address:port. topic is required because messages must be delivered to the consumer groups that subscribe to that topic. You can now type some text on the command line; each line is treated as one message.

5. Start the consumer to consume messages

kafka-console-consumer --bootstrap-server localhost:9092 --topic topic-singlenode-singlebroker

After starting ZooKeeper, the Broker, the Producer, and the Consumer in different terminal windows, type messages in the Producer terminal and they will be displayed in the Consumer terminal.

single node – multiple broker

1. Start ZooKeeper

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

2. Start the broker

To start multiple brokers on a single node (i.e., a single machine), three brokers in this example, prepare multiple server.properties files by copying /usr/local/etc/kafka/server.properties. Each broker needs its own property file, and the three properties broker.id, port, and log.dirs must be different in every copy.

Create a new kafka-example directory and three log directories

mkdir kafka-example
mkdir kafka-logs-1
mkdir kafka-logs-2
mkdir kafka-logs-3

Copy /usr/local/etc/kafka/server.properties into the three properties files

cp server.properties /Users/niwei/Downloads/kafka-example/server-1.properties
cp server.properties /Users/niwei/Downloads/kafka-example/server-2.properties
cp server.properties /Users/niwei/Downloads/kafka-example/server-3.properties

In the Broker1 configuration file server-1.properties, the relevant parameters to modify are:

broker.id=1
port=9093
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-1

In the Broker2 configuration file server-2.properties, the relevant parameters to modify are:

broker.id=2
port=9094
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-2

In the Broker3 configuration file server-3.properties, the relevant parameters to modify are:

broker.id=3
port=9095
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-3

Start each broker

cd /Users/niwei/Downloads/kafka-example
kafka-server-start server-1.properties
kafka-server-start server-2.properties
kafka-server-start server-3.properties

3. Create a topic

Create a topic named topic-singlenode-multiplebroker

kafka-topics --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic topic-singlenode-multiplebroker

4. Start the producer to send messages

If the producer needs to connect to multiple brokers, pass them via the broker-list parameter:

kafka-console-producer --broker-list localhost:9093,localhost:9094,localhost:9095 --topic topic-singlenode-multiplebroker

5. Start the consumer to consume messages

kafka-console-consumer --zookeeper localhost:2181 --topic topic-singlenode-multiplebroker

multiple node – multiple broker

Set up the ZooKeeper cluster

1. Kafka cluster configuration

broker.id=1                            # unique identifier of this machine in the cluster
port=9093                              # the default Kafka port is 9092
host.name=192.168.121.101              # off by default; 0.8.1 had a bug with DNS resolution and failure rates
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-1   # directory in which messages are stored; multiple directories can be given, comma-separated
zookeeper.connect=192.168.120.101:2181,192.168.120.102:2181,192.168.120.103:2181   # ZooKeeper cluster connection addresses

num.network.threads=3                  # number of broker threads for network processing
num.io.threads=5                       # number of broker threads for I/O processing
socket.send.buffer.bytes=102400        # send buffer size; data is buffered until it reaches a certain size, which improves performance
socket.receive.buffer.bytes=102400     # receive buffer size; data is written to disk once the buffer reaches a certain size
socket.request.max.bytes=104857600     # maximum size of a request sent to Kafka; must not exceed the JVM heap size
num.partitions=1                       # default number of partitions per topic
log.retention.hours=24                 # default maximum retention time for messages, in hours
message.max.bytes=5242880              # maximum message size, 5 MB
default.replication.factor=2           # number of replicas kept for each message; if one replica fails another can continue to serve
replica.fetch.max.bytes=5242880        # maximum number of bytes fetched per replica request
log.segment.bytes=1073741824           # messages are appended to segment files; when a segment exceeds this size a new file is created
log.retention.check.interval.ms=300000 # check every 300000 ms whether any log segments exceed log.retention.hours (24 hours) and delete them if so
log.cleaner.enable=false               # whether to enable log compaction; enabling it can improve performance

Since this is multiple nodes and multiple brokers, the server.properties file of each broker should be modified as described above.

2. Modify the configuration of producer

kafka-console-producer --broker-list 192.168.21.1:9092,192.168.21.2:9092,192.168.21.3:9092 --topic topic-multiplenode-multiplebroker

3. Modify the configuration of consumer

kafka-console-consumer --zookeeper 192.168.120.101:2181,192.168.120.102:2181,192.168.120.103:2181 --topic topic-multiplenode-multiplebroker

Kafka high reliability configuration

Kafka offers very flexible configuration of data redundancy. For scenarios that require high data reliability, you can increase the number of replicas (replication.factor) and the minimum number of in-sync replicas (min.insync.replicas), but this hurts performance; conversely, relaxing these settings improves performance at the cost of reliability. Users need to strike the trade-off that fits their own service characteristics.

To ensure that data is written to Kafka securely and reliably, the following configuration is required:

1. Topic configuration

replication.factor >= 3, that is, at least 3 replicas, and 2 <= min.insync.replicas <= replication.factor

2. Broker configuration

Leader election condition: unclean.leader.election.enable=false

3. Producer configuration

request.required.acks=-1 (i.e., acks=all), producer.type=sync
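
Note that request.required.acks and producer.type are settings of the old Scala producer. With the new Java producer used earlier in this article, a rough equivalent (a sketch assuming the local test-topic; the key and value are hypothetical) is acks=all plus a blocking send:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ReliableProducerSample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed local broker
        props.put("acks", "all");                            // wait until all in-sync replicas have acknowledged the write
        props.put("retries", 3);                             // retry transient send failures
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // Blocking on get() makes the send effectively synchronous, comparable to producer.type=sync.
        producer.send(new ProducerRecord<String, String>("test-topic", "key", "reliable message")).get();
        producer.close();
    }
}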

The secret of Kafka’s high throughput

Message-oriented middleware essentially does two things, writing data and reading data, so optimization can be viewed from these two aspects.

To optimize write speed, Kafka uses the following techniques:

1. Write data sequentially

Most disks are mechanical (SSDs are outside the scope of this discussion). If messages are written to random locations on disk, the drive has to address by cylinder, head, and sector, and the (relatively) slow mechanical movement consumes a great deal of time, making disk writes several orders of magnitude slower than memory writes. To avoid random writes, Kafka stores data using sequential appends.
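
As a rough illustration of the append-only idea (not Kafka's actual implementation; the file name is hypothetical), the sketch below writes records to a log file opened in append mode, so every write lands at the end of the file instead of seeking to a random position:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class AppendOnlyLog {

    public static void main(String[] args) throws IOException {
        // Open (or create) a log file in append mode: writes always go to the end of the file.
        FileChannel channel = FileChannel.open(Paths.get("demo-segment.log"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        for (int i = 0; i < 10; i++) {
            ByteBuffer record = ByteBuffer.wrap(("message " + i + "\n").getBytes(StandardCharsets.UTF_8));
            channel.write(record);   // sequential append, no random seek
        }
        channel.close();
    }
}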

2. Memory-mapped files

Even with sequential writes, a hard disk cannot catch up with memory. So Kafka does not write data to disk in real time; it takes advantage of the paged memory management of modern operating systems to use memory and improve I/O efficiency. Memory Mapped Files (mmap) can represent data files of roughly 20 GB on a 64-bit operating system. The mechanism maps a file directly onto physical memory through operating system pages; after the mapping is complete, operations on that memory region are synchronized to disk by the operating system at an appropriate time. Through mmap, a process reads and writes memory as if it were the disk, without worrying about buffer sizes. This gives a large I/O boost because it saves the overhead of copying between user space and kernel space (calling read on a file first loads the data into kernel memory and then copies it into user-space memory). There is an obvious drawback, however: unreliability. Data written to an mmap is not actually written to disk; the operating system only writes it when flush is called. Kafka therefore provides a parameter, producer.type, to control whether the producer actively flushes: if Kafka flushes immediately after writing to the mmap and only then returns to the producer, it is called sync; if it returns immediately after writing to the mmap without calling flush, it is called async.
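
A minimal sketch of the mmap idea in Java NIO (illustrative only, not Kafka's internal code; the file name is hypothetical): the file is mapped into memory, writes go to the mapped region, and force() plays the role of the flush described above.

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class MmapSample {

    public static void main(String[] args) throws IOException {
        FileChannel channel = FileChannel.open(Paths.get("mmap-demo.log"),
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);

        // Map the first 4 KB of the file directly into memory via the operating system's page cache.
        MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);

        // Writing to the buffer only writes to memory; the OS decides when to flush it to disk...
        buffer.put("hello mmap".getBytes(StandardCharsets.UTF_8));

        // ...unless we force a flush ourselves (the "sync" behaviour described above).
        buffer.force();

        channel.close();
    }
}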

3. Standardize binary message formats

Inefficient byte copying has a significant impact, especially under high load. To avoid it, Kafka uses a standardized binary message format that is shared by producers, brokers, and consumers, so blocks of data can be transferred between them without conversion, reducing the cost of byte copying.

Kafka optimizes read speed primarily with zero copy.

Zero Copy technology:

In the traditional mode, reading a file from disk and sending it over the network works like this:

(1) The operating system reads the data from the disk into the page cache in kernel space

(2) The application reads the data from kernel space into a user-space buffer

(3) The application writes the data back to the socket buffer in kernel space

(4) The operating system copies the data from the socket buffer to the NIC buffer so that it can be sent over the network

This is obviously inefficient: four copies and two system calls. Unix-like operating systems provide an optimized path for transferring data from the page cache to a socket; on Linux this is the sendfile system call. Java exposes this system call through the FileChannel.transferTo API. This approach needs only one copy: the operating system sends the data from the page cache directly to the network, and the only copy required on this optimized path is the final one into the NIC buffer.
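
A minimal zero-copy sketch in Java (illustrative only; it assumes a file named demo-segment.log and a receiver already listening on localhost:9999): transferTo hands the copy to the kernel, which on Linux uses sendfile, so the data never passes through user space.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ZeroCopySend {

    public static void main(String[] args) throws IOException {
        FileChannel file = FileChannel.open(Paths.get("demo-segment.log"), StandardOpenOption.READ);
        SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9999));

        // transferTo moves bytes from the page cache straight to the socket inside the kernel.
        long position = 0;
        long remaining = file.size();
        while (remaining > 0) {
            long sent = file.transferTo(position, remaining, socket);
            position += sent;
            remaining -= sent;
        }

        file.close();
        socket.close();
    }
}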

The secret of Kafka's speed is that it turns all messages into batched log files: mmap improves I/O speed, data is always appended to the end of the log so write speed is optimal, and reads are pushed straight out to the network with sendfile. So simply benchmarking the raw speed of an MQ is not very meaningful; with this brute-force design Kafka behaves less like a traditional message queue and more like a high-speed data transmitter.