This article was originally going to be called “Building a Kafka Message Queue Cluster”. However, unlike RabbitMQ, Kafka does not implement a message queuing protocol such as AMQP (Advanced Message Queuing Protocol, an open application-layer standard that provides unified messaging services for message-oriented middleware), so strictly speaking it is not a message queue, although it is used like one. In the end, I compromised and changed the title to “Kafka Distributed Messaging System.”

1. Introduction to Kafka

Kafka was originally developed at LinkedIn in Java and Scala and was open-sourced in 2011. In 2012, Kafka became a top-level project of the Apache Software Foundation. In 2014, several of Kafka’s creators founded a new company, Confluent, to continue working on Kafka.

The goal of the Kafka project is to provide a unified, high-throughput, low-latency system platform for processing real-time data. Officially, Kafka has three main functions:

  1. Publish & subscribe: Like other messaging systems, publish and subscribe to streaming data.
  2. Processing: Write flow processing applications that respond to live events.
  3. Storage: Securely store streaming data in a distributed, fault-tolerant cluster.

1.1 Message System

In the first of these three roles, Kafka is a messaging system. So what is a messaging system, and what problems does it solve? Let’s take the currently popular microservices as an example. Suppose there are three terminal-facing web services (HTTP), Web 1, Web 2, and Web 3, and behind them three internal application services reached via remote procedure calls (for example WCF or gRPC). Without a messaging system, with everything connected directly, their communication might look like this:

With this direct approach, the main problems are as follows:

  1. Services are tightly coupled. Suppose we modify the external interface of application service 2; then every component that calls it must be modified as well, and in the extreme case above (where every component calls it, which is rare in practice), all the other web services and application services need to be changed accordingly.
  2. This tight coupling between services sometimes also produces interfaces that cannot be modified at all: if an uncontrolled third party calls the interface, changing it will break those third-party applications. Imagine how many thousands of online applications would fail if the WeChat Official Account interface changed.
  3. To mitigate this, interfaces are often published in versioned form, such as /web/v1/interface, /web/v1.1/interface, /app/v2.0/interface. Interfaces remain compatible across minor version numbers and may be incompatible across major version numbers.
  4. This versioning approach eases the tight coupling to some extent, but it introduces a new problem: an update has to be applied across multiple version lines, which becomes hard to maintain when there are too many versions.
  5. Adding or removing clients is tedious. If a new application service 4 is added to provide functionality the web services need, Web service 1, Web service 2, and Web service 3 all have to be changed again. Similarly, if application service 2 is no longer needed, the code that calls it has to be removed from the web services.
  6. Performance is limited and hard to scale. For example, to do load balancing you need third-party tools such as Zookeeper or Consul, and you may have to rewrite code or add specific configuration.

When a messaging system is introduced, the structure looks like this:

After a messaging system is introduced, the problems above are largely resolved:

  1. None of the components, web services or application services, care about each other’s interface definitions any more; they only care about the data structure (for example, a JSON structure).
  2. Each component only needs to know how to communicate with Kafka, and Kafka’s interface is highly standardized, relatively stable, and mature.
  3. Performance improves: Kafka is designed for high-volume data transfer, and its throughput is high enough for most enterprise needs.
  4. Scaling is easy: Kafka itself can be scaled out as a cluster, and its consumer-group model provides built-in support for common requirements such as load balancing.

1.2 Two modes of a messaging system

Producer/consumer model:

Producer: an application that produces messages at one end of the data pipeline.

Consumer: an application that consumes messages at the other end of the data pipeline.

  1. The producer sends a message to the queue. If no consumer is connected to the queue and consuming, the message stays in the queue until the queue is full or a consumer comes online.
  2. The producer sends a message to the queue. If multiple consumers are connected to the queue, each message is delivered to only one of them; so with multiple consumers you get natural load balancing.

Publisher/Subscriber mode:

Publisher: an application that generates events at one end of the data pipeline.

Subscriber: an application that responds to events at the other end of the data pipeline.

When using the publisher/subscriber pattern, the data sent to the queue is called an event rather than a message, and processing the data is called subscribing to events rather than consuming messages.

  1. The publisher publishes an event. If no subscriber is connected to the queue at that moment, the event is lost, i.e., no application responds to it. If a subscriber comes online later, it will not receive the event.
  2. The publisher publishes an event. If multiple subscribers are connected to the queue at that moment, the event is broadcast to all of them, and each subscriber receives exactly the same event, so there is no load balancing.

1.3 Stream processing applications

First, let’s distinguish between batch-processing and stream-processing applications.

The biggest difference between batch and stream processing is whether the data has a clear boundary. If it does, it is batch processing; for example, a client collects data every hour and sends it to the server for statistics, and the results are saved to a statistics database.

If there is no boundary, it is streaming data (stream processing). Typical examples are the logs and orders of a large web site: logs and orders are generated continuously, just like a data stream. If each log entry or order is processed within a few hundred milliseconds or a few seconds of being generated, the application is a streaming application. If the data is instead collected for an hour and then sent all at once, the original streaming data has been turned into batch data.

Stream processing is sometimes a hard requirement: for example, Jack Ma needs the orders and sales of Tmall’s Double 11 displayed on the big screen in real time. If the data center says “we are T+1, the Double 11 numbers will only be available on the 12th”, I don’t think he would agree.

Kafka provides a dedicated component, Kafka Streams, for processing streaming data. Other projects in the Hadoop ecosystem each provide their own component for this; for example, Spark includes Spark Streaming to handle streaming data, and Storm, the granddaddy of stream processing, was built specifically to handle streaming data.

Besides using data boundaries to distinguish streaming from batch processing, another criterion is processing time: batch processing usually takes hours or days, while stream processing takes seconds. From this angle, batch processing is also called offline data processing and stream processing is called real-time data processing. There is also a minute-level approach in between, called near-line data processing, but it is discussed less often and is essentially an offline approach with a shortened processing cycle.

1.4 Storage: Securely store streaming data in a distributed, fault-tolerant cluster

By default, data in Kafka is retained for a week. Kafka also supports clustering natively, which makes it easy to add or remove machines, and you can specify the number of copies of the data so that the cluster keeps serving reliably even if individual servers in it go down.

In our data center project, Kafka is mainly used for data transmission. To see what problem it solves, let’s take a quick look at the project scenario:

At the front end of the project are the applications. There may be dozens or hundreds of them in the future, but right now there are only about 10. These front-end applications send data to the back-end data center (a program we call the data collector, or collector for short), so the collector and the applications are clearly in a one-to-many relationship. In this situation the collector is idle most of the time, but when several applications send data at the same time it cannot keep up. A buffering mechanism is needed so that the collector is neither too idle nor too busy, and Kafka can serve as that data buffer pool.

In this example, Kafka was chosen over traditional, mature message queue components such as RabbitMQ because Kafka is designed from the ground up to handle large volumes of data and therefore performs better here.

In addition to serving as a buffer, Kafka also enables a “smooth upgrade” of the data center applications.

The background is as follows: the existing front-end applications and the data collection and cleaning programs were developed in .NET, with the data going into an MS SQL Server database. To cope with the ever-increasing data volume, we decided to adopt big data technologies: store the data in HDFS and use Spark for the data collection and processing.

With Kafka in the middle, none of the old front-end applications or the existing collection and cleaning programs need to change. A new collection/cleaning program can simply be plugged in alongside them, because it only needs to fetch data from Kafka.

When the new version passes the test, you simply stop the old version and switch smoothly to the new system.

1.5 Challenges of introducing message queues

It is impossible to have advantages without disadvantages, and the main challenges of introducing Kafka are as follows:

  1. Dependency on Kafka. Although the applications in the system no longer depend on each other, they all depend heavily on Kafka, so the stability of Kafka becomes very important (much like infrastructure such as MS SQL Server).
  2. In microservice practice there is also a call for service autonomy, meaning each service can provide its function independently, without relying on other components; in other words, decentralization. Choosing between these two approaches, and when, is important.
  3. Message queues are asynchronous by nature, which improves performance but increases code complexity. A simple operation that used to return a result through a synchronous RPC call becomes more complex to write and debug once it is made asynchronous.

2. Broker, Topic, and Partition

2.1 Broker

Broker: the word literally means “agent” or “intermediary”. I find the term confusing, but it is basically a server that runs Kafka, or more precisely, the server process running Kafka.

  • When you connect to any Broker in the cluster, you have access to the entire cluster.
  • Brokers within a cluster are distinguished by ids, which are pure numbers.
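As a minimal sketch of this, here is an example using the third-party kafka-python client (the broker address is hypothetical): a client only needs one reachable broker as its bootstrap server to see the whole cluster.

```python
from kafka import KafkaConsumer

# Any one broker of the cluster works as the bootstrap address; the client
# discovers the remaining brokers from the cluster metadata it fetches.
consumer = KafkaConsumer(bootstrap_servers=["kafka-node1:9092"])

# Lists the topics of the entire cluster, not just those hosted on this broker.
print(consumer.topics())
consumer.close()
```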

2.2 Topic, Partition, and Offset

Topic: can be understood as a data pipeline. Messages are produced (or events are published) at one end, and messages are consumed (or events are responded to) at the other end. The pipeline itself stores, routes, and delivers the messages/events. A Topic is identified by its name.

Data in a topic, whether consumed or not, is kept for a specified period of time, a week by default.

A Topic can be divided into multiple Partitions.

  • Data within a partition is ordered. When a topic has only one partition, the messages of that topic are ordered as a whole; but if a topic has multiple partitions, messages are not globally ordered.
  • More partitions allow more parallel processing. A common recommendation is twice the number of hosts; for example, with three servers in the cluster you can create six partitions per topic.
  • Once a message has been written to a partition it is immutable and cannot be modified, unless you rebuild the topic, modify the data, and resend it.
  • When no key is provided, data is sent to any partition of the topic; when a key is provided, data with the same key is always sent to the same partition.

Each message sent to a Partition gets an incrementing ID, called the offset. Overall, the structure is as follows:

2.3 Distribution of brokers, Topics, and Partitions

You can set a different number of partitions for different topics. If there are multiple nodes in the cluster, the partitions will be spread across the different nodes. For example, Topic1 might have three partitions while Topic2 has only two.
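As a sketch of creating topics with different partition counts, here is an example using kafka-python’s admin client (the topic names, broker address, and replication factor are illustrative only; the kafka-topics command-line tool can do the same thing):

```python
from kafka.admin import KafkaAdminClient, NewTopic

# Hypothetical bootstrap address; assumes a cluster with at least two brokers
# so that replication_factor=2 can be satisfied (see section 2.4).
admin = KafkaAdminClient(bootstrap_servers=["kafka-node1:9092"])

admin.create_topics([
    NewTopic(name="Topic1", num_partitions=3, replication_factor=2),
    NewTopic(name="Topic2", num_partitions=2, replication_factor=2),
])
admin.close()
```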

2.4 Replication Factor

The number of copies of a Topic is usually set to 2 or 3, so that when a node fails and goes offline, the Topic is still available and serviced by other nodes in the cluster.

Obviously, more copies is not always better: the more copies there are, the longer data synchronization takes and the lower the effective disk utilization.

Note: it is not the case that the more nodes a Kafka (or Hadoop) cluster has, the higher its fault tolerance. The fault tolerance is the same; it is just less likely that the relevant nodes fail at the same time. For example, with 100 nodes in a Hadoop cluster and the number of copies set to 2, if the two nodes storing both copies of a piece of data fail, that data still becomes inaccessible; it is just that with 100 nodes the probability of exactly those nodes failing simultaneously is lower than it would be with 5 or 3 nodes.

2.5 Partition Leader and ISR

For a partition with multiple replicas, only one replica is the Leader, and the others are ISRs (In-Sync Replicas). The Leader handles all reads and writes, while the ISRs only act as backups.

3. Producer

3.1 Producer writes data to the Topic

The Producer only needs to specify the Topic name and connect to any node in the cluster. Kafka automatically load-balances the writes and routes them to the correct Partition (a topic’s partitions may reside on different nodes in the cluster).

Note: to keep the diagram simple, the ISR partitions are not shown in the figure above.
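A minimal producer sketch with kafka-python (the topic name and broker address are hypothetical); the client only names the topic and Kafka takes care of routing the message to a partition:

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=["kafka-node1:9092"])

# Only the topic is named; Kafka routes the message to one of its partitions.
producer.send("web_logs", value=b'{"path": "/index", "status": 200}')

# Block until all buffered messages have actually been sent to the cluster.
producer.flush()
producer.close()
```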

3.2 Producer Acks

The Producer can choose how it is acknowledged when data is written, in one of the following ways (see the configuration sketch after the list):

  1. acks=0: the fastest. The producer does not wait for any write acknowledgement, so data may be lost.
  2. acks=1: fast. The producer waits for the acknowledgement from the Leader but not from the ISRs.
  3. acks=all: the slowest. The producer waits for acknowledgements from both the Leader and the ISRs, so data is not lost.
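A minimal sketch of setting acks with kafka-python (broker address and topic name are hypothetical):

```python
from kafka import KafkaProducer

# acks=0: fire and forget; acks=1: wait for the leader only;
# acks="all": wait for the leader and all in-sync replicas.
producer = KafkaProducer(
    bootstrap_servers=["kafka-node1:9092"],
    acks="all",   # choose 0, 1, or "all" depending on durability requirements
    retries=3,    # retry transient send failures instead of dropping the data
)
producer.send("web_logs", value=b"payload")
producer.flush()
```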

3.3 Producer Keys

When sending data, the Producer can specify a Key, which is usually based on the data being sent.

For example, suppose you want to send order data from an e-commerce site, where each order has an OrderNo (order number), a Retailer (seller), and a Customer (buyer).

If:

  • The receiving end of the data (the Consumer) does not care about the order in which orders arrive: the Key can be empty, or it can be OrderNo.
  • The receiving end requires each seller’s orders to arrive in order: set the Key to Retailer.
  • The receiving end requires each buyer’s orders to arrive in order: set the Key to Customer.

The slightly tricky part here is that when we say the Key is Retailer, it does not mean the literal string “Retailer” is sent as the key every time; rather, the value of the Retailer field is used as the key. Take the following table as an example:

OrderNo | Customer | OrderAmount | OrderDate           | Retailer
001     | Jimmy    | 5200        | 2017-10-01 00:00:00 | Apple
002     | Jack     | 3180        | 2017-11-01 00:00:00 | Apple
003     | Jimmy    | 2010        | 2017-12-01 00:00:00 | XiaoMi
004     | Alice    | 980         | 2018-10-01 00:00:00 | XiaoMi
005     | Eva      | 1080        | 2018-10-20 00:00:00 | XiaoMi
006     | Alice    | 680         | 2018-11-01 00:00:00 | XiaoMi
007     | Alice    | 920         | 2018-12-01 00:00:00 | Apple

Then for orders 001 through 007, with Retailer as the Key, the key values are Apple (001), Apple (002), XiaoMi (003), XiaoMi (004), XiaoMi (005), XiaoMi (006), Apple (007).

In this way, all Apple orders are sent, in order, to the same Partition, and all XiaoMi orders are sent, in order, to the same Partition. The two partitions may or may not be the same one. As shown below:
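A sketch of this with kafka-python (the topic name, broker address, and order records are illustrative only); because the value of the Retailer field is passed as the key, all orders with the same retailer land in the same partition, in order:

```python
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["kafka-node1:9092"],
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

orders = [
    {"OrderNo": "001", "Customer": "Jimmy", "Retailer": "Apple"},
    {"OrderNo": "003", "Customer": "Jimmy", "Retailer": "XiaoMi"},
    {"OrderNo": "007", "Customer": "Alice", "Retailer": "Apple"},
]

for order in orders:
    # The Retailer field's value is the key: Apple orders go to one partition,
    # XiaoMi orders go to one partition, each stream keeping its own order.
    producer.send("orders", key=order["Retailer"], value=order)

producer.flush()
```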

4. Consumer

4.1 Basic concept of Consumer

A Consumer is used to read data from a Topic. Like the Producer, it only needs to connect to any node in the cluster and specify the Topic name; Kafka automatically fetches the data from the correct brokers and partitions and delivers it to the consumer.
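A minimal consumer sketch with kafka-python (topic name and broker address are hypothetical):

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "web_logs",                               # the topic to read from
    bootstrap_servers=["kafka-node1:9092"],
    auto_offset_reset="earliest",             # start from the oldest retained data
)

for message in consumer:
    # Each record carries its partition and offset along with the payload.
    print(message.partition, message.offset, message.value)
```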

For each Partition, the data is ordered, as shown in the figure below:

4.2 Consumer Group

Kafka cleverly combines producer/consumer and publisher/subscriber patterns using the concept of groups.

A Topic can have multiple Groups, and a Group can contain multiple Consumers. For the consumers within a Group, it is the producer/consumer mode: a message is consumed by only one Consumer in the Group. Across different Groups, it is the publisher/subscriber mode: the same message is delivered to every Group. The following diagram describes this relationship best:

Note: within the same Group, a Partition is assigned to only one Consumer. If a topic has only three partitions but a Group has four consumers, then one consumer is redundant and will not receive any data.
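A sketch of the two modes with kafka-python (group names, topic name, and broker address are hypothetical): consumers that share a group_id split the partitions between them (queue semantics), while a consumer with a different group_id independently receives every message (publish/subscribe semantics).

```python
from kafka import KafkaConsumer

# Two consumers in the SAME group: the partitions of "orders" are divided
# between them, and each message is processed by only one of the two.
worker_a = KafkaConsumer("orders", bootstrap_servers=["kafka-node1:9092"],
                         group_id="billing")
worker_b = KafkaConsumer("orders", bootstrap_servers=["kafka-node1:9092"],
                         group_id="billing")

# A consumer in a DIFFERENT group: it receives every message on the topic,
# regardless of what the "billing" group has already consumed.
auditor = KafkaConsumer("orders", bootstrap_servers=["kafka-node1:9092"],
                        group_id="audit")
```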

4.3 Consumer Offsets

The first thing to note is that the Consumer Offsets discussed here and the Offsets of a Topic discussed earlier are two completely different concepts. The offsets here belong to the consumer, while the earlier offsets belong to the topic (more precisely, to a partition). A few points to note:

  • Offsets record the position each Consumer, under each Group, has read up to.
  • Kafka uses a special Topic to store consumer offsets; its name is __consumer_offsets.
  • When a Consumer goes offline and later comes back online, it resumes reading from the previously recorded offsets.

When offsets are committed:

  1. At most once: the Consumer commits offsets as soon as it receives a message. This is the most efficient, but if message processing fails, for example because of a program exception, the message cannot be read again.
  2. At least once: the Consumer commits offsets only after it has processed the message. A message may be read twice, because if an exception occurs during processing the message will be read again. This is the default.
  3. Exactly once: still at an experimental stage.

The usual practice is to choose at least once, and then make the processing on the application side idempotent, so that repeating an operation does not affect the final result.
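A sketch of at-least-once consumption with kafka-python (topic, group, broker address, and the process() helper are all hypothetical): auto-commit is disabled and offsets are committed only after the message has been processed, so a crash during processing leads to a re-read rather than a lost message.

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "orders",
    bootstrap_servers=["kafka-node1:9092"],
    group_id="billing",
    enable_auto_commit=False,      # commit manually, only after processing
)

for message in consumer:
    process(message.value)         # hypothetical idempotent processing step
    consumer.commit()              # at-least-once: commit after processing
```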

CAP theory: A distributed system can only satisfy at most two of Consistency, Availability and Partition tolerance simultaneously.

4.4 Zookeeper

Zookeeper is a component for distributed service registration, discovery, and governance. Many components in the big data ecosystem, such as HDFS, use Zookeeper. Kafka relies heavily on Zookeeper; in fact, Kafka’s installation package includes a compatible version of Zookeeper.

In Kafka, Zookeeper has the following functions:

  • Manages the nodes in the cluster and maintains the list of nodes.
  • Manages all Topics and maintains the list of Topics.
  • Performs leader election for partitions.
  • Notifies Kafka when the cluster changes, including creating a Topic, a Broker going online or offline, and deleting a Topic.

5. To summarize

This has been a long article covering the main concepts and mechanisms of Kafka, and I believe you now have a basic understanding of it. In the articles that follow, we will see how to put Kafka into practice. In my own experience Kafka is very stable and robust, and I hope you like it as much as I do.

Thanks for reading and I hope you found this article helpful!