1 Overall message sending process



(1) The first step is to wait for the metadata to be fetched, as covered in the previous post.

(2) Once the metadata is available, the message key and value are serialized.

(3) Select a partition. There are three cases:

A If the partition number is specified in the message, use it. This is uncommon in practice.

B If no key is specified, increment an atomic integer and take it modulo the number of partitions. This amounts to round-robin.

C If a key is specified, hash the serialized key and take the result modulo the number of partitions of the topic.

(4) Check the size of the message: whether the message itself exceeds the limit for a single request, and whether it exceeds the total buffer size.

(5) Put the message into the buffer (the RecordAccumulator). The Sender thread works on this buffer and handles the network send/receive.

(6) If the batch is full, or a new batch has been created, the Sender thread is woken up to send.
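The partition choice in step (3) and the size check in step (4) can be sketched as below. This is a simplified Python model, not the Kafka client's Java code. The names SimplePartitioner and check_record_size and all parameter names are assumptions made for illustration, and CRC32 is only a placeholder for whatever hash the real partitioner applies to the key.

```python
import itertools
import threading
import zlib


class SimplePartitioner:
    """Hypothetical model of the three partition-selection cases."""

    def __init__(self):
        self._counter = itertools.count()  # stands in for the atomic integer
        self._lock = threading.Lock()

    def partition(self, num_partitions, partition=None, key_bytes=None):
        if partition is not None:
            # Case A: the message names its partition explicitly.
            if not 0 <= partition < num_partitions:
                raise ValueError("partition out of range")
            return partition
        if key_bytes is None:
            # Case B: no key, so rotate through the partitions (round-robin).
            with self._lock:
                n = next(self._counter)
            return n % num_partitions
        # Case C: hash the serialized key (CRC32 is a placeholder hash).
        return zlib.crc32(key_bytes) % num_partitions


def check_record_size(size, max_request_size, total_memory):
    """Reject a message that can never be sent or buffered."""
    if size > max_request_size:
        raise ValueError("message exceeds the single-request limit")
    if size > total_memory:
        raise ValueError("message exceeds the total buffer size")
```

The same key always maps to the same partition as long as the partition count does not change, which is what preserves per-key ordering.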

2 Adding a message to the RecordAccumulator buffer: overview

(1) The key data structure in RecordAccumulator is the batches map, a three-layer structure: topic-partition -> deque -> batches.

(2) The first step for a message is to be put into the accumulator, into a batch inside the deque. The buffer of each batch must be initialized first. Once initialization is complete, a message only needs to be appended to the batch for its topic-partition.

(3) The initialization of each component is thread-safe.
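The three-layer structure can be sketched roughly as follows. This is a simplified Python model rather than the Kafka client's Java code. The class name Accumulator, the (topic, partition) tuple key, and modeling a batch as a plain list are all assumptions for illustration.

```python
import threading
from collections import deque


class Accumulator:
    """Simplified model: topic-partition -> deque -> batches."""

    def __init__(self):
        self._batches = {}             # (topic, partition) -> deque of batches
        self._lock = threading.Lock()  # guards creation of new deques

    def get_or_create_deque(self, topic, partition):
        key = (topic, partition)
        dq = self._batches.get(key)
        if dq is None:
            with self._lock:
                # setdefault keeps the first deque if two threads race here.
                dq = self._batches.setdefault(key, deque())
        return dq

    def add_batch(self, topic, partition, batch):
        # A batch is modeled as a plain list of messages.
        self.get_or_create_deque(topic, partition).append(batch)
```

Only the creation of a deque takes the lock, so lookups for an existing topic-partition stay cheap.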

3 Adding a message to the RecordAccumulator: implementation details

Based on the diagram above, here are the implementation details:

(1) getOrCreateDeque: the first time a topic-partition is seen, its deque does not exist yet, so one is created and added to batches. As the diagram above shows, batches is a ConcurrentHashMap keyed by TopicPartition, and a TopicPartition is identified by topic + "-" + partition. The map remains thread-safe even under concurrent sends.



(2) tryAppend: try to write the message.

A tryAppend takes the most recent RecordBatch for the topic-partition.

B RecordBatch contains MemoryRecords, which wraps the underlying buffer.

C Data is written through the Compressor component. The message is laid out in Kafka's fixed message format, offset | size | crc | magic | attributes | timestamp | key size | key | value size | value, and then written to the buffer through the corresponding output stream.

In the normal asynchronous write path, a successful append returns right away. Thus one topic-partition corresponds to one send queue, which holds n batches and therefore n buffers.

(3) The first time a message is sent to a topic-partition, there is no batch and hence no memory buffer. This is the path in the top right corner of the figure: tryAppend returns null, and a memory buffer must be requested.

(4) Once the buffer is obtained, it is wrapped in MemoryRecords, which is in turn wrapped in a RecordBatch representing one batch. tryAppend is then called on this batch, and this time the write succeeds. Finally, the batch is placed in the accumulator's queue for that topic-partition.
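The append path above can be sketched as follows: try the newest batch first, and open a new batch when that returns nothing. This is a simplified Python model, not the client's Java code. The names RecordBatch.try_append, encode_record and append are assumptions, compression is left out, the buffer is a plain bytearray instead of one obtained from a buffer pool, and computing the CRC over the bytes from magic to value is my reading of the field layout listed above.

```python
import struct
import zlib
from collections import deque


def encode_record(offset, timestamp, key, value, magic=1, attributes=0):
    """offset | size | crc | magic | attributes | timestamp | key | value."""
    def sized(data):
        # 4-byte length prefix; -1 marks a missing key or value.
        if data is None:
            return struct.pack(">i", -1)
        return struct.pack(">i", len(data)) + data

    body = struct.pack(">bbq", magic, attributes, timestamp) + sized(key) + sized(value)
    message = struct.pack(">I", zlib.crc32(body) & 0xFFFFFFFF) + body
    return struct.pack(">qi", offset, len(message)) + message


class RecordBatch:
    """A fixed-capacity buffer that records are appended to."""

    def __init__(self, capacity):
        self.buffer = bytearray()
        self.capacity = capacity
        self.record_count = 0

    def try_append(self, timestamp, key, value):
        """Return the record's offset within the batch, or None if it does not fit."""
        record = encode_record(self.record_count, timestamp, key, value)
        if len(self.buffer) + len(record) > self.capacity:
            return None
        self.buffer += record
        self.record_count += 1
        return self.record_count - 1


def append(dq, batch_size, timestamp, key, value):
    """Return (offset, new_batch_created) for one topic-partition deque."""
    if dq:
        offset = dq[-1].try_append(timestamp, key, value)
        if offset is not None:
            return offset, False
    # No batch yet, or the newest batch is full: open a new one that is
    # at least large enough for this record.
    needed = len(encode_record(0, timestamp, key, value))
    batch = RecordBatch(max(batch_size, needed))
    offset = batch.try_append(timestamp, key, value)
    dq.append(batch)
    return offset, True
```

The second element of the returned tuple is the kind of signal the caller can use to decide whether the Sender thread should be woken up.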

4 Requesting a memory buffer

(1) The first time we send a message to a partition of a topic, the corresponding deque in RecordAccumulator has no batch yet, so a new batch must be requested. A batch is essentially a wrapped ByteBuffer, which in turn must be allocated from machine memory.

(2) Memory is requested from the RecordAccumulator's BufferPool. This buffer pool keeps a queue of free buffers (a Deque<ByteBuffer>) and tracks the total memory (totalMemory), the available memory (availableMemory), the buffer size per batch (poolableSize), and so on.

(3) Request a buffer. If enough memory remains, there are two cases:

A If the requested size equals the batch size and the buffer pool has a free buffer, poll one from the queue and return it.

B If the requested size is larger than one batch size and enough memory remains, release some buffers from the buffer pool and allocate one large buffer. freeUp is a while loop that releases buffers from the buffer pool until enough memory is available for the allocation.





(4) If the remaining memory is not enough for the request, the thread blocks until another thread releases memory and wakes it up. The wake-up may come when a batch has been sent: the buffer that batch occupied is then returned to the pool.



After a wake-up, two things are checked: whether the remaining memory or the buffer pool now has free space, and whether the wait has timed out (the timeout here is the producer's send timeout). Then a buffer is taken from the buffer pool's free queue or allocated from the remaining memory.
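The allocation steps above can be sketched as a small blocking pool. This is a simplified Python model under stated assumptions, not the client's BufferPool: the method names allocate, deallocate and _free_up, the use of threading.Condition, and bytearray as the buffer type are all illustrative choices. As in the description, buffers sitting in the free queue are counted separately from available_memory.

```python
import threading
import time
from collections import deque


class BufferPool:
    """Hypothetical model of a pool with fixed-size reusable buffers."""

    def __init__(self, total_memory, poolable_size):
        self.poolable_size = poolable_size
        self.available_memory = total_memory  # memory neither handed out nor pooled
        self.free = deque()                   # reusable buffers of poolable_size
        self._cond = threading.Condition()

    def _free_up(self, size):
        # Release pooled buffers until enough unpooled memory is available.
        while self.free and self.available_memory < size:
            self.available_memory += len(self.free.pop())

    def allocate(self, size, timeout):
        deadline = time.monotonic() + timeout
        with self._cond:
            while True:
                # Case A: a batch-sized request served from the free queue.
                if size == self.poolable_size and self.free:
                    return self.free.popleft()
                # Case B: enough memory overall, possibly after freeing pooled buffers.
                pooled = len(self.free) * self.poolable_size
                if self.available_memory + pooled >= size:
                    self._free_up(size)
                    self.available_memory -= size
                    return bytearray(size)
                # Not enough memory: block until a release or the timeout.
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError("no memory available within the timeout")
                self._cond.wait(remaining)

    def deallocate(self, buf):
        with self._cond:
            if len(buf) == self.poolable_size:
                self.free.append(buf)          # batch-sized buffers are reused
            else:
                self.available_memory += len(buf)
            self._cond.notify_all()            # wake up blocked allocators
```

Reusing batch-sized buffers avoids repeated allocation for the common case, while odd-sized requests are simply given back to the memory counter.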

5 Data structure of the message send buffer

Thus, the send buffer data structure, in simplified form, combines three functions: batching messages per partition, staging them in buffers, and allocating system memory.