Welcome to github.com/hsfxuebao/j… , hope to help you, if you feel ok, please click on the Star

The book follows the Kafka source code analysis 1- environment preparation, this article focuses on analyzing the Producer initialization process.

1. The Producer is used

Here’s a simple use of producer:

public class ProducerTest { private static String topicName; private static int msgNum; private static int key; public static void main(String[] args) { Properties props = new Properties(); Props. The put (" the bootstrap. The servers, "" 127.100.0.1:9092127100 0.2:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); topicName = "test"; msgNum = 10; // Number of messages sent // Producer<String, String> Producer = new KafkaProducer<>(props); for (int i = 0; i < msgNum; i++) { String msg = i + " This is matt's blog."; producer.send(new ProducerRecord<String, String>(topicName, msg)); } producer.close(); }}Copy the code

As you can see from the code above, Kafka provides users with a very simple API. When using Kafka, there are only two steps:

  1. Initialize theKafkaProducerInstance;
  2. callsendThe interface sends data.

2. Producer properties

Let’s take a look at the properties of KafkaProducer

public class KafkaProducer<K, V> implements Producer<K, V> { private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class); Private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); Private static final String JMX_PREFIX = "kafka.producer"; // Producer client name private String clientId; // Private final Partitioner Partitioner; Private final int maxRequestSize; private final int maxRequestSize; Private final Long totalMemorySize; // Cluster Metadata private final Metadata Metadata; Private Final RecordAccumulator Accumulator; Private final Sender Sender; // Performance monitoring related private final Metrics Metrics; // The sender Thread in which the sender object runs the private final Thread ioThread; // Private final CompressionType CompressionType; Private final Sensor errors; Private final Time Time; // Private final Serializer<K> keySerializer; private final Serializer<V> valueSerializer; // Private final ProducerConfig ProducerConfig; Private final Long maxBlockTimeMs; private final Long maxBlockTimeMs; Private final int requestTimeoutMs; private final int requestTimeoutMs; Private final ProducerInterceptors<K, V> interceptors; }Copy the code

Relevant notes have been noted and will not be repeated here.

3. The Producer initialization

KafkaProducer constructor

public KafkaProducer(Properties properties) {
      this(new ProducerConfig(properties), null, null);
}

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    try {
        log.trace("Starting the Kafka producer");
        // 从原始配置拷贝一份副本
        Map<String, Object> userProvidedConfigs = config.originals();
        this.producerConfig = config;
        this.time = Time.SYSTEM;
        // 获取clientId
        clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
        if (clientId.length() <= 0)
            // 如果clientId没指定,则使用"producer-序列化"的形式表示
            clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
        Map<String, String> metricTags = new LinkedHashMap<String, String>();
        metricTags.put("client-id", clientId);
        // metric一些东西,监控
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                .tags(metricTags);
        List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));
        this.metrics = new Metrics(metricConfig, reporters, time);
        // 获取配置的分区器
        this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
        // 重试时间 默认100ms
        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
        // 设置序列化器
        if (keySerializer == null) {
            this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    Serializer.class);
            this.keySerializer.configure(config.originals(), true);
        } else {
            config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
            this.keySerializer = keySerializer;
        }
        if (valueSerializer == null) {
            this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    Serializer.class);
            this.valueSerializer.configure(config.originals(), false);
        } else {
            config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
            this.valueSerializer = valueSerializer;
        }

        // load interceptors and make sure they get clientId
        userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        // 设置拦截器
        List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ProducerInterceptor.class);
        this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);

        /** 创建Metadata集群元数据对象,生产者从服务端拉取kafka元数据信息
         *  需要发送网络请求,重试
         *  metadata.max.age.ms 生产者每隔一段时间更新自己的元数据,默认5分钟
          */
        this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
        /**
         * MAX_REQUEST_SIZE_CONFIG 生产者往服务端发送一条消息最大的size
         * 默认1m 如果超过这个大小,消息就发送不出去
         */
        this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
        /**
         * buffer.memory 缓存大小,默认32M
         */
        this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
        /**
         * kafka可以压缩数据,设置压缩格式
         * 提高系统的吞吐率
         * 一次发送出去的消息越多,生产者需要消耗更多的cpu
         */
        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
       
         ....(省略部分代码)
      
        /**
         * 创建RecordAccumulator,它是一个发送消息数据的记录缓冲器,用于批量发送消息数据
         * batch.size单位是字节,默认16k 用于指定达到多少字节批量发送一次
         */
        this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                this.totalMemorySize,
                this.compressionType,
                config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                retryBackoffMs,
                metrics,
                time);

        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        /**
         * 会将创建KafkaProducer时配置的broker列表传入
         * 该update()方法同时会更新Metadata对象的一些属性,
         * 并通知所有MetadataUpdate Listener监听器,自己要开始更新数据了
         * 同时唤醒等待Metadata更新完成的线程
         */
        //TODO product初始化时, update方法初始化的时候并没有去服务端拉取元数据。
        this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
        // 根据配置的协议,创建不同的ChannelBuilder
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
        //TODO 初始化了一个重要的管理网路的组件。
        /**
         *connections.max.idle.ms: 默认值是9分钟,一个网络连接最多空闲多久,超过这个空闲时间,就关闭这个网络连接。
         *
         * max.in.flight.requests.per.connection:默认是5,producer -> broker 。
         *  发送数据的时候,其实是有多个网络连接。每个网络连接可以忍受 producer端发送给broker消息 然后消息没有响应的个数。
         *  因为kafka有重试机制,所以有可能会造成数据乱序,如果想要保证有序,这个值要把设置为1.
         *
         *  send.buffer.bytes:socket发送数据的缓冲区的大小,默认值是128K
         *  receive.buffer.bytes:socket接受数据的缓冲区的大小,默认值是32K。
         */
        NetworkClient client = new NetworkClient(
                new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
                this.metadata,
                clientId,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                this.requestTimeoutMs,
                time,
                true);
        // 创建sender线程
        //retries:重试的次数
        /***
         * acks:
         *   0:
         *      producer发送数据到broker后,就完了,没有返回值,不管写成功还是写失败都不管了。
         *   1:
         *      producer发送数据到broker后,数据成功写入leader partition以后返回响应。
         *      数据 -> broker(leader partition)
         *   -1:
         *       producer发送数据到broker后,数据要写入到leader partition里面,并且数据同步到所有的
         *       follower partition里面以后,才返回响应。
         *
         *       这样我们才能保证不丢数据。
         */
        this.sender = new Sender(client,
                this.metadata,
                this.accumulator,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                config.getInt(ProducerConfig.RETRIES_CONFIG),
                this.metrics,
                Time.SYSTEM,
                this.requestTimeoutMs);
        String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
        /**
         * 创建了一个线程,然后里面传进去了一个sender对象。
         * 把业务的代码和关于线程的代码给隔离开来。
         * 关于线程的这种代码设计的方式,其实也值得大家积累的。
        */
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        // 启动sender线程
        this.ioThread.start();

        this.errors = this.metrics.sensor("errors");

        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
        log.debug("Kafka producer started");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed
        // this is to prevent resource leak. see KAFKA-2121
        close(0, TimeUnit.MILLISECONDS, true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}
Copy the code

KafkaProducer initializes important components:

(1) Core component: The Partitioner is used to determine which partition of the Topic each message you send is routed to

(2) Core components: Metadata, which is a core component for the production side, is used to pull Metadata from the broker cluster to Topics (Topic -> Leader+Followers, ISR). We know which Partitions the Topic has and which Broker the Partition Leader belongs to

The default is 5 minutes, and the default must be every 5 minutes to force the refresh

Still have even if we assume that, at the time of sending a message, if you find a Topic to write the corresponding metadata is not local, so he is certainly by the component, send a request to the broker corresponding metadata, trying to pull the Topic if you added a broker in a cluster, will also involve changes in the metadata

(3) Core parameters: maximum size of each request (1MB), buffer memory size (32MB), retry interval (100ms), block time after buffer filling (60s), request timeout time (30s)

(4) Core component: RecordAccumulator, which is responsible for the complex buffer mechanism of messages. The messages sent to each partition will be packaged into batch, and the multiple batches corresponding to multiple partitions on a broker will be packaged into a request, batch size (16KB).

By default, if batch is considered, you must wait until enough messages are packaged into a batch before sending them to the broker via request. But there is a problem if you send a message and wait a long time before it reaches a batch size

Therefore, a batch message must be set. If no batch message is collected within the specified time range, the message must be sent immediately at the time specified by linger. Ms, for example, 5ms

(5) Core behavior: during initialization, the Metadata component is directly invoked to pull the Metadata of a cluster from the broker. After that, the cluster Metadata will be refreshed every 5 minutes by default. But when sending messages, if the Metadata of a Topic is not found, it will take the initiative to pull the Metadata once

(6) Core components: The components of network communication, NetworkClient, how long a network connection can be idle (9 minutes), how many requests each connection can receive no response (5), the interval between retries (50ms), the size of the Socket send buffer (128KB), Socket receive buffer size (32KB)

(7) Core components: Sender thread, which is responsible for retrieving messages from the buffer and sending them to the broker, request maximum size (1MB), acks (1), number of retries (0, no retries), request timeout (30s), thread class called “KafkaThread”. The thread name is kafka-producer-network-thread. The thread is started directly

(8) Core components: serialization components, interceptor components

Now, let’s focus on how the props property is converted to a class object, the ProducerConfig object,

The ProducerConfig object uses the AbstractConfig constructor:

public AbstractConfig(ConfigDef definition, Map<? ,? > originals, boolean doLog) { /* check that all the keys are really strings */ for (Object key : originals.keySet()) if (! (key instanceof String)) throw new ConfigException(key.toString(), originals.get(key), "Key must be a string."); this.originals = (Map<String, ? >) originals; This. values = definition. Parse (this.originals); this.used = Collections.synchronizedSet(new HashSet<String>()); if (doLog) logAll(); }Copy the code

The call chain for this method is as follows:

parse(Map
props) -> parseType(String name, Object value, Type type)

public static Object parseType(String name, Object value, Type type) { try { if (value == null) return null; String trimmed = null; if (value instanceof String) trimmed = ((String) value).trim(); switch (type) { case BOOLEAN: if (value instanceof String) { if (trimmed.equalsIgnoreCase("true")) return true; else if (trimmed.equalsIgnoreCase("false")) return false; else throw new ConfigException(name, value, "Expected value to be either true or false"); } else if (value instanceof Boolean) return value; else throw new ConfigException(name, value, "Expected value to be either true or false"); case PASSWORD: if (value instanceof Password) return value; else if (value instanceof String) return new Password(trimmed); else throw new ConfigException(name, value, "Expected value to be a string, but it was a " + value.getClass().getName()); case STRING: if (value instanceof String) return trimmed; else throw new ConfigException(name, value, "Expected value to be a string, but it was a " + value.getClass().getName()); case INT: if (value instanceof Integer) { return (Integer) value; } else if (value instanceof String) { return Integer.parseInt(trimmed); } else { throw new ConfigException(name, value, "Expected value to be a 32-bit integer, but it was a " + value.getClass().getName()); } case SHORT: if (value instanceof Short) { return (Short) value; } else if (value instanceof String) { return Short.parseShort(trimmed); } else { throw new ConfigException(name, value, "Expected value to be a 16-bit integer (short), but it was a " + value.getClass().getName()); } case LONG: if (value instanceof Integer) return ((Integer) value).longValue(); if (value instanceof Long) return (Long) value; else if (value instanceof String) return Long.parseLong(trimmed); else throw new ConfigException(name, value, "Expected value to be a 64-bit integer (long), but it was a " + value.getClass().getName()); case DOUBLE: if (value instanceof Number) return ((Number) value).doubleValue(); else if (value instanceof String) return Double.parseDouble(trimmed); else throw new ConfigException(name, value, "Expected value to be a double, but it was a " + value.getClass().getName()); case LIST: if (value instanceof List) return (List<? >) value; else if (value instanceof String) if (trimmed.isEmpty()) return Collections.emptyList(); else return Arrays.asList(trimmed.split("\\s*,\\s*", -1)); else throw new ConfigException(name, value, "Expected a comma separated list."); case CLASS: if (value instanceof Class) return (Class<? >) value; else if (value instanceof String) return Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader()); else throw new ConfigException(name, value, "Expected a Class instance or class name."); default: throw new IllegalStateException("Unknown type."); } } catch (NumberFormatException e) { throw new ConfigException(name, value, "Not a number of type " + type); } catch (ClassNotFoundException e) { throw new ConfigException(name, value, "Class " + value + " could not be found."); }}Copy the code

In the parseType method, we can parse the parameter props passed in by the user into the corresponding class.

4. The last

KafkaProducer initialization: KafkaProducer initialization: KafkaProducer initialization: KafkaProducer initialization: KafkaProducer The next part will analyze the main process of Producer.

Reference Documents:

Kafka 0.10.2.0-src kafka 0.10.2.0-src kafka 0.10.2.0-src

Kafka technology insider – Graphic details kafka source code design and implementation

Kafka source code analysis series