1. Introduction to Kafka

1.1 Background History

In today’s society, application systems such as e-commerce, social networks, search, and web browsing act like information factories, constantly producing all kinds of data. In the era of big data, we face the following challenges:

  1. How to gather this huge amount of information
  2. How to analyze it
  3. How to do both of the above in a timely manner

These challenges form a business model in which producers generate information and consumers consume it, and between the two there needs to be a bridge: a messaging system. At a micro level, this requirement can also be understood as how messages are passed between different systems.

1.2 Kafka was born

Kafka was open-sourced by LinkedIn.

Kafka is a framework that solves these problems by seamlessly connecting producers and consumers.

Kafka: a high-throughput distributed messaging system.

1.3 Kafka now

Apache Kafka is a distributed messaging system based on the publish-subscribe model. It is fast, scalable, and durable. It is now an open-source project of the Apache Software Foundation and is widely used by commercial companies as part of the Hadoop ecosystem. Its biggest strength is that it can process large amounts of data in real time to serve a variety of scenarios, such as Hadoop-based batch processing systems, low-latency real-time systems, and stream processing engines like Storm and Spark Streaming.

2. Overview of Kafka technology

2.1 Features of Kafka

  • High throughput, low latency: Kafka can process hundreds of thousands of messages per second, with latency as low as a few milliseconds
  • Scalability: Kafka clusters support hot scaling
  • Persistence, reliability: Messages are persisted to local disks and data backup is supported to prevent data loss
  • Fault tolerance: Nodes in the cluster are allowed to fail (with N replicas, up to N-1 nodes can fail)
  • High concurrency: Thousands of clients can read and write data simultaneously

2.2 Some important design ideas of Kafka

This section introduces the main ideas behind Kafka’s design so that readers can quickly get a picture of its features. Most of them are described in more detail later for readers who want to study further.

  • Consumer group: Consumers can be organized into groups, and each message is consumed by only one consumer within a group. If a message must be consumed by multiple consumers, those consumers must be in different groups.
  • Message status: In Kafka, the state of a message is kept by the consumer. The broker does not track which message has been consumed by whom; only an offset value is recorded (pointing to the next message to be consumed in the partition). This means that if the consumer fails while processing a message, a message on the broker may be consumed more than once.
  • Message persistence: Kafka persists messages to the local file system and does so very efficiently.
  • Message expiration: Kafka keeps messages for a long time so that consumers can consume them multiple times, although many of the details are configurable.
  • Batch sending: Kafka supports batch sending by message set to improve push efficiency.
  • Push and pull: In Kafka, producers push messages to the broker, while consumers pull messages from the broker. Both production and consumption are asynchronous.
  • The relationship between brokers in a Kafka cluster: It is not a master-slave relationship. Brokers have the same status in the cluster. We can add or remove any broker node at will.
  • Load balancing: Kafka provides a metadata API to manage load between brokers (in Kafka 0.8.x, ZooKeeper is the main load balancing tool).
  • Synchronous and asynchronous sending: The producer can push messages asynchronously, which greatly improves the throughput of the Kafka system (synchronous or asynchronous mode can be selected by a parameter).
  • Partition: Kafka’s brokers support message partitioning. The producer can decide which partition to send a message to. The order of messages within a partition is the order in which the producer sent them. Partitioning is important, as we’ll see later.
  • Offline data loading: Kafka is also ideal for loading data into Hadoop or a data warehouse due to its support for scalable data persistence.
  • Plugin support: A number of plugins have been developed by the active community to extend Kafka’s capabilities, such as Storm, Hadoop, and Flume plugins.

2.3 Application Scenarios of Kafka

  • Log collection: A company can use Kafka to collect logs from a variety of services and expose them to consumers such as Hadoop, HBase, and Solr through a unified interface.
  • Messaging system: decoupling producers and consumers, buffering messages, and so on.
  • User activity tracking: Kafka is often used to record the activities of web or app users, such as browsing, searching, and clicking. These activities are published by various servers to Kafka topics, which subscribers consume for real-time monitoring and analysis, or load into Hadoop or a data warehouse for offline analysis and mining.
  • Operational metrics: Kafka is also used to record operational monitoring data. This includes collecting data from various distributed applications and producing centralized feedback for operations, such as alarms and reports.
  • Stream processing: for example with Spark Streaming and Storm
  • Event sourcing

2.4 Kafka architecture components

Topics are the objects of publishing and subscription in Kafka. We can create a topic for each type of data; the clients that publish messages to a topic are called producers, and the clients that subscribe to messages from a topic are called consumers. Producers and consumers can write to and read from multiple topics simultaneously. A Kafka cluster consists of one or more broker servers, which persist and replicate Kafka messages.

  • Topic: the category (directory) in which messages are stored
  • Producer: the party that publishes messages to a topic
  • Consumer: the party that consumes messages from a topic
  • Broker: a single Kafka service instance

 

2.5 Kafka Topic & Partition

When messages are sent, they are sent to a topic, which is essentially a directory. A topic consists of Partition Logs, as shown in the following figure:

 

We can see that the messages in each partition are ordered. Produced messages are continuously appended to the partition log, and each message is given a unique offset value.

The Kafka cluster stores all messages, whether or not they have been consumed. We can set a retention time so that expired data is automatically deleted to free disk space. For example, if we set the retention time to 2 days, every message is kept in the cluster for 2 days after it is written and is deleted only after that.

Kafka needs to maintain only one piece of metadata per consumer: the offset of the next message to consume in the partition. Each time a consumer consumes a message, the offset increases by one. The state of consumption is completely controlled by the consumer, which can track and reset the offset and therefore read messages from any position.

Storing message logs as partitions serves several purposes. First, it makes scaling in a cluster easy: each partition must fit on the machine that hosts it, but a topic can be composed of many partitions, so the cluster as a whole can accommodate data of any size. Second, it improves concurrency, because reads and writes are done on a per-partition basis.
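The partition log and the consumer-held offset described in this section can be sketched in a few lines of Python. This is only an in-memory illustration; the class name PartitionLog and its methods are made up for this example and are not part of any Kafka client.

```python
class PartitionLog:
    """Append-only log for one partition; an offset is a position in the log."""

    def __init__(self):
        self._messages = []

    def append(self, message: bytes) -> int:
        """Append a message and return the offset it was assigned."""
        self._messages.append(message)
        return len(self._messages) - 1

    def read(self, offset: int, max_messages: int = 10) -> list:
        """Return up to max_messages starting at offset; the log keeps no reader state."""
        return self._messages[offset:offset + max_messages]


log = PartitionLog()
for payload in (b"a", b"b", b"c"):
    log.append(payload)

# The consumer, not the log, remembers how far it has read.
consumer_offset = 0
batch = log.read(consumer_offset)
consumer_offset += len(batch)

# Re-reading old data is just a matter of asking for an earlier offset.
replayed = log.read(1)
```

Because the log never tracks readers, any number of consumers can read the same data at their own pace.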

3. Kafka core components

3.1 Replications, Partitions and Leaders

As mentioned above, data in Kafka is persistent and fault-tolerant. Kafka lets users set the number of replicas per topic, which determines how many brokers store each piece of written data. If the replica number is set to 3, a copy of the data is stored on 3 different machines, so 2 machines can fail. It is generally recommended that the number of replicas be at least 2, so that data consumption is not affected when a machine is added, removed, or restarted. If you have higher durability requirements, you can set the number of replicas to 3 or more.

Topics in Kafka are stored as partitions, and each topic can set its number of partitions, which determines the number of logs that make up the topic. When producing data, the producer publishes messages to the partitions of the topic according to certain rules (which can be customized). Replication, described above, works at the partition level, and only one replica of each partition is elected as the leader, which handles reads and writes.

Some considerations on choosing the number of partitions. Within a consumer group, a partition can be consumed by only one consumer (although a consumer can consume multiple partitions at the same time), so if there are fewer partitions than consumers, some consumers will receive no data. The number of partitions should therefore be at least as large as the number of consumers running at the same time. It is also recommended that the number of partitions be larger than the number of brokers in the cluster, so that the leader partitions can be evenly distributed among brokers and the cluster load is balanced. Cloudera has hundreds of partitions per topic. Note that Kafka needs to allocate some memory for each partition to cache message data, so the larger the number of partitions, the more heap space must be allocated to Kafka.
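To see why extra consumers stay idle, here is a small sketch that spreads partitions over the consumers of one group in round-robin order. The function assign_partitions is a hypothetical helper written for this article; Kafka’s real assignment strategies differ in detail, but the one-owner-per-partition constraint is the same.

```python
def assign_partitions(num_partitions, consumers):
    """Give every partition exactly one owner, rotating through the consumers."""
    assignment = {consumer: [] for consumer in consumers}
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


balanced = assign_partitions(4, ["c1", "c2"])
too_many_consumers = assign_partitions(2, ["c1", "c2", "c3"])
print(balanced)            # c1 owns partitions 0 and 2, c2 owns 1 and 3
print(too_many_consumers)  # c3 owns nothing and will receive no data
```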

3.2 Producers

Producers send messages directly to the broker that hosts the leader partition, without routing or forwarding through any intermediary. To make this possible, every broker in a Kafka cluster can answer producer requests for metadata about a topic, including which machines are alive and where the leader partitions are, so that the producer knows which leader partitions it can access directly.

The producer client controls which partition each message is pushed to. This can be done by random assignment (a form of random load balancing) or by a specified partitioning algorithm. Kafka provides an interface for users to implement custom partitioning. Users can assign a partition key to each message and use this key in a hash partitioning algorithm. For example, with the user ID as the partition key, messages with the same user ID are pushed to the same partition.
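Key-based partitioning can be illustrated as follows. This sketch uses CRC32 from the Python standard library as the hash; that choice is an assumption made for the example, not the hash Kafka’s own partitioner uses, and choose_partition is a hypothetical name.

```python
import zlib


def choose_partition(partition_key: str, num_partitions: int) -> int:
    """Map a key to a partition with a hash that is stable across processes."""
    return zlib.crc32(partition_key.encode("utf-8")) % num_partitions


# Every message keyed by the same user ID lands in the same partition,
# so the messages of one user stay in order relative to each other.
first = choose_partition("user-42", 8)
second = choose_partition("user-42", 8)
```

A stable hash matters here: Python’s built-in hash() is randomized per process for strings, so it would send the same key to different partitions after a restart.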

Pushing data in batches greatly improves processing efficiency. A Kafka producer can accumulate a certain number of messages in memory and send them as a single batch request. The batch size is controlled by producer parameters and can be expressed as a message count (for example, 500 messages), a time interval (for example, 100 ms), or a data size (for example, 64 KB). Increasing the batch size reduces the number of network requests and disk I/O operations, but the settings need to balance efficiency against timeliness.
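The three batching thresholds mentioned above (message count, elapsed time, data size) can be sketched as a small accumulator. BatchAccumulator and its parameter names are invented for this illustration and do not correspond to actual producer settings; as a simplification, the time limit is only checked when a new message arrives, whereas a real producer would also use a timer.

```python
import time


class BatchAccumulator:
    """Buffer messages and release them as one batch when a threshold is reached."""

    def __init__(self, max_messages=500, max_bytes=64 * 1024, max_wait_s=0.1,
                 clock=time.monotonic):
        self.max_messages = max_messages
        self.max_bytes = max_bytes
        self.max_wait_s = max_wait_s
        self._clock = clock
        self._batch = []
        self._bytes = 0
        self._first_at = None

    def add(self, message: bytes):
        """Buffer one message; return the full batch if a limit was hit, else None."""
        if not self._batch:
            self._first_at = self._clock()
        self._batch.append(message)
        self._bytes += len(message)
        if (len(self._batch) >= self.max_messages
                or self._bytes >= self.max_bytes
                or self._clock() - self._first_at >= self.max_wait_s):
            return self.flush()
        return None

    def flush(self):
        """Return whatever is buffered and start a new, empty batch."""
        batch, self._batch, self._bytes = self._batch, [], 0
        return batch


acc = BatchAccumulator(max_messages=3, max_wait_s=60)
results = [acc.add(m) for m in (b"x", b"y", b"z")]
```

Larger limits mean fewer requests but a longer wait before the first message of a batch leaves the producer, which is the efficiency versus timeliness trade-off.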

Producers can send messages to Kafka asynchronously and in parallel. After sending a message, the producer usually receives a future, which eventually yields either the offset of the message or the error encountered during sending. An important parameter, “acks”, determines how many replicas must have received the message before the producer considers the send acknowledged. If acks is set to 0, the producer does not wait for any response from the broker, so it cannot know whether the message was sent successfully and data may be lost. In exchange, acks=0 gives the maximum system throughput.

If acks is set to 1, the producer receives an acknowledgement from the broker once the leader partition has received the message. This improves reliability, because the client waits until the broker confirms receipt. If acks is set to -1, the producer is acknowledged only after all backup partitions have received the message, which gives the highest reliability.

A Kafka message consists of a fixed-length header and a variable-length byte array. Because the payload is just a byte array, Kafka can carry any user-defined serialization format or existing formats such as Apache Avro and Protobuf. The maximum message size is configurable, but we recommend keeping messages no larger than 1 MB; typical messages are between 1 and 10 KB.
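The three acks settings can be summarized as a small decision function. This is a simplified model of the rules described above, not broker code; the function and parameter names are hypothetical.

```python
def send_is_acknowledged(acks: int, leader_has_message: bool,
                         backups_with_message: int, num_backups: int) -> bool:
    """Return True once the producer may treat the send as complete."""
    if acks == 0:
        # Fire and forget: the producer never waits, so it also never learns of a loss.
        return True
    if acks == 1:
        # Wait for the leader partition only.
        return leader_has_message
    if acks == -1:
        # Wait for the leader and every backup partition.
        return leader_has_message and backups_with_message >= num_backups
    raise ValueError("acks must be 0, 1 or -1 in this sketch")


# The leader has the message, but only one of two backups has copied it so far.
state = dict(leader_has_message=True, backups_with_message=1, num_backups=2)
outcome = {acks: send_is_acknowledged(acks, **state) for acks in (0, 1, -1)}
```

In this state acks=0 and acks=1 already count the send as done, while acks=-1 keeps waiting for the second backup.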

3.3 Consumers

Kafka provides two consumer APIs: the high-level API and the simple API. The simple API is a low-level API that maintains a connection to a single broker and is completely stateless: an offset must be supplied with every request, which makes it the most flexible.

In Kafka, the offset value of the currently read message is maintained by the consumer, so the consumer can decide how to read the data in Kafka. For example, a consumer can re-consume data that has been consumed by resetting the offset value. Kafka keeps data for a configurable period of time, whether it is consumed or not, and only deletes it when it expires.

The high-level API hides the details of accessing the brokers in a cluster and lets a client consume a topic transparently. It maintains the consumption state itself, so each read returns the next unconsumed message.

The high-level API also supports consuming topics in groups. If all consumers share the same group name, Kafka acts as a queue: the partitions are divided among the consumers, and each message is processed by one of them. If the consumers all have different group names, Kafka acts as a broadcast service, delivering every message in the topic to each consumer.
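The difference between queue and broadcast behavior comes down to one rule: each message is delivered to exactly one consumer per group. The sketch below models that rule. The deliver function is a made-up helper, and it rotates per message inside a group for brevity, whereas Kafka divides work per partition.

```python
from collections import defaultdict


def deliver(messages, consumers):
    """consumers is a list of (consumer_name, group_name) pairs.

    Each message goes to exactly one member of every group.
    """
    groups = defaultdict(list)
    for name, group in consumers:
        groups[group].append(name)

    received = defaultdict(list)
    for index, message in enumerate(messages):
        for members in groups.values():
            received[members[index % len(members)]].append(message)
    return dict(received)


# Same group name: the consumers share the messages (queue behavior).
queue = deliver(["m0", "m1", "m2", "m3"], [("a", "g"), ("b", "g")])

# Different group names: every consumer sees every message (broadcast behavior).
broadcast = deliver(["m0", "m1"], [("a", "g1"), ("b", "g2")])
```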

 

4. Kafka core features

4.1 Compression

Kafka supports sending messages in batches, and it also supports compressing these message sets. The producer can compress message sets using the GZIP or Snappy format. Data compressed on the producer side must be decompressed on the consumer side. The advantage of compression is that it reduces the amount of data transmitted and relieves pressure on the network. In big data processing, the bottleneck is usually the network rather than the CPU (compression and decompression consume some CPU resources).

To tell whether a message is compressed, Kafka adds a compression attribute byte to the message header. The lowest two bits of this byte indicate the codec used to compress the message. If these two bits are 0, the message is not compressed.
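The following sketch shows how the codec can be read from the attribute byte with a bit mask, and how much a batch of similar messages shrinks under GZIP. The codec numbering (0 = none, 1 = GZIP, 2 = Snappy) reflects my understanding of the 0.8 message format and should be checked against the protocol documentation for your version; the helper names are hypothetical.

```python
import gzip

COMPRESSION_MASK = 0x03  # the lowest two bits of the attribute byte
CODECS = {0: "none", 1: "gzip", 2: "snappy"}  # assumed numbering, see the note above


def codec_of(attributes: int) -> str:
    """Extract the compression codec from a message's attribute byte."""
    return CODECS.get(attributes & COMPRESSION_MASK, "unknown")


# A message set of 100 similar messages compresses well as a whole.
batch = b"".join([b'{"event":"click","page":"/home"}'] * 100)
compressed = gzip.compress(batch)
ratio = len(compressed) / len(batch)
```

Compressing the whole set rather than each message is what makes this effective: the repetition between messages is where most of the savings come from.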

4.2 Message Reliability

In a message system, it is very important to ensure the reliability of messages in the process of production and consumption. In the actual message transmission process, the following three situations may occur:

  • A message fails to be sent
  • A message is sent more than once
  • Ideally, exactly once: a message is sent successfully and only once

Many systems claim to implement exactly-once delivery, but they ignore the possibility that the producer or consumer fails during production or consumption. For example, a producer sends a message but the message is lost on the way, or the message reaches the broker and is retrieved by the consumer, but the consumer fails while processing it.

From the producer side: when a message is sent, the producer waits for the broker to confirm that it has received the message. If the message is lost or one of the brokers fails, the producer sends the message again. A parameter controls whether to wait for all backup nodes to receive the message.

From the consumer side: the broker records an offset value for the partition, which points to the message the consumer will consume next. If a consumer receives a message but fails while processing it, the consumer can use the offset to find that message again and reprocess it. The consumer also has control over the offset and can re-read any message persisted on the broker.
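The consumer-side behavior can be sketched as "advance the offset only after processing succeeds". In this toy model the log is a Python list and a crash is an exception; consume and flaky_handler are hypothetical names used only for this illustration.

```python
def consume(log, committed_offset, process):
    """Process messages starting at committed_offset; return the new offset.

    The offset moves forward only after a message was processed successfully,
    so a failure causes that message to be read again on the next attempt.
    """
    offset = committed_offset
    for message in log[committed_offset:]:
        try:
            process(message)
        except Exception:
            break  # simulated crash: the offset for this message is not advanced
        offset += 1
    return offset


seen = []
already_failed = set()


def flaky_handler(message):
    """Record every delivery and fail the first time "m1" is handled."""
    seen.append(message)
    if message == "m1" and message not in already_failed:
        already_failed.add(message)
        raise RuntimeError("consumer crashed mid-processing")


log = ["m0", "m1", "m2"]
committed = consume(log, 0, flaky_handler)          # stops at m1
committed = consume(log, committed, flaky_handler)  # retries m1, then m2
```

The message m1 is delivered twice, which is why handlers in such a setup are usually written to tolerate duplicates.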

4.3 Backup Mechanism

The backup (replication) mechanism is a new feature of Kafka 0.8 that greatly improves the reliability and stability of a Kafka cluster. With replication, nodes in a cluster can fail without affecting the cluster as a whole: a cluster with N replicas tolerates N-1 node failures. Among all replicas, one acts as the leader, which holds the list of the other replicas and keeps them in sync. The following diagram illustrates Kafka’s backup mechanism:

 

4.4 Kafka efficiency related design

4.4.1 Message persistence

Kafka relies heavily on the file system to store and cache messages. The general perception that “disks are slow” makes people skeptical that a persistent structure can be competitive. In fact, disks are both much slower and much faster than people expect, depending on how they are used.

A key fact about disk performance is that the throughput of hard drives has been diverging from seek latency; that is, linear writes are much faster than random writes. For example, on a RAID-5 array of six 7,200 rpm SATA disks, linear writes reach about 600 MB/s while random writes reach only about 100 KB/s, a difference of roughly 6000 times. Linear reads and writes are the most predictable of all access patterns, so operating systems optimize them with read-ahead and write-behind techniques, which prefetch data in large blocks and combine multiple small logical writes into one large physical write. More discussion can be found in an ACM Queue article, which shows that linear reads from disk can in some cases be faster than random access to memory.

To compensate for this divergence in performance, modern operating systems use free memory as a disk cache, at the cost of a slight performance penalty when the memory is reclaimed. All disk reads and writes go through this unified cache.

In addition, if we are building on top of the JVM, two things should be clear to anyone familiar with Java memory management:

  1. The memory overhead of objects is very high, often double the size of the data stored, or more.
  2. As the amount of data in the heap increases, Java garbage collection becomes very expensive.

Based on these facts, using the file system and relying on the page cache is better than maintaining an in-memory cache or other structure. We at least double the available cache by having automatic access to all free memory, and probably double it again by storing a compact byte structure rather than individual objects. The result is a cache of up to 28-30 GB on a 32 GB machine, with no GC penalties. In addition, this cache stays warm even if the service is restarted, whereas an in-process cache needs to be rebuilt in memory (a 10 GB cache can take 10 minutes) or else the service starts with a completely cold cache (and very poor initial performance). It also simplifies the code, since all the logic for keeping the cache coherent with the file system lives in the operating system, which tends to do it more efficiently and more correctly than one-off in-process attempts. If your disk usage favors sequential reads, read-ahead effectively pre-populates this cache with useful data on each disk read.

All of this suggests a simple design: instead of keeping as much as possible in memory and flushing it to the file system when needed, we do the opposite. All data is written immediately to a persistent log on the file system without necessarily being flushed to disk. In effect, this just means the data is transferred to the kernel’s page cache and flushed later. A configuration option lets users of the system control when data is flushed to the physical disk.
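The "write to the page cache now, force to disk later" idea can be sketched with ordinary file calls. In this example write plus flush hands the bytes to the operating system, and os.fsync forces them to the physical disk. LogWriter and flush_interval_messages are hypothetical names chosen for this illustration; they only mimic the kind of flush setting described above.

```python
import os
import tempfile


class LogWriter:
    """Append messages to a file and fsync only every N messages."""

    def __init__(self, path, flush_interval_messages=1000):
        self._file = open(path, "ab")
        self._interval = flush_interval_messages
        self.unflushed = 0

    def append(self, message: bytes):
        self._file.write(message)
        self._file.flush()  # hand the bytes to the OS (page cache), not yet to disk
        self.unflushed += 1
        if self.unflushed >= self._interval:
            self.sync()

    def sync(self):
        os.fsync(self._file.fileno())  # force the cached pages to the physical disk
        self.unflushed = 0

    def close(self):
        self.sync()
        self._file.close()


path = os.path.join(tempfile.mkdtemp(), "segment.log")
writer = LogWriter(path, flush_interval_messages=2)
for payload in (b"m0\n", b"m1\n", b"m2\n"):
    writer.append(payload)
pending = writer.unflushed  # m2 has been written but not yet fsynced
writer.close()
```

A larger interval means fewer expensive fsync calls but more data at risk if the machine loses power, which is why the setting is left to the user.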

4.4.2 Constant time performance guarantee

The persistent data structure in a messaging system is often a B-tree, or another general-purpose random-access structure, that holds metadata about the consumption queues. B-trees are a versatile structure that supports a wide range of transactional and non-transactional semantics, but they come at a fairly high cost: B-tree operations are O(log N). Normally O(log N) is considered essentially equivalent to constant time, but this is not true for disk operations. A disk seek takes about 10 ms, and each disk can do only one seek at a time, so parallelism is limited.

Intuitively, a persistent queue can be built on simple reads and appends to a file, just like an ordinary logging solution. Although this structure does not support the rich semantics of a B-tree, it has the advantage that all operations are O(1) and reads and writes do not block each other. This brings a clear performance advantage: performance is completely decoupled from data size, and the server can take full advantage of cheap hard disks to provide efficient messaging services.

Having access to virtually unlimited disk space without a performance penalty means we can provide features that ordinary messaging systems cannot. For example, instead of deleting messages as soon as they are consumed, we can retain them for a relatively long time (say, a week).

4.4.3 Further improve efficiency

We have put a lot of effort into efficiency. A very important use case is processing web activity data, which is very high volume: each page view can generate a large number of writes. Furthermore, we assume that every published message is read by at least one consumer, so we need to make consumption as cheap as possible.

In addition to solving the disk efficiency problem described above, there are two other types of inefficiencies in such systems:

  • Too many small I/O operations
  • Too many byte copies

To avoid large numbers of small I/O operations, Kafka’s protocol is built around message sets. Instead of sending one message at a time, the producer can send a set of messages in one request. The server appends the messages to the log as a block, and the consumer fetches large linear chunks of data at a time. The message set (MessageSet) is itself a very thin API that wraps a byte array or a file. There is therefore no separate serialization or deserialization step in message processing, and message fields are deserialized lazily, only when needed.

The other efficiency issue is byte copying. To address it, Kafka defines a standardized binary message format that is shared by producers, brokers, and consumers. Kafka’s message log on the broker is simply a directory of files, each written to disk as a sequence of message sets in this same format.

Maintaining this common format allows an important operation to be optimized: transferring persistent log chunks over the network. Modern Unix operating systems provide a very efficient code path for transferring data from the page cache to a socket. In Linux, this is done with the sendfile system call (Java provides access to it through the FileChannel.transferTo API).

To understand the impact of sendfile, you need to understand the general path for transferring data from a file to the socket:
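The idea of a message set as a thin wrapper over one byte buffer can be sketched like this. The layout below (a 4-byte big-endian length before each payload) is a deliberately simplified stand-in chosen for this example; it is not Kafka’s actual wire format, and the function names are hypothetical.

```python
import struct


def pack_message_set(messages):
    """Concatenate messages into one buffer, each prefixed with its 4-byte length."""
    return b"".join(struct.pack(">I", len(m)) + m for m in messages)


def iter_messages(buffer):
    """Walk the buffer and yield payloads one at a time, decoding lazily."""
    view = memoryview(buffer)
    position = 0
    while position < len(view):
        (size,) = struct.unpack_from(">I", view, position)
        position += 4
        yield bytes(view[position:position + size])
        position += size


packed = pack_message_set([b"a", b"", b"hello"])
unpacked = list(iter_messages(packed))
```

The whole set travels, and can be stored, as a single contiguous block; individual messages are only picked apart by whoever actually needs to read them.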

  1. The operating system reads data from disk into a page cache in kernel space
  2. The application reads data from kernel space into the user space cache
  3. The application writes data back to the socket cache in kernel space
  4. The operating system writes data from the socket cache to the network card cache in order to send data across the network

This is clearly inefficient: there are four copies and two system calls. With sendfile, the copies through user space are avoided, because the operating system sends data directly from the page cache to the network. In this optimized path, only the final copy to the NIC buffer is needed.

We expect multiple consumers on a topic to be a common use case. With the zero-copy optimization, data is copied into the page cache exactly once and reused on each consumption, instead of being stored in memory and copied out to user space on every read. This allows messages to be consumed at a rate approaching the limit of the network connection. Using the combination of page cache and sendfile, the Kafka cluster serves consumers almost entirely from cache, and even a large number of downstream consumers puts little pressure on the cluster.
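Python exposes the same mechanism through socket.sendfile, which uses os.sendfile where the platform supports it and falls back to ordinary send calls otherwise. The sketch below sends a small temporary file across a local socket pair; the file contents and the helper name send_log_segment are made up for the demonstration.

```python
import os
import socket
import tempfile


def send_log_segment(path, sock):
    """Send a whole file over a connected stream socket; return the bytes sent."""
    with open(path, "rb") as segment:
        # The file data does not pass through a Python-level buffer when
        # the operating system's sendfile call is available.
        return sock.sendfile(segment)


# Create a small file that stands in for a log segment.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"log-segment-bytes" * 10)
    segment_path = tmp.name

sender, receiver = socket.socketpair()
sent = send_log_segment(segment_path, sender)
sender.close()

received = b""
while True:
    chunk = receiver.recv(4096)
    if not chunk:
        break
    received += chunk
receiver.close()
os.remove(segment_path)
```

The application never reads the file into its own memory here, which corresponds to skipping steps 2 and 3 of the list above.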

5. Kafka cluster deployment

5.1 Cluster Deployment

To improve performance, you are advised to deploy Kafka clusters on dedicated servers, separate from Hadoop clusters. Kafka relies on disk reads and writes and a large page cache, and sharing nodes with Hadoop may hurt its page cache performance.

The size of a Kafka cluster depends on the hardware configuration, the number of concurrent producers and consumers, the number of copies of data, and how long data is stored.

Throughput on disks is particularly important, because it is on disks that Kafka’s bottleneck is usually found.

Kafka relies on ZooKeeper. You are advised to deploy the ZooKeeper cluster on dedicated servers. A ZooKeeper cluster should have an odd number of nodes (3, 5, or 7 are recommended). Note that the larger the ZooKeeper cluster is, the slower its write performance, because ZooKeeper needs to synchronize data between nodes. A 3-node ZooKeeper cluster tolerates one node failure, and a 5-node cluster tolerates two.
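The failure tolerance figures follow from majority voting: a cluster of n nodes stays available as long as more than half of them are alive. A one-line calculation (the function name is hypothetical) also shows why even sizes bring no benefit:

```python
def tolerated_failures(ensemble_size: int) -> int:
    """Number of nodes that can fail while a strict majority is still alive."""
    return (ensemble_size - 1) // 2


tolerance = {n: tolerated_failures(n) for n in (3, 4, 5, 6, 7)}
print(tolerance)
```

A 4-node cluster tolerates one failure just like a 3-node cluster, and a 6-node cluster tolerates two just like a 5-node cluster, so the extra node only adds synchronization cost.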

5.2 Cluster Size

There are many factors that determine how much storage capacity a Kafka cluster needs. The most accurate way to measure this is to simulate the load. Kafka also provides load testing tools.

If you don’t want to evaluate cluster size through simulation, the best alternative is to estimate it from resource requirements. The following estimate is based on network and disk throughput requirements.

Let’s make the following assumptions:

  • W: data written per second, in MB
  • R: number of replicas
  • C: number of consumers

In general, the bottleneck of a Kafka cluster is network and disk throughput, so let’s first evaluate the network and disk requirements of the cluster.

Each message is written once per replica, so the overall write rate is W * R. Reads come from two sources: replicas inside the cluster reading from the leader to stay in sync, and consumers outside the cluster. The internal read rate is therefore (R-1) * W, and the external consumer read rate is C * W, so:

  • Write: W * R
  • Read: (R-1) * W + C * W

Note that some data can be served from cache while reading, which reduces I/O operations. If a cluster has M MB of memory and a write rate of W MB/s, then M / (W * R) seconds of writes can be cached. For example, with 32 GB of memory, a write rate of 50 MB/s, and R = 1, the cluster can cache roughly the last 10 minutes of data (32,768 / 50 ≈ 655 seconds); with more replicas the window shrinks proportionally.
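The formulas above are easy to turn into a small calculator. The function names and the sample values for R and C are assumptions chosen for illustration; only the formulas themselves come from the text.

```python
def cluster_throughput(w_mb_s, replicas, consumers):
    """Return (write, read) throughput in MB/s using the formulas above."""
    write = w_mb_s * replicas
    read = (replicas - 1) * w_mb_s + consumers * w_mb_s
    return write, read


def cache_seconds(memory_mb, w_mb_s, replicas):
    """Seconds of recent writes that fit in memory: M / (W * R)."""
    return memory_mb / (w_mb_s * replicas)


# Example: 50 MB/s of incoming data, 3 replicas, 2 consumers.
write_mb_s, read_mb_s = cluster_throughput(50, 3, 2)

# 32 GB of memory with a single replica, as in the text's example.
window_s = cache_seconds(32 * 1024, 50, 1)
```

With these sample values the cluster must sustain 150 MB/s of writes and 200 MB/s of reads, and the single-replica cache window is about 655 seconds.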

6. Kafka main configuration

6.1 Broker Config

 

 

 

 

 

6.2 Producer Config

 

 

 

6.3 Consumer Config

 

 
