kafka producer

Kafka producer sends a message. Kafka producer sends a message.

Initialization of producer

The first is the creation of a producer

public Producer(String topic, Boolean isAsync)
  {
    Properties props = new Properties();
    props.put("bootstrap.servers"."localhost:9092");
    props.put("client.id"."DemoProducer");
    props.put("key.serializer"."org.apache.kafka.common.serialization.IntegerSerializer");
    props.put("value.serializer"."org.apache.kafka.common.serialization.StringSerializer");
    producer = new KafkaProducer<Integer, String>(props);
    this.topic = topic;
    this.isAsync = isAsync;
  }
Copy the code

Take a look inside the new KafkaProducer() method

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
       try {
           log.trace("Starting the Kafka producer");
           Map<String, Object> userProvidedConfigs = config.originals();
           this.producerConfig = config;
           this.time = new SystemTime();

           MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                   .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                           TimeUnit.MILLISECONDS);
           clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
           if (clientId.length() <= 0)
               clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
           List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                   MetricsReporter.class);
           reporters.add(new JmxReporter(JMX_PREFIX));
           this.metrics = new Metrics(metricConfig, reporters, time);
           this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
           long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
           this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
           this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
           this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
           this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
           /* check for user defined settings. * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG. * This should be removed with release 0.9 when the deprecated configs are removed
           if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
               log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
                       "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
               boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
               if (blockOnBufferFull) {
                   this.maxBlockTimeMs = Long.MAX_VALUE;
               } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
                   log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                           "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                   this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
               } else {
                   this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); }}else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
               log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                       "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
               this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
           } else {
               this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
           }

           /* check for user defined settings. * If the TIME_OUT config is set use that for request timeout. * This should be Removed with release 0.9 */
           if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
               log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
                       ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
               this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
           } else {
               this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
           }

           Map<String, String> metricTags = new LinkedHashMap<String, String>();
           metricTags.put("client-id", clientId);
           //RecordAccumulator is used to cache outgoing logs. I'll talk about that later
           this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                   this.totalMemorySize,
                   this.compressionType,
                   config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                   retryBackoffMs,
                   metrics,
                   time,
                   metricTags);
           // This configuration is used to help Producer discover clusters
           List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
           // Use bootstrap Server to initialize a cluster.
           this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
           // A niO-related component, described later
           ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
           NetworkClient client = new NetworkClient(
                   new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags, channelBuilder),
                   this.metadata,
                   clientId,
                   config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                   config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                   config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                   config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                   this.requestTimeoutMs, time);
           this.sender = new Sender(client,
                   this.metadata,
                   this.accumulator,
                   config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                   (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                   config.getInt(ProducerConfig.RETRIES_CONFIG),
                   this.metrics,
                   new SystemTime(),
                   clientId,
                   this.requestTimeoutMs);
           String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? "|" + clientId : "");
           // Kafka uses a thread to execute the sender task
           this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
           this.ioThread.start();

           this.errors = this.metrics.sensor("errors");

           if (keySerializer == null) {
               this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                       Serializer.class);
               this.keySerializer.configure(config.originals(), true);
           } else {
               config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
               this.keySerializer = keySerializer;
           }
           if (valueSerializer == null) {
               this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                       Serializer.class);
               this.valueSerializer.configure(config.originals(), false);
           } else {
               config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
               this.valueSerializer = valueSerializer;
           }
           config.logUnused();
           AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
           log.debug("Kafka producer started");
       } catch (Throwable t) {
           // call close methods if internal objects are already constructed
           // this is to prevent resource leak. see KAFKA-2121
           close(0, TimeUnit.MILLISECONDS, true);
           // now propagate the exception
           throw new KafkaException("Failed to construct kafka producer", t); }}Copy the code

This method is too long to explain line by line; the most important ones are noted in the code.

Send Process Analysis

Here’s the whole send process, broken down into snippets.

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        try {
            // first make sure the metadata for the topic is available
            Snippet 1. Wait for the metadata, which takes waitedOnMetadataMs
            long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
            long remainingWaitMs = Math.max(0.this.maxBlockTimeMs - waitedOnMetadataMs);
            // Serialize the key and value in record
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer");
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer");
            }
            // Snippet 2. Use the divider to get the partition for this produce
            int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
            //snippet 3. Calculate the serialized size of the message
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
            //snippet 4. Validate some limit on the effective size of the message
            ensureValidRecordSize(serializedSize);
            TopicPartition tp = new TopicPartition(record.topic(), partition);
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            //snippet 5. Put the message to be sent into the buffer
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                // Snippet 6. If the batch is full, wake up the send component to send
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if(callback ! =null)
                callback.onCompletion(null, e);
            this.errors.record();
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            throw new InterruptException(e);
        } catch (BufferExhaustedException e) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            throw e;
        } catch (KafkaException e) {
            this.errors.record();
            throwe; }}Copy the code

Analyze each snippet individually

Snippet 1. Get metadata

Before going any further, we need to know what metadata contains:

private final long refreshBackoffMs;
    // Maximum expiration time of metadata
    private final long metadataExpireMs;
    private int version;
    private long lastRefreshMs;
    private long lastSuccessfulRefreshMs;
    //cluser contains information about cluster nodes, all topics, and partitions
    private Cluster cluster;
    private boolean needUpdate;
    private final Set<String> topics;
    private final List<Listener> listeners;
    private boolean needMetadataForAllTopics;
Copy the code

We already updated the meta once while initializing producer

List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
            this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
Copy the code

Update is implemented as:

public synchronized void update(Cluster cluster, long now) {
        this.needUpdate = false;
        this.lastRefreshMs = now;
        this.lastSuccessfulRefreshMs = now;
        this.version += 1;

        for (Listener listener: listeners)
            listener.onMetadataUpdate(cluster);

        // Do this after notifying listeners as subscribed topics' list can be changed by listeners
        this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;

        notifyAll();
        log.debug("Updated cluster metadata version {} to {}".this.version, this.cluster);
    }
Copy the code

The update method receives a cluster to initialize the cluster within the meta. However, at the beginning of the program, the cluster only contains the bootstrap information, and this meta is incomplete. Also note that the update method wakes up the thread waiting for the meta. (Notice the following method that calls wait()). So before you send a message, you need to get the metadata again.

Ensure that metadata is in place before sending a message.

private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
        // add topic to metadata topic list if it is not there already.
        if (!this.metadata.containsTopic(topic))
            // Put topic first in the meta set called Topics
            this.metadata.add(topic);

        // If the metadata already contains topic information, do not continue. Fetch () here returns the cluster of the meta
        if(metadata.fetch().partitionsForTopic(topic) ! =null)
            return 0;

        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        // loop fetch
        while (metadata.fetch().partitionsForTopic(topic) == null) {
            log.trace("Requesting metadata update for topic {}.", topic);
            // Mark the status of the meta as needing to be updated
            int version = metadata.requestUpdate();
            // Wake up the sender
            sender.wakeup();
            // block on this object, waiting for notify in update
            metadata.awaitUpdate(version, remainingWaitMs);
            long elapsed = time.milliseconds() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            if (metadata.fetch().unauthorizedTopics().contains(topic))
                throw new TopicAuthorizationException(topic);
            remainingWaitMs = maxWaitMs - elapsed;
        }
        return time.milliseconds() - begin;
    }
Copy the code

All of this logic is pretty neat, and you end up with the sender step. When producer initializes, we see that Kafka starts a special thread to run the sender, whose run method is:

public void run(a) {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        while(! forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e); }}if (forceClose) {
            // We need to fail all the incomplete batches and wake up the threads waiting on
            // the futures.
            this.accumulator.abortIncompleteBatches();
        }
        try {
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }
Copy the code

Run (time.milliseconds())

public void run(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); }}// create produce requests
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);

        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now);
        // update sensors
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);
        List<ClientRequest> requests = createProduceRequests(batches, now);
        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        for (ClientRequest request : requests)
            client.send(request, now);

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        this.client.poll(pollTimeout, now);
    }
Copy the code

What happens is that the sender.wakeup call, it definitely updates the metadata, but how it does that, we’ll talk about later.

The snippet 2. Partition

Before sending a Kafka message, determine which partition of the topic it is sent to.

private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {
        // If the message already has a partition, just check whether the partition is valid
        Integer partition = record.partition();
        if(partition ! =null) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
            int numPartitions = partitions.size();
            // they have given us a partition, use it
            if (partition < 0 || partition >= numPartitions)
                throw new IllegalArgumentException("Invalid partition given with record: " + partition
                                                   + " is not in the range [0..."
                                                   + numPartitions
                                                   + "].");
            return partition;
        }
        // Returns the result of the method implemented by the partition.
        return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,
            cluster);
    }
Copy the code

Kafka has a default implementation for partitions:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // If the message does not contain a key, round robin is performed
        if (keyBytes == null) {
            int nextValue = counter.getAndIncrement();
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                returnDefaultPartitioner.toPositive(nextValue) % numPartitions; }}else {
            // Hash the partition
            // hash the keyBytes to choose a partition
            returnDefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions; }}Copy the code

Size of snippet 3&4 messages

The actual Kafka message is larger than the expected key value,

public static int recordSize(int keySize, int valueSize) {
        return CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + keySize + VALUE_SIZE_LENGTH + valueSize;
    }
Copy the code

You need to limit the total size

private void ensureValidRecordSize(int size) {
        if (size > this.maxRequestSize)
            throw new RecordTooLargeException("The message is " + size +
                                              " bytes when serialized which is larger than the maximum request size you have configured with the " +
                                              ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
                                              " configuration.");
        if (size > this.totalMemorySize)
            throw new RecordTooLargeException("The message is " + size +
                                              " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                                              ProducerConfig.BUFFER_MEMORY_CONFIG +
                                              " configuration.");
    }
Copy the code

We’re not going to go into that.

Snippet 5. Put the message-first method into the buffer

What is a buffer? It contains these members

private volatile boolean closed;
    private int drainIndex;
    private final AtomicInteger flushesInProgress;
    private final AtomicInteger appendsInProgress;
    private final int batchSize;
    private final CompressionType compression;
    private final long lingerMs;
    private final long retryBackoffMs;
    private final BufferPool free;
    private final Time time;
    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
    private final IncompleteRecordBatches incomplete;
Copy the code

Using the member variable of batches, we can see that Kafka piles messages of different TopicPartitions into different queues. A Dqueue is a two-ended queue that can be inserted and deleted at both ends. And as the name suggests, it goes into a buffer, which is this two-ended queue. The RecordBatch element in the Deque,

public final class RecordBatch {

    private static final Logger log = LoggerFactory.getLogger(RecordBatch.class);

    public int recordCount = 0;
    public int maxRecordSize = 0;
    public volatile int attempts = 0;
    public final long createdMs;
    public long drainedMs;
    public long lastAttemptMs;
    // A ByteBuffer is maintained
    public final MemoryRecords records;
    public final TopicPartition topicPartition;
    public final ProduceRequestResult produceFuture;
    public long lastAppendTime;
    private final List<Thunk> thunks;
    private booleanretry; .Copy the code

The high performance of Kafka can be seen in many implementation details, such as the implementation of the append method below. Let’s just look at the code.

public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        // It is possible that more than one thread is calling
        appendsInProgress.incrementAndGet();
        try {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            // check if we have an in-progress batch
            Deque<RecordBatch> dq = dequeFor(tp);
            synchronized (dq) {
                RecordBatch last = dq.peekLast();
                // If the batch is good enough, it is added to the batch
                if(last ! =null) {
                    FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
                    if(future ! =null)
                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); }}// we don't have an in-progress record batch try to allocate a new batch
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                RecordBatch last = dq.peekLast();
                if(last ! =null) {
                    FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
                    if(future ! =null) {
                        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                        free.deallocate(buffer);
                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                    }
                }
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds()));

                dq.addLast(batch);
                incomplete.add(batch);
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true); }}finally{ appendsInProgress.decrementAndGet(); }}Copy the code

The method of adding batch is as follows:

public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback, long now) {
        if (!this.records.hasRoomFor(key, value)) {
            return null;
        } else {
            this.records.append(0L, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            this.lastAppendTime = now;
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
            if(callback ! =null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            returnfuture; }}Copy the code

Before adding batch, determine whether the space is enough:

public boolean hasRoomFor(byte[] key, byte[] value) {
        if (!this.writable)
            return false;

        // If the current batch write volume is 0, check whether one batch is sufficient for storing this log. If not, return false. If the write amount is not 0, check whether the remaining space is sufficient.
        //initialCapacity is the size of the initial allocated bytebuffer, and writelimit is a batch size that we consider to be limited
        return this.compressor.numRecordsWritten() == 0 ?
            this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
            this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
    }
Copy the code

If the batch does not have enough space, null is returned, indicating that the batch fails to be stored. A return that is not empty is considered successful. A line of code is executed when the return is not empty

// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
free.deallocate(buffer);
Copy the code

In this already existing batch, it has silently allocated a piece of memory in advance, and now it is no longer used, so it is returned. The logic is a little bit easier to understand,

// Create a place to store the record
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
// Create batch again
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds()));
dq.addLast(batch);
incomplete.add(batch);
return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
Copy the code

We’ll talk more about how Kafka manages this chunk of memory later.

The snippet 6. Send

It’s finally time to send. We found this code again

sender.wakeup
Copy the code

What exactly does that mean? Let’s go to the last line of code in the run method of the sender thread

// if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        this.client.poll(pollTimeout, now);
Copy the code

Here the implementation class of client is NetworkClient,poll implementation is:

public List<ClientResponse> poll(long timeout, long now) {
        // This encapsulates a request to pull meta
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleTimedOutRequests(responses, updatedNow);

        // invoke callbacks
        for (ClientResponse response : responses) {
            if (response.request().hasCallback()) {
                try {
                    response.request().callback().onComplete(response);
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e); }}}return responses;
    }
Copy the code

MetadataUpdater. MaybeUpdate (now) encapsulated inside a pull request of meta.

public long maybeUpdate(long now) {
            // should we update our metadata?
            long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
            long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
            long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
            // if there is no node available to connect, back off refreshing metadata
            long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                    waitForMetadataFetch);
 
            if (metadataTimeout == 0) {
                // Beware that the behavior of this method and the computation of timeouts for poll() are
                // highly dependent on the behavior of leastLoadedNode.
                // Select a node
                Node node = leastLoadedNode(now);
                maybeUpdate(now, node);
            }

            return metadataTimeout;
        }
Copy the code

LeastLoadedNode is used to find a node with the fewest requests at this time

public Node leastLoadedNode(long now) {
        List<Node> nodes = this.metadataUpdater.fetchNodes();
        int inflight = Integer.MAX_VALUE;
        Node found = null;

        int offset = this.randOffset.nextInt(nodes.size());
        for (int i = 0; i < nodes.size(); i++) {
            int idx = (offset + i) % nodes.size();
            Node node = nodes.get(idx);
            int currInflight = this.inFlightRequests.inFlightRequestCount(node.idString());
            if (currInflight == 0 && this.connectionStates.isConnected(node.idString())) {
                // if we find an established connection with no in-flight requests we can stop right away
                return node;
            } else if (!this.connectionStates.isBlackedOut(node.idString(), now) && currInflight < inflight) {
                // otherwise if this is the best we have found so far, record thatinflight = currInflight; found = node; }}return found;
    }
Copy the code

Where the implementation of maybeUpdate is

private void maybeUpdate(long now, Node node) {
            if (node == null) {
                log.debug("Give up sending metadata request since no node is available");
                // mark the timestamp for no node available to connect
                this.lastNoNodeAvailableMs = now;
                return;
            }
            String nodeConnectionId = node.idString();

            if (canSendRequest(nodeConnectionId)) {
                Set<String> topics = metadata.needMetadataForAllTopics() ? new HashSet<String>() : metadata.topics();
                this.metadataFetchInProgress = true;
                ClientRequest metadataRequest = request(now, nodeConnectionId, topics);
                log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
                doSend(metadataRequest, now);
            } else if (connectionStates.canConnect(nodeConnectionId, now)) {
                // we don't have a connection to this node right now, make one
                log.debug("Initialize connection to node {} for sending metadata request", node.id());
                initiateConnect(node, now);
                // If initiateConnect failed immediately, this node will be put into blackout and we
                // should allow immediately retrying in case there is another candidate node. If it
                // is still connecting, the worst case is that we end up setting a longer timeout
                // on the next round and then wait for the response.
            } else { // connected, but can't send more OR connecting
                // In either case, we just need to wait for a network event to let us know the selected
                // connection might be usable again.
                this.lastNoNodeAvailableMs = now; }}Copy the code

Start by constructing a request

private ClientRequest request(long now, String node, Set<String> topics) {
            MetadataRequest metadata = new MetadataRequest(new ArrayList<>(topics));
            RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
            return new ClientRequest(now, true, send, null.true);
        }
Copy the code

So this metadata request depends on a topic, which we first passed in when producing. And finally, doSend

private void doSend(ClientRequest request, long now) {
        request.setSendTimeMs(now);
        // Stores requests that have not yet been responded to
        this.inFlightRequests.add(request);
        selector.send(request.request());
    }
Copy the code