Kafka 0.9 ships a producer rewritten in Java, which replaces the original Scala version.

The examples here use the latest version, 2.3, directly; they apply to all versions from 0.9 onward.

Note the package to import from: org.apache.kafka.clients.producer

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {

	public static void main(String[] args) {

		Properties properties = new Properties();
		// Brokers used for the initial connection to the cluster
		properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
		// Wait for all in-sync replicas to acknowledge each record
		properties.put("acks", "all");
		properties.put("retries", 0);
		properties.put("batch.size", 16384);
		properties.put("linger.ms", 1);
		properties.put("buffer.memory", 33554432);
		properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
		// Send one record without a key to the topic named "topic"
		kafkaProducer.send(new ProducerRecord<>("topic", "value"));
		// close() flushes any buffered records before releasing resources
		kafkaProducer.close();
	}
}
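
The settings used above are easier to reason about when each one is annotated. The following is a minimal sketch, not part of the original example: the class name `ProducerConfigSketch` and the helper `baseProducerProps` are illustrative names, and the values are written as strings, which the Kafka client parses into the expected types. It uses only `java.util.Properties`, so it runs without the Kafka client on the classpath.

```java
import java.util.Properties;

// Sketch only: ProducerConfigSketch and baseProducerProps are illustrative names,
// not part of the Kafka API.
class ProducerConfigSketch {

    /** Builds the same settings as the example above, one comment per setting. */
    static Properties baseProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        // Brokers used for the initial connection to the cluster.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // "all": a send counts as successful once all in-sync replicas have the record.
        props.setProperty("acks", "all");
        // 0: the client does not retry a failed send on its own.
        props.setProperty("retries", "0");
        // Upper bound, in bytes, for one batch of records bound for the same partition.
        props.setProperty("batch.size", "16384");
        // Wait up to 1 ms for more records so that they can be sent in one batch.
        props.setProperty("linger.ms", "1");
        // Total bytes the producer may use to buffer records waiting to be sent (32 MiB).
        props.setProperty("buffer.memory", "33554432");
        // Serializers that turn String keys and values into bytes.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = baseProducerProps("kafka01:9092,kafka02:9092");
        System.out.println("acks=" + props.getProperty("acks"));
    }
}
```

The returned `Properties` object could be passed to `new KafkaProducer<String, String>(...)` in place of the inline configuration, which keeps the settings in one place when several producers share them.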

Transactions were added in 0.11.0. The sample code for a transactional producer follows; it should apply to 0.11.0 and all later versions:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TransactionsProducerDemo {

	public static void main(String[] args) {

		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092");
		props.put("transactional.id", "my-transactional-id");
		Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

		producer.initTransactions();

		try {
			producer.beginTransaction();
			for (int i = 0; i < 100; i++)
				producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
			producer.commitTransaction();
		} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
			// We can't recover from these exceptions, so our only option is to close the producer and exit.
			producer.close();
		} catch (KafkaException e) {
			// For all other exceptions, just abort the transaction and try again.
			producer.abortTransaction();
		}
		producer.close();
	}
}
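
The comment "abort the transaction and try again" describes a retry loop that the sample itself does not spell out. The following is a rough sketch of that control flow under stated assumptions: `TxClient` and `FatalTxException` are hypothetical stand-ins for the producer and for the unrecoverable exceptions, not Kafka types, and `maxAttempts` is an assumed parameter. It also simplifies by treating a failed send as an immediate exception.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: TxClient and FatalTxException are hypothetical stand-ins, not Kafka types.
class TransactionRetrySketch {

    /** Hypothetical stand-in for the transactional calls used in the sample. */
    public interface TxClient {
        void beginTransaction();
        void send(String key, String value);
        void commitTransaction();
        void abortTransaction();
    }

    /** Hypothetical marker for errors that cannot be recovered from. */
    public static class FatalTxException extends RuntimeException {
        public FatalTxException(String message) {
            super(message);
        }
    }

    /** Returns true once a transaction commits, false if every attempt was aborted. */
    static boolean sendInTransaction(TxClient client, List<String> values, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                client.beginTransaction();
                for (String value : values) {
                    client.send(value, value);
                }
                client.commitTransaction();
                return true;
            } catch (FatalTxException e) {
                // Unrecoverable: neither abort nor retry; the caller should close the client.
                throw e;
            } catch (RuntimeException e) {
                // Recoverable: discard this attempt's records, then loop to try again.
                client.abortTransaction();
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // In-memory fake whose first attempt fails on send and whose later attempts succeed.
        final int[] begins = {0};
        TxClient flaky = new TxClient() {
            public void beginTransaction() { begins[0]++; }
            public void send(String key, String value) {
                if (begins[0] == 1) {
                    throw new RuntimeException("transient failure");
                }
            }
            public void commitTransaction() { }
            public void abortTransaction() { }
        };
        boolean committed = sendInTransaction(flaky, Arrays.asList("0", "1", "2"), 3);
        System.out.println("committed=" + committed + ", attempts=" + begins[0]);
    }
}
```

Bounding the number of attempts keeps a persistent failure from looping forever; with a real producer, the fatal branch would correspond to the first `catch` clause in the sample and the recoverable branch to the `KafkaException` clause.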
