In data analysis, data mining, and machine learning work, most of the time is spent consuming data from Kafka and then feeding it into analysis and detection engines.

Consumer

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("fetch.min.bytes", "1048576");
props.put("fetch.max.wait.ms", "2000");
props.put("max.partition.fetch.bytes", "2097152");
props.put("max.poll.records", "10000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
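Before going through the individual settings, here is a minimal usage sketch built on the properties above; the topic name "events" and the poll timeout are placeholders, not part of the original configuration:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Create the consumer from the properties above, subscribe, and poll in a loop.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("events")); // placeholder topic name
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());
    }
}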

1. enable.auto.commit: The default value is true. To minimize duplicates and data loss, you can set it to false and commit offsets yourself at the appropriate time.

If a consumer leaves the group and its partitions are reassigned, how does another consumer in the same group know where the previous consumer left off when it starts reading the partition? How do you ensure that consumers in the same group do not consume the same messages twice?

After poll() pulls data from the local cache, the client calls commitSync() (or commitAsync()) to commit the offset of the next message to read. The commit sends a request across the network to the group coordinator, which persists the offset. This ensures that after a rebalance the next reader neither re-consumes nor misses a message.
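As a sketch of that manual control, assuming enable.auto.commit has been switched to false (process() is a hypothetical handler, not part of the Kafka API):

props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("events")); // placeholder topic name
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical processing step
    }
    consumer.commitSync(); // commit the offsets returned by the last poll() to the coordinator
}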

2. auto.offset.reset

The default value is latest, meaning that when no valid offset exists the consumer starts reading from the newest records (those produced after the consumer started). The other value is earliest, meaning the consumer reads from the beginning of the partition when the offset is invalid.

3. session.timeout.ms

This property specifies how long the consumer can go without contacting the broker before it is considered to have died. The default is 10 seconds. If the consumer does not send a heartbeat within that window, the group coordinator considers it dead and triggers a partition rebalance.

This property is closely related to heartbeat.interval.ms, which defines how often the consumer sends heartbeats. The two parameters are usually changed together, with heartbeat.interval.ms set to roughly one third of session.timeout.ms.
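For example, with the 30-second session timeout used in the properties above, a matching heartbeat interval would be roughly one third of that (the exact value is illustrative):

props.put("session.timeout.ms", "30000");
props.put("heartbeat.interval.ms", "10000"); // about 1/3 of session.timeout.ms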

4. max.partition.fetch.bytes

This property specifies the maximum number of bytes the server returns to the consumer per partition. Its default value is 1 MB. Another factor to consider when setting it is how long the consumer takes to process the data. The consumer needs to call poll() frequently enough to avoid session expiration and partition rebalancing; if a single poll() returns too much data, the consumer needs more time to process it and may not manage to call poll() again in time. If that happens, you can lower max.partition.fetch.bytes or extend the session timeout.

5. fetch.min.bytes: the minimum amount of data, in bytes, the consumer will accept from the server when fetching records.

6. fetch.max.wait.ms: how long the broker waits before responding when there is not yet enough data to satisfy fetch.min.bytes.

7. max.poll.records

This controls the maximum number of records a single call to poll() can return, which helps limit how much data has to be processed in each polling cycle.

8. receive.buffer.bytes and send.buffer.bytes: the sizes of the TCP socket buffers used for reading and writing data. If they are set to -1, the operating system defaults are used. If the producer or consumer is in a different data center than the broker, you can increase these values, because cross-data-center networks tend to have higher latency and lower bandwidth.

9. partition.assignment.strategy: the strategy used to assign partitions to consumers in a group (for example, range or round-robin).

10. client.id

An identifier for the consumer process; the broker uses it to tell request sources apart in logs and metrics.

Producer

1. Producer parameter acks

The number of acknowledgements the producer requires for a produce request before a message is considered "committed." This parameter controls message durability. Three values are available:

  • acks = 0: the produce request returns immediately without waiting for any acknowledgement from the leader. This gives the highest throughput, but there is no guarantee that the message was actually written.

  • acks = -1: the partition leader considers the produce request successful only after the message has been written to all ISR replicas (in-sync replicas). This gives the strongest durability guarantee but, in theory, the lowest throughput.

  • acks = 1: the produce request is considered successful once the leader replica has written the message to its local log. If the leader fails right after acknowledging, the message can be lost. This offers a good balance of durability and throughput.
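A minimal producer sketch using the strongest acks setting might look like the following; the broker address and the topic name "events" are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all"); // same effect as acks=-1: wait for all in-sync replicas
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("events", "key", "value"));
producer.close();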

2. Producer parameter buffer.memory setting (throughput)

The size of the buffer used to cache messages, in bytes. The default value is 33554432 (32 MB). Kafka sends messages asynchronously: the producer creates an in-memory buffer to hold messages waiting to be sent, and a dedicated I/O thread reads messages from the buffer and actually sends them.

  • As messages keep being sent, once the buffer fills up the producer blocks until memory is freed; the wait cannot exceed the value set by max.block.ms, after which a TimeoutException is thrown. If the producer keeps hitting TimeoutExceptions, consider increasing buffer.memory.

  • Because the Kafka producer is thread-safe and is often shared by multiple threads, it is easy to fill up buffer.memory.

3. compression.type

The producer's compression.type currently supports none, gzip, snappy, and lz4.

4. retries

Sets the number of producer retries. When retrying, the producer resends messages that failed for transient reasons, such as stale metadata, not enough replicas, timeouts, out-of-range offsets, or unknown partitions. If retries > 0, the producer retries in those cases.

5. batch.size

The producer sends data in batches, so the batch size is critical to producer performance. Messages destined for the same partition are packed into one batch, and when the batch is full the producer sends it. The batch does not have to be full before it is sent, which is governed by the related parameter linger.ms. The default value is 16 KB (16384 bytes).

6. linger.ms

The producer sends data in batches, but how long it waits to fill a batch is determined by linger.ms. The default value is 0, meaning the producer does not wait at all. In that case some batches are sent before they have accumulated enough messages, producing a large number of small batches and putting heavy pressure on network I/O.
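As a sketch of trading a little latency for larger batches, adding to the producer properties above (the values are illustrative, not recommendations):

props.put("batch.size", "65536"); // 64 KB batches instead of the 16 KB default
props.put("linger.ms", "10");     // wait up to 10 ms for a batch to fill before sending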

7. max.in.flight.requests.per.connection

The maximum number of unacknowledged produce requests a producer I/O thread may have in flight on a single socket connection. Increasing this value should raise the throughput of the I/O thread and thus overall producer performance. However, as mentioned earlier, setting it greater than 1 can cause messages to arrive out of order when retries are enabled.
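If strict ordering matters more than throughput, one common combination, again as a sketch added to the producer properties above, is:

props.put("retries", "3");
props.put("max.in.flight.requests.per.connection", "1"); // keep ordering even when a retry happens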

Configuration file optimization

The server.properties file

1. The number of threads

If the server has X CPU cores, set the number of threads for compute-intensive work (num.network.threads) to X and the number of threads for I/O-intensive work (num.io.threads) to 2 * X.
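For example, on a hypothetical 8-core broker this rule of thumb gives the following server.properties values:

num.network.threads=8
num.io.threads=16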

2. Message cleanup strategy

log.cleanup.policy=delete  # delete old messages directly. Deletion can be triggered by either of the following two settings:

# Delete messages older than the retention period:
log.retention.hours=16

# Delete old messages when the log exceeds the specified size:
log.retention.bytes=1073741824

3. compact strategy

This does not mean compressing log files with a compression algorithm; it means cleaning up duplicate entries in the log. During cleaning, records with duplicate keys are removed and only the latest value for each key is retained, much like the put method of a map. After some segments are cleaned, the files become smaller, and Kafka merges those smaller segments into a larger one. The log-cleaning feature also lets you delete a key: push a record with that key and a null value to Kafka, and Kafka removes the key from the log during cleaning.
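A sketch of deleting a key from a compacted topic by sending such a tombstone, reusing a producer like the one above; the topic "user-profiles" and key "user-42" are placeholders:

// A record with a null value marks the key for removal during log compaction.
producer.send(new ProducerRecord<String, String>("user-profiles", "user-42", null));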

4. Memory

Configure the kafka-server-start.sh file

The default heap size is 1 GB. In production, experience suggests 4 to 6 GB; values above 6 GB show no obvious benefit.
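For example, the heap can be raised through the KAFKA_HEAP_OPTS environment variable that kafka-server-start.sh honors (6 GB shown here as an illustration):

export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"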

5. Producer buffer optimization

Configure in producer.properties:

buffer.memory=33554432  # (32 MB) The size of the buffer the producer uses to hold messages that have not yet been sent. When the buffer is full the producer blocks, and after max.block.ms (default: 1 minute) it throws an exception.

6. Flushing data to disk

Configured in server.properties:

# Flush data to disk after every 10,000 messages:
log.flush.interval.messages=10000

# Flush data to disk every second:
log.flush.interval.ms=1000

7. Configure the message size

# producer.properties (can also be set dynamically when creating a topic):
max.request.size=5242880  # (5 MB)

# server.properties:
message.max.bytes=6291456  # (6 MB)

# consumer.properties:
fetch.max.bytes=7340032  # (7 MB)