“This is the 22nd day of my participation in the November Gwen Challenge. See details of the event: The Last Gwen Challenge 2021.”

There is no official Kafka client for Go, but there are two very popular libraries on GitHub:

  1. Sarama — more stars and more online examples: github.com/Shopify/sarama
  2. Confluent's library: github.com/confluentinc/confluent-kafka-go

Sarama is used here, because there are many stars and many cases, which makes it easy to get started quickly

Pay attention to

If your Kafka version is below 2.2, you need to pin the sarama version in go.mod to github.com/Shopify/sarama v1.24.1.

This is because Sarama only guarantees compatibility with the latest two Kafka releases plus a two-month grace period, so using an older Kafka version with the latest Sarama is a pitfall.

If you use a non-cluster producer, you need to create the topic yourself. If you use a cluster producer, the topic will be created automatically.

example

package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
)

var (
   Consumer *cluster.Consumer
   producer sarama.SyncProducer
   brokers = []string{"ip1:9092"."ip2:9092"."ip3:9092"}
   topic = "testGo"
   groupId = "testGo_test1"
)

func initProducer() {
   var err error
   config := sarama.NewConfig()
   config.Producer.RequiredAcks = sarama.WaitForLocal
   config.Producer.Retry.Max = 3
   config.Producer.Return.Successes = true
   brokers := brokers
   producer, err = sarama.NewSyncProducer(brokers,config)
   iferr ! = nil { fmt.Printf("Producer initialization failed -> %v \n", err)
      panic(err)
   }
   fmt.Println("Producer initialization succeeded.")
}

func initConsumer()  {
   var err error
   config := cluster.NewConfig()
   config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
   config.Consumer.Offsets.Initial = sarama.OffsetNewest
   Consumer, err = cluster.NewConsumer(brokers,groupId,[]string{topic},config)
   iferr ! = nil { fmt.Printf("Consumer initialization failed -> %v \n", err)
      panic(err.Error())
   }
   if Consumer == nil {
      panic(fmt.Sprintf(Kafka info -> {brokers:%v, topic: %v, group: %v}", brokers, topic, groupId))
   }
   fmt.Printf("Consumer -> %v, topic -> %v,", Consumer, topic)
}

func main() {
   initProducer()
   initConsumer()

   // Production message
   for i := 1; i <100; i ++ {
      pid, offset, err := producer.SendMessage(&sarama.ProducerMessage{
         Topic:     topic,
         Key:       sarama.StringEncoder(i),
         Value:     sarama.ByteEncoder("this is test message."})),iferr ! = nil { fmt.Println("Failed to send message, err:", err)
         return
      }
      fmt.Printf("offset: %v\n", offset)
   }
   
   time.Sleep(2 * time.Second)

   // Consume messages
   for {
       select {
       case msg, ok: =<-Consumer.Messages(): if ok { fmt.Printf("kafka msg: %s \n", msg.Value) } } } }Copy the code

The result is as follows

The demo process is as follows

  1. Introduce standalone sarama library and cluster Sarama library
  2. Defining connection variables
  3. Instantiate a producer using the single Sarama library
  4. Instantiate a consumer using the clustered Sarama library
  5. Loop 100 times to send 100 messages
  6. Use sarama’s own production message constructor to set the message content
  7. Use for to keep the process listening for messages from Kafka