Kafka is a great messaging system; check out my back-end book reading recommendations to get an idea of its overall design. Today we’ll take a look at the implementation details (I’ve excerpted some of the source code), focusing first on the Producer side.

To use Kafka, you first instantiate a KafkaProducer. You need the broker address, the key and value serializers, and optional Properties such as acks (0, 1, all), compression.type, retries, and batch.size. This simple interface controls most of the Producer’s behavior; once instantiated, messages are sent with the send method.
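A minimal sketch of that setup (the broker address, topic name, and tuning values here are placeholders, not recommendations):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// optional tuning
props.put("acks", "all");
props.put("retries", "3");
props.put("batch.size", "16384");
props.put("compression.type", "lz4");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"));
producer.close();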

The core implementation is this method:

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record); // (1)
    return doSend(interceptedRecord, callback); // (2)
}

There are three send modes: fire-and-forget (ignore the returned result), synchronous send (block on the returned Future object, with the callback set to null), and asynchronous send (supply a callback function).
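All three modes, sketched against the producer built above (checked-exception handling elided for brevity):

ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");

// 1. fire-and-forget: ignore the returned Future
producer.send(record);

// 2. synchronous: block on the Future; the callback is null
RecordMetadata metadata = producer.send(record).get();

// 3. asynchronous: supply a callback that runs when the broker responds
producer.send(record, (meta, exception) -> {
    if (exception != null)
        exception.printStackTrace();
    else
        System.out.printf("partition=%d, offset=%d%n", meta.partition(), meta.offset());
});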

Let’s look at the properties of the ProducerRecord message class:

private final String topic;      // topic
private final Integer partition; // partition
private final Headers headers;   // headers
private final K key;             // key
private final V value;           // value
private final Long timestamp;    // timestamp

It has multiple constructors to accommodate different message shapes: with or without an explicit partition, key, timestamp, or headers, for example.
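A quick tour of the common overloads (topic name and values are placeholders):

// topic and value only: no key, partitions are assigned round-robin
new ProducerRecord<>("my-topic", "value");

// topic, key, value: the partition is derived from a hash of the key
new ProducerRecord<>("my-topic", "key", "value");

// explicit partition: used as-is, the partitioner is bypassed
new ProducerRecord<>("my-topic", 0, "key", "value");

// full form: partition, timestamp, key, value, headers
new ProducerRecord<>("my-topic", 0, System.currentTimeMillis(), "key", "value",
        new RecordHeaders().add("trace-id", "abc".getBytes(StandardCharsets.UTF_8)));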

ProducerInterceptors (zero or more interceptors, chained together) intercept the ProducerRecord, e.g. to adjust the timestamp or to do auditing and statistics:

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    ProducerRecord<K, V> interceptRecord = record;
    for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
        try {
            interceptRecord = interceptor.onSend(interceptRecord);
        } catch (Exception e) {
            // do not throw the exception; log it and proceed to the next interceptor
            if (record != null)
                log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
            else
                log.warn("Error executing interceptor onSend callback", e);
        }
    }
    return interceptRecord;
}

Each interceptor in the chain processes and returns the ProducerRecord if the user has configured one; otherwise the record passes through unchanged.
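As an illustration, here is a minimal custom interceptor (a hypothetical sketch, not from the Kafka codebase) that stamps a fresh timestamp and counts sends; it would be registered through the interceptor.classes producer config:

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AuditInterceptor implements ProducerInterceptor<String, String> {
    private final AtomicLong sendCount = new AtomicLong();

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        sendCount.incrementAndGet();
        // return a copy of the record carrying the current time as its timestamp
        return new ProducerRecord<>(record.topic(), record.partition(),
                System.currentTimeMillis(), record.key(), record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }

    @Override
    public void close() {
        System.out.println("records intercepted: " + sendCount.get());
    }

    @Override
    public void configure(Map<String, ?> configs) { }
}

With interception done, doSend actually performs the send, and it is asynchronous: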

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // serialize the key and value
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
        }
        // compute the partition
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);
        // callback and transaction handling omitted
        Header[] headers = record.headers().toArray();
        // the message is appended to the RecordAccumulator
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);
        // if the batch is full or a new batch was created, wake up the IO thread to send it;
        // this is the Sender's wakeup method
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
    } catch (Exception e) {
        // intercept the exception and rethrow it
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}
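When a batch counts as “full” is governed by producer configuration; a sketch of the two knobs involved (values illustrative):

// max bytes buffered per batch per partition before it is considered full
props.put("batch.size", "16384");
// how long to wait for more records before sending a batch that is not yet full
props.put("linger.ms", "5");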

Here is how to calculate partitions:

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    // if the record already carries a partition, use it directly; otherwise ask the partitioner
    return partition != null ?
            partition :
            partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

The default DefaultPartitioner uses the record’s partition if one is present, computes the partition from a hash of the key otherwise, and allocates partitions with a round-robin algorithm if there is no key either:

/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 * </ul>
 */
public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) { // the key is null
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                // partitions are available, pick one of them
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes with murmur2 to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement(); // round robin
    }
}
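You can supply your own strategy by implementing the same Partitioner interface and registering it via the partitioner.class config. A hypothetical sketch that pins keys with an "audit-" prefix to partition 0 and otherwise mirrors the default key hash:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class AuditAwarePartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        if (key instanceof String && ((String) key).startsWith("audit-"))
            return 0; // pin audit records to partition 0
        if (keyBytes == null)
            return ThreadLocalRandom.current().nextInt(partitions.size()); // no key: pick randomly
        // otherwise use the same key-hash scheme as DefaultPartitioner
        return Utils.toPositive(Utils.murmur2(keyBytes)) % partitions.size();
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}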

That covers the logical processing of sending a message; now let’s look at the physical processing, the network I/O.

The Sender is a Runnable executed by an IO thread (ioThread) that continuously pulls messages from the RecordAccumulator’s queues and sends the data to the Broker through a Selector. Its wakeup method is actually the wakeup method of the KafkaClient interface, implemented by the NetworkClient class on top of Java NIO; that is, it ultimately calls java.nio.channels.Selector.wakeup().
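If the Selector.wakeup() mechanics are unfamiliar, here is a minimal standalone NIO sketch (plain Java, not Kafka code) showing one thread interrupting another’s blocking select, which is exactly what sender.wakeup() relies on:

import java.io.IOException;
import java.nio.channels.Selector;

public class WakeupDemo {
    public static void main(String[] args) throws IOException, InterruptedException {
        Selector selector = Selector.open();

        Thread ioThread = new Thread(() -> {
            try {
                // blocks until a channel is ready, the timeout expires, or wakeup() is called
                int ready = selector.select(30_000);
                System.out.println("select returned with " + ready + " ready keys");
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        ioThread.start();

        Thread.sleep(100);  // let the IO thread enter select()
        selector.wakeup();  // select() returns immediately, long before the timeout
        ioThread.join();
    }
}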

The main logic in Sender’s run method is a loop that keeps preparing data and then waiting on the network:

long pollTimeout = sendProducerData(now); // (3)
client.poll(pollTimeout, now);            // (4)

③ assembles the messages, stages them on the channel, and then listens for the keys of interest; this is implemented by KafkaChannel:

public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

// the relevant method in the transportLayer implementation
public void addInterestOps(int ops) {
    key.interestOps(key.interestOps() | ops);
}

④ polls the Selector, whose blocking select call is what wakeup interrupts:

public void poll(long timeout) throws IOException {
    /* check ready keys */
    long startSelect = time.nanoseconds();
    int numReadyKeys = select(timeout); // wakeup() causes this to stop blocking
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
        Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

        // Poll from channels that have buffered data (but nothing more from the underlying socket)
        if (dataInBuffers) {
            keysWithBufferedRead.removeAll(readyKeys); // so no channel gets polled twice
            Set<SelectionKey> toPoll = keysWithBufferedRead;
            keysWithBufferedRead = new HashSet<>(); // poll() calls will repopulate if needed
            pollSelectionKeys(toPoll, false, endSelect);
        }

        // Poll from channels where the underlying socket has more data
        pollSelectionKeys(readyKeys, false, endSelect);
        // Clear all selected keys so that they are included in the ready count for the next select
        readyKeys.clear();

        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        immediatelyConnectedKeys.clear();
    } else {
        madeReadProgressLastPoll = true; // no work is also "progress"
    }

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
}

pollSelectionKeys in turn calls the following methods to write the message out:

public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}

Send is the outgoing data packet, implemented by ByteBufferSend or MultiRecordsSend. Its writeTo calls the transportLayer’s write method, whose implementation is PlaintextTransportLayer or SslTransportLayer depending on whether SSL is in use:

public long writeTo(GatheringByteChannel channel) throws IOException {
    long written = channel.write(buffers);
    if (written < 0)
        throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
    remaining -= written;
    pending = TransportLayers.hasPendingWrites(channel);
    return written;
}

public int write(ByteBuffer src) throws IOException {
    return socketChannel.write(src);
}
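Which transport layer is in play follows from the producer’s security configuration; a sketch (the paths and password are placeholders):

// the default is PLAINTEXT, which selects PlaintextTransportLayer;
// enabling SSL selects SslTransportLayer instead
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "changeit");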

At this point the Producer’s main flow, both the business-level logic and the non-business network I/O, has been traced end to end. Additional guarantees are provided through configuration.

For example, ordering is guaranteed by max.in.flight.requests.per.connection, which InFlightRequests checks during doSend (invoked through NetworkClient’s canSendRequest). As long as this parameter is set to 1, the next request cannot be sent until the current one is acknowledged, which preserves ordering:

public boolean canSendMore(String node) {
    Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
    return queue == null || queue.isEmpty() ||
           (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
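A sketch of the corresponding configuration (values illustrative): with the default of 5 requests in flight, a failed-and-retried batch can land after a later batch that succeeded, reordering messages; capping it at 1 removes that window at some cost to throughput.

props.put("max.in.flight.requests.per.connection", "1");
props.put("retries", "3");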

For reliability, acks comes into play: in sendProduceRequest the Sender attaches a callback to the clientRequest, which handles the broker’s response:

RequestCompletionHandler callback = new RequestCompletionHandler() {
    public void onComplete(ClientResponse response) {
        handleProduceResponse(response, recordsByPartition, time.milliseconds()); // eventually calls completeBatch
    }
};

/**
 * Complete or retry the given batch of records.
 *
 * @param batch The record batch
 * @param response The produce response
 * @param correlationId The correlation id for the request
 * @param now The current POSIX timestamp in milliseconds
 */
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId, long now, long throttleUntilTimeMs) {
    // ...
}

public class ProduceResponse extends AbstractResponse {
    /**
     * Possible error code:
     * INVALID_REQUIRED_ACKS (21)
     */
}
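What the different acks levels mean for that response handling, as a config sketch (on the broker or topic side this is commonly paired with min.insync.replicas):

// acks=0: don't wait for any acknowledgement; lowest latency, messages can be lost silently
// acks=1: the leader persists the record before responding; lost if the leader dies before replication
// acks=all (or -1): all in-sync replicas must persist the record; the strongest guarantee
props.put("acks", "all");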

The Kafka source wraps abstraction upon abstraction and is quite complex; if you spot an error, please don’t hesitate to comment.