Hello everyone, my name is Xie Wei, and I am a programmer.

Today’s topic: Kafka Usage Guide, single-node version.

1. Application scenarios

Suppose you are a back-end engineer. The application you designed is running normally in production, and then a flash-sale event suddenly crashes it. When you investigate, you find that a flood of requests could not be processed in time, which brought the system down. There are two ways to address this: 1. Use an Nginx reverse proxy to spread requests across more internal servers, i.e. load balancing. 2. Use a message system: requests are first “cached” in middleware, and downstream systems keep fetching the cached requests from it for further processing.

The message system in the second approach is exactly where Kafka comes in.

So what is Kafka?

Kafka is a distributed messaging system that has been positioned as a distributed streaming processing platform.

In a nutshell, system A sends a message to the message system, and system B receives the message from the message system for subsequent processing.

A common way to describe this Kafka scenario is peak shaving and valley filling: smoothing out traffic spikes so that the system load stays as even as possible.
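A minimal Go sketch of the idea, using a buffered channel as a stand-in for the message system. This is an assumption made purely for illustration: Kafka persists messages on a broker and is not an in-process channel, and the function name `absorbBurst` is hypothetical. The burst is accepted immediately, and the consumer drains it at its own pace.

```go
package main

import "fmt"

// absorbBurst pushes a burst of n requests into a buffered queue without
// waiting for the consumer, then drains the queue the way a downstream
// worker would. It returns the requests in the order they were processed.
func absorbBurst(n int) []int {
	queue := make(chan int, n) // the "message system": holds the backlog

	// Producer side: the whole burst is accepted right away.
	for i := 0; i < n; i++ {
		queue <- i
	}
	close(queue)

	// Consumer side: pulls requests one by one at its own pace.
	processed := make([]int, 0, n)
	for req := range queue {
		processed = append(processed, req)
	}
	return processed
}

func main() {
	got := absorbBurst(5)
	fmt.Println("processed", len(got), "requests:", got)
}
```

The producer never waits for the consumer here, which is the whole point: the queue takes the spike, and the processing side sees a steady stream.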

From this follow Kafka's three typical application scenarios:

  • The messaging system
  • The storage system
  • Distributed streaming processing platform

The messaging system is by far the most widespread use today. Messages in transit must be stored so that downstream systems can pull them, so Kafka can also serve as a storage system. And since pulled messages are processed by downstream systems anyway, why not do the data processing inside Kafka as well? Hence the distributed stream processing platform.

The rest of this article covers the core application: the messaging system.

2. Basic concepts

A message is generated by system A, sent to the messaging system, and pulled from the messaging system by system B. There are many concepts involved.

  • System A is called a producer; its job is to send messages
  • The message system is called a broker. It is essentially a service process that receives messages from producers, serves pull requests from consumers, and persists the messages
  • System B is called a consumer; its job is to pull messages from the messaging system

There are different setting parameters for producers and consumers, which determine the different behaviors of producers and consumers.

To send a message, a producer needs to know the address of the broker. The broker (Kafka server) configuration determines where messages are persisted, among other behaviors. To tell different kinds of messages apart, Kafka introduces a logical concept, the topic: messages that producers send to different topics are stored in different locations.

Within a topic, the simplest approach is to keep appending incoming messages to a single persistent store. The problem with this simple approach is that the data grows too large, which makes it hard for consumers to keep up. The fix is just as simple: split the data into several “files” and append to each one separately, which keeps the size of each one under control. Kafka calls this concept a partition. Messages are continuously appended to a partition. Partitions are numbered starting from 0, and each message appended to a partition is assigned an offset.
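The topic, partition, and offset relationship can be sketched in a few lines of Go. This is an in-memory toy, not how Kafka stores data on disk; the type and function names (`Topic`, `Partition`, `NewTopic`, `Append`) are hypothetical and chosen only for this illustration.

```go
package main

import "fmt"

// Partition is an append-only list of messages. A message's offset is
// simply its position in the list.
type Partition struct {
	messages []string
}

// Append stores msg at the end of the partition and returns its offset.
func (p *Partition) Append(msg string) int64 {
	p.messages = append(p.messages, msg)
	return int64(len(p.messages) - 1)
}

// Topic is a named group of partitions, numbered from 0.
type Topic struct {
	Name       string
	Partitions []*Partition
}

// NewTopic creates a topic with n empty partitions.
func NewTopic(name string, n int) *Topic {
	topic := &Topic{Name: name}
	for i := 0; i < n; i++ {
		topic.Partitions = append(topic.Partitions, &Partition{})
	}
	return topic
}

func main() {
	topic := NewTopic("topic-demo", 2)
	fmt.Println("partition 0, offset", topic.Partitions[0].Append("a"))
	fmt.Println("partition 0, offset", topic.Partitions[0].Append("b"))
	fmt.Println("partition 1, offset", topic.Partitions[1].Append("c"))
}
```

Note that offsets are counted per partition: the first message in partition 1 gets its own offset sequence, independent of partition 0.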

To pull messages from the broker, a consumer first needs the broker address, then the topic, and, more specifically, the partition and the offset from which to start consuming.

What if messages get lost? A simple safeguard is redundant backup, which Kafka calls replication: a partition has multiple replicas, one of which is the leader while the others are followers. The leader handles the message traffic from producers and consumers; followers do not serve that traffic directly and only talk to the leader to continuously synchronize data.
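Here is a rough Go sketch of the leader/follower relationship. It is deliberately simplified: the `Replica` type and `syncFrom` method are hypothetical names, and real Kafka replication involves fetch requests, in-sync replica tracking, and more. The only point shown is that writes go to the leader and the follower copies whatever it is missing.

```go
package main

import "fmt"

// Replica holds one copy of a partition's messages.
type Replica struct {
	log []string
}

// syncFrom copies every message the follower is missing from the leader,
// starting at the follower's own log end, and returns how many were copied.
func (f *Replica) syncFrom(leader *Replica) int {
	if len(f.log) >= len(leader.log) {
		return 0 // already caught up
	}
	missing := leader.log[len(f.log):]
	f.log = append(f.log, missing...)
	return len(missing)
}

func main() {
	leader := &Replica{}
	follower := &Replica{}

	// Producers write only to the leader.
	leader.log = append(leader.log, "m0", "m1", "m2")

	// The follower catches up by pulling from the leader.
	copied := follower.syncFrom(leader)
	fmt.Println("copied", copied, "messages; follower log:", follower.log)
}
```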

Multiple brokers form a Kafka cluster. When a failure occurs, Kafka relies on ZooKeeper to elect a new leader.

kafka cluster:

Kafka Topic: Partitioning concept

Kafka cluster:

3. Use the client

Based on the above concepts: how do you build a Kafka service to complete the messaging system?

  • Start the service process: broker

Pseudo code:

type Broker struct{
    Addr 
    Config
    ...
}
  • Producers connect to the broker

Pseudo code:


type Producer struct{
    Config
    Message 
    ...
}

  • Consumers connect to the broker

Pseudo code

type Consumer struct{
    Config
    Topic 
    Partitions
    Offset
    ...
}

Basic idea:

  • Start the Kafka service
  • System A connects to the service and sends messages
  • System B connects to the service and consumes messages

Examples from the official website: how to complete basic message sending and receiving.

Download the installation package: kafka_2.12-2.3.0.tgz

  • 2.12 refers to the Scala version used to build Kafka
  • 2.3.0 refers to kafka version
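If you ever need to pull those two versions out of the file name in a script, a small Go helper along these lines would do. The function name `parsePackageName` is hypothetical, and the sketch assumes the `kafka_<scala>-<kafka>.tgz` naming shown above.

```go
package main

import (
	"fmt"
	"strings"
)

// parsePackageName splits a name like "kafka_2.12-2.3.0.tgz" into the
// Scala version ("2.12") and the Kafka version ("2.3.0").
func parsePackageName(name string) (scalaVersion, kafkaVersion string, err error) {
	base := strings.TrimSuffix(name, ".tgz")
	rest := strings.TrimPrefix(base, "kafka_")
	if rest == base {
		return "", "", fmt.Errorf("unexpected package name %q", name)
	}
	parts := strings.SplitN(rest, "-", 2)
	if len(parts) != 2 {
		return "", "", fmt.Errorf("unexpected package name %q", name)
	}
	return parts[0], parts[1], nil
}

func main() {
	scalaVersion, kafkaVersion, err := parsePackageName("kafka_2.12-2.3.0.tgz")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("Scala:", scalaVersion, "Kafka:", kafkaVersion)
}
```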

After decompression, there are two most important directories:

  • bin: a collection of scripts, e.g. for starting the ZooKeeper service, creating topics, producing messages, and consuming messages
zookeeper-server-start.sh
zookeeper-server-stop.sh
kafka-configs.sh
kafka-console-consumer.sh
kafka-console-producer.sh
kafka-consumer-groups.sh
kafka-topics.sh
kafka-server-start.sh
kafka-server-stop.sh
...

  • config: configuration files, e.g. for setting the ZooKeeper port, the Kafka log storage directory, the externally exposed port, the maximum message size, and the retention time
zookeeper.properties
server.properties
producer.properties
consumer.properties
...

There are about 200 parameters, and sorry, I can’t remember them all. So what now? Give up? Stop earning money? Forget about a raise?

Start from the default settings and learn a few of them, category by category:

  • zookeeper.properties

Kafka relies on ZooKeeper for distributed coordination

dataDir=/tmp/zookeeper
clientPort=2181

Remember the default clientPort=2181

  • server.properties

Kafka server settings

# log storage directory
log.dirs=/tmp/kafka-logs
# log retention time, in hours
log.retention.hours=168
# default broker ID; in cluster mode, mind the numbering (each broker needs its own ID)
broker.id=0
# address the service listens on
listeners=PLAINTEXT://:9092
# address of the ZooKeeper cluster
zookeeper.connect=localhost:2181
...
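To get a feel for these settings, here is a small Go sketch that reads `key=value` lines in the style of server.properties and converts the retention time into days. The parser `parseProperties` is a hypothetical helper and handles only the simple cases used in this article (no line continuations or escapes).

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// parseProperties reads key=value lines, skipping blank lines and lines
// that start with '#'.
func parseProperties(text string) map[string]string {
	props := make(map[string]string)
	scanner := bufio.NewScanner(strings.NewReader(text))
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		parts := strings.SplitN(line, "=", 2)
		if len(parts) != 2 {
			continue // not a key=value line
		}
		props[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1])
	}
	return props
}

func main() {
	props := parseProperties(`
# values taken from the defaults discussed in this article
log.dirs=/tmp/kafka-logs
log.retention.hours=168
broker.id=0
listeners=PLAINTEXT://:9092
zookeeper.connect=localhost:2181
`)
	hours, err := strconv.Atoi(props["log.retention.hours"])
	if err != nil {
		fmt.Println("bad retention value:", err)
		return
	}
	fmt.Println("messages are kept for", hours/24, "days")
	fmt.Println("broker listens on", props["listeners"])
}
```

With the default of 168 hours, the retention works out to 7 days.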
  • producer.properties

Settings that govern how messages are produced, etc.

  • consumer.properties

Settings that govern how messages are consumed, etc.

After configuring the parameters:

  • Start ZooKeeper
> bin/zookeeper-server-start.sh config/zookeeper.properties
  • Start the Kafka service process
> bin/kafka-server-start.sh config/server.properties

To create topics, list topics, and so on, use kafka-topics.sh.

To produce messages, use kafka-console-producer.sh.

To consume messages, use kafka-console-consumer.sh.

Of course, these scripts are usually only used for testing. In real applications you use the client library for your programming language.

4. Demonstration

Kafka Go version client:

Download and install:

go get -u -v github.com/Shopify/sarama

4.1 Producers

System A

  • producers
type KafkaAction struct {
	DataSyncProducer  sarama.SyncProducer
	DataAsyncProducer sarama.AsyncProducer
}
// Synchronous mode

func newDataSyncProducer(brokerList []string) sarama.SyncProducer {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
	config.Producer.Retry.Max = 5                    // Retry up to 5 times to produce the message
	config.Producer.Return.Successes = true          // Required by the SyncProducer
	config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
	producer, err := sarama.NewSyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer1:", err)
	}
	return producer
}

// Asynchronous mode
func newDataAsyncProducer(brokerList []string) sarama.AsyncProducer {
	config := sarama.NewConfig()
	sarama.Logger = log.New(os.Stdout, "[KAFKA] ", log.LstdFlags)
	config.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the leader to ack
	config.Producer.Compression = sarama.CompressionSnappy   // Compress messages
	config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
	config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
	producer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer2:", err)
	}
	// Log delivery errors in the background
	go func() {
		for err := range producer.Errors() {
			log.Println("Failed to write access log entry:", err)
		}
	}()
	return producer
}

Remember that producers have a set of configuration parameters? That is exactly what config is for. Every parameter has a default value, and you can override it yourself.

For example: compression algorithm

config.Producer.Compression = sarama.CompressionSnappy

Commonly used compression algorithms are:

  • gzip
  • snappy
  • lz4
  • zstd

Different compression algorithms are mainly different in compression ratio and throughput.
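To make the ratio side of that trade-off concrete, here is a small Go sketch. Snappy, LZ4, and zstd are not in the Go standard library, so as a stand-in (my assumption, not something the Kafka client does) it compresses the same payload with gzip at its fastest and its strongest level and prints the resulting sizes. The helper name `gzipSize` is hypothetical.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// gzipSize returns the size in bytes of data after gzip compression at
// the given level (1 = fastest, 9 = best compression).
func gzipSize(data []byte, level int) (int, error) {
	var buf bytes.Buffer
	w, err := gzip.NewWriterLevel(&buf, level)
	if err != nil {
		return 0, err
	}
	if _, err := w.Write(data); err != nil {
		return 0, err
	}
	if err := w.Close(); err != nil {
		return 0, err
	}
	return buf.Len(), nil
}

func main() {
	// A repetitive payload, similar in shape to the messages in this article.
	payload := bytes.Repeat([]byte(`{"method":"get5","url":"www.baidu.com4"}`), 100)

	for _, level := range []int{gzip.BestSpeed, gzip.BestCompression} {
		size, err := gzipSize(payload, level)
		if err != nil {
			fmt.Println("compression failed:", err)
			return
		}
		fmt.Printf("level %d: %d bytes -> %d bytes\n", level, len(payload), size)
	}
}
```

Higher levels generally spend more CPU time per message, which is the throughput side of the trade-off.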

Or the partitioning rule:

config.Producer.Partitioner = sarama.NewRoundRobinPartitioner

Common partitioning rules:

  • Round-robin
  • Random
  • By message key
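The three rules above (round-robin, random, and key-based) can be sketched in plain Go as follows. These are illustrations with hypothetical names, not the sarama implementations, and the use of FNV-1a as the key hash is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math/rand"
)

// roundRobin returns a partitioner that cycles through partitions 0..n-1.
func roundRobin(n int32) func() int32 {
	next := int32(0)
	return func() int32 {
		p := next
		next = (next + 1) % n
		return p
	}
}

// randomPartition picks any of the n partitions with equal probability.
func randomPartition(n int32) int32 {
	return rand.Int31n(n)
}

// byKey hashes the key so that equal keys always land in the same
// partition. FNV-1a is an assumption of this sketch.
func byKey(key string, n int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int32(h.Sum32() % uint32(n))
}

func main() {
	next := roundRobin(3)
	fmt.Println("round-robin:", next(), next(), next(), next())
	fmt.Println("random:", randomPartition(3))
	fmt.Println("by key:", byKey("user-1", 3), byKey("user-1", 3))
}
```

Key-based partitioning is the one to reach for when all messages with the same key must stay in order, since they end up in the same partition.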

Or the acknowledgment level that decides when a send counts as successful:

config.Producer.RequiredAcks = sarama.WaitForLocal
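What the acknowledgment levels mean can be sketched like this. The constant names mirror the sarama ones and the numeric values follow the Kafka acks setting (0, 1, -1), but the `sendSucceeded` function and its parameters are hypothetical and only model the decision, not the network protocol.

```go
package main

import "fmt"

// RequiredAcks models the three acknowledgment levels.
type RequiredAcks int16

const (
	NoResponse   RequiredAcks = 0  // do not wait for any acknowledgment
	WaitForLocal RequiredAcks = 1  // wait for the leader only
	WaitForAll   RequiredAcks = -1 // wait for all in-sync replicas
)

// sendSucceeded reports whether a send counts as successful, given which
// replicas have stored the message so far.
func sendSucceeded(acks RequiredAcks, leaderStored bool, followersStored, inSyncFollowers int) bool {
	switch acks {
	case NoResponse:
		return true // fire and forget
	case WaitForLocal:
		return leaderStored
	case WaitForAll:
		return leaderStored && followersStored >= inSyncFollowers
	default:
		return false
	}
}

func main() {
	// The leader has the message, but only 1 of 2 in-sync followers does.
	fmt.Println("NoResponse:  ", sendSucceeded(NoResponse, true, 1, 2))
	fmt.Println("WaitForLocal:", sendSucceeded(WaitForLocal, true, 1, 2))
	fmt.Println("WaitForAll:  ", sendSucceeded(WaitForAll, true, 1, 2))
}
```

The stricter the level, the less likely a message is lost when a broker fails, at the cost of higher latency per send.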
  • Message: the producer only transmits raw bytes.

interface

type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

The sent message needs to implement the Encoder interface, that is, the defined message structure needs to implement the Encode and Length methods.

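type SendMessage struct {
	Method  string `json:"method"`
	URL     string `json:"url"`
	Value   string `json:"value"`
	Date    string `json:"date"`
	encoded []byte // cached JSON encoding (unexported, so not serialized)
	err     error  // cached encoding error
}

// ensureEncoded marshals the message to JSON once and caches the result,
// so Length and Encode can be called in any order.
func (S *SendMessage) ensureEncoded() {
	if S.encoded == nil && S.err == nil {
		S.encoded, S.err = json.Marshal(S)
	}
}

func (S *SendMessage) Length() int {
	S.ensureEncoded()
	return len(S.encoded)
}

func (S *SendMessage) Encode() ([]byte, error) {
	S.ensureEncoded()
	return S.encoded, S.err
}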
  • Send a message
func (K *KafkaAction) Do(v interface{}) {
	message, ok := v.(SendMessage)
	if !ok {
		log.Printf("unexpected message type %T", v)
		return
	}
	// A successful send returns the partition and offset of the message
	partition, offset, err := K.DataSyncProducer.SendMessage(&sarama.ProducerMessage{
		Topic: TOPIC,
		Value: &message,
	})
	if err != nil {
		log.Println(err)
		return
	}
	value := map[string]string{
		"method": message.Method,
		"url":    message.URL,
		"value":  message.Value,
		"date":   message.Date,
	}
	fmt.Printf("/%d/%d/%+v\n", partition, offset, value)
}

For example, sending messages to topic: topic-golang prints the following, in the form partition/offset/value:

/0/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/0/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/0/2/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/0/3/map[date:12344 method:get5 url:www.baidu.com4 value:da4]

There is only one partition above, and the offset value keeps increasing.

Create another topic with 10 partitions. topic: topic-python

What does this look like in the log directory?

// cd into the log.dirs directory configured in server.properties
topic-golang-0
topic-python-0
topic-python-1
topic-python-2
topic-python-3
topic-python-4
topic-python-5
topic-python-6
topic-python-7
topic-python-8
topic-python-9

Send messages to topic-python using the round-robin partitioning rule:

/0/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/1/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/2/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/3/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/4/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/5/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/6/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/7/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/8/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/9/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/0/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/1/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4]

With round-robin, messages are stored in the partitions one after another, in turn.

4.2 Consumers

System B

func main() {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	brokers := []string{"127.0.0.1:9092"}
	master, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := master.Close(); err != nil {
			panic(err)
		}
	}()
	_, e := master.Partitions("topic-python")
	if e != nil {
		log.Println(e)
	}
	// Consume partition 0, starting from the oldest available offset
	consumer, err := master.ConsumePartition("topic-python", 0, sarama.OffsetOldest)
	if err != nil {
		panic(err)
	}
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	doneCh := make(chan struct{})
	go func() {
		for {
			select {
			case err := <-consumer.Errors():
				fmt.Println(err)
			case msg := <-consumer.Messages():
				fmt.Println("Received messages", string(msg.Key), string(msg.Value), msg.Topic)
			case <-signals:
				fmt.Println("Interrupt is detected")
				doneCh <- struct{}{}
				return
			}
		}
	}()
	<-doneCh
}
  • The consumer specifies topic: topic-python
  • The consumer specifies partition: 0

Remember the messages the producer sent to topic-python? partition/offset/value

/0/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/1/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/2/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/3/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/4/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/5/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/6/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/7/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/8/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/9/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/0/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/1/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4]

Partition 0 contains two messages. Because the consumer specified partition 0, it can only consume these two messages.

Received messages  {"method":"get5","url":"www.baidu.com4","value":"da4","date":"12344"} topic-python
Received messages  {"method":"get5","url":"www.baidu.com4","value":"da4","date":"12344"} topic-python
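How the starting offset decides what a consumer sees can be sketched in plain Go. The two constants are named after sarama's OffsetNewest and OffsetOldest; `startIndex` and `consume` are hypothetical helpers, and clamping out-of-range offsets is a simplification of this sketch (a real broker reports an error instead).

```go
package main

import "fmt"

const (
	offsetNewest int64 = -1 // start after the last stored message
	offsetOldest int64 = -2 // start at the first stored message
)

// startIndex resolves a requested offset against a partition that
// currently holds `stored` messages.
func startIndex(requested int64, stored int) int {
	switch {
	case requested == offsetNewest:
		return stored
	case requested < 0: // offsetOldest, or any other negative value
		return 0
	case requested > int64(stored):
		return stored
	default:
		return int(requested)
	}
}

// consume returns the messages of one partition from the requested
// offset onward. Messages in other partitions are never visible here.
func consume(partition []string, requested int64) []string {
	return partition[startIndex(requested, len(partition)):]
}

func main() {
	partition0 := []string{"msg-0", "msg-1"}

	fmt.Println("oldest:  ", consume(partition0, offsetOldest))
	fmt.Println("offset 1:", consume(partition0, 1))
	fmt.Println("newest:  ", consume(partition0, offsetNewest))
}
```

Starting from the oldest offset replays everything already stored in that partition, while starting from the newest offset yields only messages that arrive afterwards.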

4.3 Other

Using the Kafka client, what other features do we need?

  • Topic creation, description, deletion, etc
  • Consumer group description, etc
  • Metadata
type ClusterAdmin interface {
	CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
	ListTopics() (map[string]TopicDetail, error)
	DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
	DeleteTopic(topic string) error
	CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
	DeleteRecords(topic string, partitionOffsets map[int32]int64) error
	DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
	AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
	CreateACL(resource Resource, acl Acl) error
	ListAcls(filter AclFilter) ([]ResourceAcls, error)
	DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
	ListConsumerGroups() (map[string]string, error)
	DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
	ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
	DeleteConsumerGroup(group string) error
	DescribeCluster() (brokers []*Broker, controllerID int32, err error)
	Close() error
}

So much for the basic applications of single-node Kafka.

Container services

Any system that provides a service can run as a container, and Kafka is no exception. Configuration can be supplied through environment variables.

docker-compose.yml

version: '2'
services:
  ui:
    image: index.docker.io/sheepkiller/kafka-manager:latest
    depends_on:
      - zookeeper
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: zookeeper:2181
  zookeeper:
    image: index.docker.io/wurstmeister/zookeeper:latest
    ports:
      - "2181:2181"
  server:
    image: index.docker.io/wurstmeister/kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  • ZooKeeper: distributed coordination system
  • Kafka Server: the Kafka service
  • Kafka-Manager: Kafka management platform
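The environment variable names in the compose file look like broker properties written in upper case with underscores. Assuming the image follows that convention (strip the KAFKA_ prefix, lower-case the rest, turn underscores into dots), which is my reading of the names above rather than something verified here, the mapping can be sketched in Go with a hypothetical helper `envToProperty`:

```go
package main

import (
	"fmt"
	"strings"
)

// envToProperty converts a KAFKA_-prefixed environment variable name into
// a broker property name. It returns false for names without the prefix.
func envToProperty(name string) (string, bool) {
	const prefix = "KAFKA_"
	if !strings.HasPrefix(name, prefix) {
		return "", false
	}
	key := strings.ToLower(strings.TrimPrefix(name, prefix))
	return strings.ReplaceAll(key, "_", "."), true
}

func main() {
	names := []string{
		"KAFKA_ZOOKEEPER_CONNECT",
		"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR",
		"ZK_HOSTS", // belongs to the manager UI, not to the broker
	}
	for _, name := range names {
		if prop, ok := envToProperty(name); ok {
			fmt.Println(name, "->", prop)
		} else {
			fmt.Println(name, "-> (not a broker setting)")
		}
	}
}
```

Under that assumption, KAFKA_ZOOKEEPER_CONNECT corresponds to the zookeeper.connect setting seen earlier in server.properties.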

The cluster version will follow in a later article.


Code: github.com/wuxiaoxiaos…