Installation

Installing Kafka on CentOS

  • Kafka : kafka.apache.org/downloads
  • ZooKeeper: zookeeper.apache.org/releases.ht…

Download and unzip

# Download Kafka and unzip it
$ wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz
$ tar -zxvf kafka_2.12-2.1.1.tgz
$ mv kafka_2.12-2.1.1 /data/kafka

# Download ZooKeeper and unzip it
$ wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
$ tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz
$ mv apache-zookeeper-3.5.8-bin /data/zookeeper
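Both Kafka and ZooKeeper need a Java runtime. If one is not already installed, it can be added first (package name assumes CentOS 7 with OpenJDK 8):

$ yum install -y java-1.8.0-openjdk
$ java -version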

Start ZooKeeper

# Copy the configuration template
$ cd /data/zookeeper
$ cp conf/zoo_sample.cfg conf/zoo.cfg

# Check whether the configuration needs changes
$ vim conf/zoo.cfg

# Commands
$ ./bin/zkServer.sh start    # start
$ ./bin/zkServer.sh status   # status
$ ./bin/zkServer.sh stop     # stop
$ ./bin/zkServer.sh restart  # restart

# Test with the client
$ ./bin/zkCli.sh -server localhost:2181
$ quit
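For a single-node setup the template usually works as-is. For reference, the defaults in zoo_sample.cfg look roughly like this (dataDir is worth pointing at persistent storage, e.g. /data/zookeeper/data):

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181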

Start Kafka

# backup configuration
$ cd /data/kafka
$ cp config/server.properties config/server.properties_copy

# change configuration
$ vim /data/kafka/config/server.properties

# In a cluster, each broker's id must be unique
# broker.id=0

# Listener address (internal network)
# listeners=PLAINTEXT://ip:9092

# IP address and port for providing services externally
# advertised.listeners=PLAINTEXT://106.75.84.97:9092

# Default num.partitions for new topics (ucloud.ukafka = 3)
# num.partitions=3

# zookeeper configuration
# zookeeper.connect=localhost:2181

# Start Kafka with this configuration
$ ./bin/kafka-server-start.sh config/server.properties &

# Status check
$ ps -ef|grep kafka
$ jps
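To confirm the broker works end to end, you can create a test topic and exchange a message with the bundled console scripts (the topic name is just an example; the --zookeeper flag matches the 2.1.x tooling):

# Create a topic with 3 partitions
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic helloTopic

# Type messages in one terminal...
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic helloTopic

# ...and read them back in another
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic helloTopic --from-beginning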

Install Kafka under Docker

docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
docker pull wurstmeister/kafka
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.1.111 --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka
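A quick sanity check from inside the container (the path assumes the wurstmeister image, which installs Kafka under /opt/kafka; the zookeeper hostname resolves via the --link above):

$ docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --list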

Introduction

  • Broker: a message-processing node; a single Kafka server is one broker, and multiple brokers form a Kafka cluster.
  • Topic: a category of messages; for example, page-view logs and click logs can each live in their own topic, and a Kafka cluster can serve multiple topics at the same time.
  • Partition: a physical grouping of a topic. A topic can be divided into multiple partitions, each of which is an ordered queue.
  • Segment: physically, a partition consists of multiple segments, as described in 2.2 and 2.3 below.
  • Offset: each partition consists of a series of ordered, immutable messages that are continuously appended to it. Each message in a partition has a sequential id called the offset, which uniquely identifies that message within the partition.

The relationship between the number of Kafka partitions and consumers

  • If there are more consumers in a group than partitions, the extra consumers sit idle: Kafka does not allow concurrent consumption of a single partition within a group, so the number of consumers should not exceed the number of partitions.
  • If there are fewer consumers than partitions, one consumer reads from multiple partitions. Balance the two counts carefully, or some consumers will fetch more data than others. Ideally the number of partitions is an integer multiple of the number of consumers; a partition count like 24 makes it easy to pick a matching consumer count.
  • When a consumer reads from multiple partitions, there is no ordering guarantee across partitions. Kafka only guarantees order within a single partition; across partitions, the observed order depends on how you read.
  • Adding or removing consumers, brokers, or partitions triggers a rebalance, after which each consumer's partition assignment may change. The sketch below shows this assignment in action.
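A minimal sketch of this behavior, reusing the Confluent.Kafka client that the quick start below installs. Broker address, group id, and topic name are placeholders; it assumes a topic with, say, 3 partitions:

using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

class RebalanceDemo
{
    static void RunConsumer(string name, CancellationToken token)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "127.0.0.1:9092",
            GroupId = "demo-group" // same group => partitions are divided among members
        };
        using var consumer = new ConsumerBuilder<Ignore, string>(config)
            .SetPartitionsAssignedHandler((_, parts) =>
                Console.WriteLine($"{name} assigned: [{string.Join(",", parts)}]"))
            .SetPartitionsRevokedHandler((_, parts) =>
                Console.WriteLine($"{name} revoked: [{string.Join(",", parts)}]"))
            .Build();
        consumer.Subscribe("helloTopic");
        try
        {
            while (true) consumer.Consume(token); // discard messages; we only watch assignments
        }
        catch (OperationCanceledException)
        {
            consumer.Close();
        }
    }

    static void Main()
    {
        var cts = new CancellationTokenSource();
        // With 3 partitions: consumer-1 first owns all 3; once consumer-2 joins,
        // a rebalance splits them 2/1. A fourth consumer would sit idle.
        var t1 = Task.Run(() => RunConsumer("consumer-1", cts.Token));
        Thread.Sleep(5000);
        var t2 = Task.Run(() => RunConsumer("consumer-2", cts.Token));
        Console.ReadLine(); // press Enter to stop
        cts.Cancel();
        Task.WaitAll(t1, t2);
    }
}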

Quick start

Install the component in a .NET Core project

Install-Package Confluent.Kafka
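Or, equivalently, with the dotnet CLI:

dotnet add package Confluent.Kafka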

Open source: github.com/confluentin…

Add the IKafkaService service interface

public interface IKafkaService
{
    /// <summary>
    /// Sends a message to the specified topic
    /// </summary>
    /// <typeparam name="TMessage"></typeparam>
    /// <param name="topicName"></param>
    /// <param name="message"></param>
    /// <returns></returns>
    Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class;

    /// <summary>
    /// Subscribes to messages from the specified topics
    /// </summary>
    /// <typeparam name="TMessage"></typeparam>
    /// <param name="topics"></param>
    /// <param name="messageFunc"></param>
    /// <param name="cancellationToken"></param>
    /// <returns></returns>
    Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class;
}

Implement IKafkaService

public class KafkaService : IKafkaService
{
    public async Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "127.0.0.1:9092"
        };
        using var producer = new ProducerBuilder<string, string>(config).Build();
        await producer.ProduceAsync(topicName, new Message<string, string>
        {
            Key = Guid.NewGuid().ToString(),
            Value = message.SerializeToJson()
        });
    }

    public async Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "127.0.0.1:9092",
            GroupId = "crow-consumer",
            EnableAutoCommit = false,
            StatisticsIntervalMs = 5000,
            SessionTimeoutMs = 6000,
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnablePartitionEof = true
        };
        //const int commitPeriod = 5;
        using var consumer = new ConsumerBuilder<Ignore, string>(config)
                             .SetErrorHandler((_, e) =>
                             {
                                 Console.WriteLine($"Error: {e.Reason}");
                             })
                             .SetStatisticsHandler((_, json) =>
                             {
                                 Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}> Message listening...");
                             })
                             .SetPartitionsAssignedHandler((c, partitions) =>
                             {
                                 string partitionsStr = string.Join(",", partitions);
                                 Console.WriteLine($" - Allocate kafka partition:{partitionsStr}");
                             })
                             .SetPartitionsRevokedHandler((c, partitions) =>
                             {
                                 string partitionsStr = string.Join(",", partitions);
                                 Console.WriteLine($" - Reclaim kafka partition:{partitionsStr}");
                             })
                             .Build();
        consumer.Subscribe(topics);
        try
        {
            while (true)
            {
                try
                {
                    var consumeResult = consumer.Consume(cancellationToken);
                    Console.WriteLine($"Consumed message '{consumeResult.Message? .Value}' at: '{consumeResult? .TopicPartitionOffset}'.");
                    if (consumeResult.IsPartitionEOF)
                    {
                        Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}To the bottom:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                        continue;
                    }
                    TMessage messageResult = null;
                    try
                    {
                        messageResult = JsonConvert.DeserializeObject<TMessage>(consumeResult.Message.Value);
                    }
                    catch (Exception ex)
                    {
                        var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} [Failed to deserialize message, Value: {consumeResult.Message.Value}]: {ex.StackTrace}";
                        Console.WriteLine(errorMessage);
                        messageResult = null;
                    }
                    if (messageResult != null /* && consumeResult.Offset % commitPeriod == 0*/)
                    {
                        messageFunc(messageResult);
                        try
                        {
                            consumer.Commit(consumeResult);
                        }
                        catch (KafkaException e)
                        {
                            Console.WriteLine(e.Message);
                        }
                    }
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Consume error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Closing consumer.");
            consumer.Close();
        }
        await Task.CompletedTask;
    }
}

Inject IKafkaService and call it where you need it.

public class MessageService : IMessageService, ITransientDependency
{
    private readonly IKafkaService _kafkaService;
    public MessageService(IKafkaService kafkaService)
    {
        _kafkaService = kafkaService;
    }

    public async Task RequestTraceAdded(XxxEventData eventData)
    {
        await _kafkaService.PublishAsync(eventData.TopicName, eventData);
    }
}
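For the constructor injection above to work, the services must be registered with the container. The ITransientDependency marker suggests the original project auto-registers them (ABP-style); with plain Microsoft.Extensions.DependencyInjection it would look roughly like this:

using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();
services.AddTransient<IKafkaService, KafkaService>();     // the Kafka wrapper above
services.AddTransient<IMessageService, MessageService>(); // the caller above
var provider = services.BuildServiceProvider();
var messageService = provider.GetRequiredService<IMessageService>();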

This acts as the producer. Once messages have been published to the queue, we need a consumer to consume them, so a console project can receive the messages and handle the business logic.

var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true;
    cts.Cancel();
};

await kafkaService.SubscribeAsync<XxxEventData>(topics, async (eventData) =>
{
    // Your logic

    Console.WriteLine($" - {eventData.EventTime:yyyy-MM-dd HH:mm:ss}{eventData.TopicName}】- > Processed");
}, cts.Token);

The subscription method is already defined on IKafkaService, so after injection it can be used directly.

Producer-consumer example

Producer

static async Task Main(string[] args)
{
    if (args.Length != 2)
    {
        Console.WriteLine("Usage: .. brokerList topicName");
        // e.g. 127.0.0.1:9092 helloTopic
        return;
    }

    var brokerList = args.First();
    var topicName = args.Last();

    var config = new ProducerConfig { BootstrapServers = brokerList };

    using var producer = new ProducerBuilder<string, string>(config).Build();

    Console.WriteLine("\n-----------------------------------------------------------------------");
    Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}.");
    Console.WriteLine("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -");
    Console.WriteLine("To create a kafka message with UTF-8 encoded key and value:");
    Console.WriteLine("> key value<Enter>");
    Console.WriteLine("To create a kafka message with a null key and UTF-8 encoded value:");
    Console.WriteLine("> value<enter>");
    Console.WriteLine("Ctrl-C to quit.\n");

    var cancelled = false;

    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cancelled = true;
    };

    while (!cancelled)
    {
        Console.Write("> ");

        var text = string.Empty;

        try
        {
            text = Console.ReadLine();
        }
        catch (IOException)
        {
            break;
        }

        if (string.IsNullOrWhiteSpace(text))
        {
            break;
        }

        var key = string.Empty;
        var val = text;

        var index = text.IndexOf(" ");
        if (index != -1)
        {
            key = text.Substring(0, index);
            val = text.Substring(index + 1);
        }

        try
        {
            var deliveryResult = await producer.ProduceAsync(topicName, new Message<string, string>
            {
                Key = key,
                Value = val
            });

            Console.WriteLine($"delivered to: {deliveryResult.TopicPartitionOffset}");
        }
        catch (ProduceException<string, string> e)
        {
            Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
        }
    }
}

Consumer

static void Main(string[] args)
{
    if (args.Length != 2)
    {
        Console.WriteLine("Usage: .. brokerList topicName");
        // e.g. 127.0.0.1:9092 helloTopic
        return;
    }

    var brokerList = args.First();
    var topicName = args.Last();

    Console.WriteLine($"Started consumer, Ctrl-C to stop consuming");

    var cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cts.Cancel();
    };

    var config = new ConsumerConfig
    {
        BootstrapServers = brokerList,
        GroupId = "consumer",
        EnableAutoCommit = false,
        StatisticsIntervalMs = 5000,
        SessionTimeoutMs = 6000,
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnablePartitionEof = true
    };

    const int commitPeriod = 5;

    using var consumer = new ConsumerBuilder<Ignore, string>(config)
                         .SetErrorHandler((_, e) =>
                         {
                             Console.WriteLine($"Error: {e.Reason}");
                         })
                         .SetStatisticsHandler((_, json) =>
                         {
                             Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > monitoring..");
                             //Console.WriteLine($"Statistics: {json}");
                         })
                         .SetPartitionsAssignedHandler((c, partitions) =>
                         {
                             Console.WriteLine($"Assigned partitions: [{string.Join(",", partitions)}]");
                         })
                         .SetPartitionsRevokedHandler((c, partitions) =>
                         {
                             Console.WriteLine($"Revoking assignment: [{string.Join(",", partitions)}]");
                         })
                         .Build();
    consumer.Subscribe(topicName);

    try
    {
        while (true)
        {
            try
            {
                var consumeResult = consumer.Consume(cts.Token);

                if (consumeResult.IsPartitionEOF)
                {
                    Console.WriteLine($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");

                    continue;
                }

                Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");

                if (consumeResult.Offset % commitPeriod == 0)
                {
                    try
                    {
                        consumer.Commit(consumeResult);
                    }
                    catch (KafkaException e)
                    {
                        Console.WriteLine($"Commit error: {e.Error.Reason}");
                    }
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Consume error: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Closing consumer."); consumer.Close(); }}Copy the code