Add the dependencies in the version that matches your Kafka version

My Kafka version is 2.7.1. To install a Kafka cluster, see juejin.cn/post/698619…

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>2.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.1</version>
</dependency>

Write a Properties utility that reads the Kafka configuration

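import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropUtils {
    /**
     * read properties in classpath
     *
     * @param filename file name
     * @return properties
     * @throws IOException if the file is missing or cannot be read
     */
    public static Properties load(String filename) throws IOException {
        Properties prop = new Properties();
        // try-with-resources closes the stream after loading
        try (InputStream inputStream = PropUtils.class.getClassLoader().getResourceAsStream(filename)) {
            if (inputStream == null) {
                throw new IOException("classpath resource not found: " + filename);
            }
            prop.load(inputStream);
        }
        return prop;
    }
}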

Create a Producer

  1. Create the kafka-producer configuration file

Create a kafka-producer.properties configuration file in the resources directory

bootstrap.servers=localhost:9090,localhost:9091,localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
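When a producer is built from a properties file alone, it needs bootstrap.servers, key.serializer and value.serializer, and a missing key only shows up when the KafkaProducer is constructed. As a minimal sketch, the loaded Properties could be checked first. PropCheck, parse and missingKeys are hypothetical names, not part of Kafka or of this project, and the text is parsed from a string only so that the example runs without a classpath file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Kafka or of the original project.
class PropCheck {

    /** Parses properties text; stands in for reading a file from the classpath. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns the required keys that are absent or blank, in the order given. */
    static List<String> missingKeys(Properties props, String... required) {
        List<String> missing = new ArrayList<>();
        for (String key : required) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // value.serializer is left out on purpose.
        Properties props = parse(
                "bootstrap.servers=localhost:9090,localhost:9091,localhost:9092\n"
              + "key.serializer=org.apache.kafka.common.serialization.StringSerializer\n");
        // prints [value.serializer]
        System.out.println(missingKeys(props, "bootstrap.servers", "key.serializer", "value.serializer"));
    }
}
```

In the project itself, the same check could be run on the result of PropUtils.load("kafka-producer.properties") before the producer bean is created.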

  2. Configure the Producer object

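import java.io.IOException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaBeanConfiguration {
    @Bean
    public KafkaProducer<String, Object> kafkaProducer() throws IOException {
        return new KafkaProducer<>(PropUtils.load("kafka-producer.properties"));
    }
}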

  3. Send message use case

Send the message "bar" to the topic foo

@Resource
private KafkaProducer<String, Object> kafkaProducer;

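@Test
public void testProducer() {
    // two-argument constructor: (topic, value); the key is left null
    kafkaProducer.send(new ProducerRecord<>("foo", "bar"));
    // send() is asynchronous; flush() blocks until buffered records have been sent
    kafkaProducer.flush();
}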

Create a Consumer

  1. Create the kafka-consumer configuration file

Create a kafka-consumer.properties configuration file in the resources directory

bootstrap.servers=localhost:9090,localhost:9091,localhost:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=test-group

  2. Configure the KafkaConsumer object

@Bean
public KafkaConsumer<String, Object> kafkaConsumer() throws IOException {
    return new KafkaConsumer<>(PropUtils.load("kafka-consumer.properties"));
}

  3. Listen for and process messages

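import java.time.Duration;
import java.util.Collections;
// javax.annotation is an assumption that fits older Spring versions; newer ones use jakarta.annotation
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;

@Slf4j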
@Component
public class KafkaMessageDispatcher {

    @Resource
    private KafkaConsumer<String, Object> kafkaConsumer;

    @PostConstruct
    public void dispatcher() {
        kafkaConsumer.subscribe(Collections.singleton("foo"));
        new Thread(() -> {
            // catch inside the polling thread: a try/catch around start() never sees exceptions thrown here
            try {
                while (true) {
                    ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofMillis(100));
                    handlerRecord(records);
                }
            } catch (Exception e) {
                errHandler(e);
            }
        }).start();
    }

    private void handlerRecord(ConsumerRecords<String, Object> records) {
        records.forEach(r -> {
            log.info("handler record:topic[{}],offset[{}],partition[{}],key[{}],val[{}]",
                    r.topic(), r.offset(), r.partition(), r.key(), r.value());
        });
    }

    private void errHandler(Exception e) {
        // todo
        log.error(e.getMessage(), e);
    }
}
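
The dispatcher above polls forever on its own thread and has no way to stop. The following is a minimal sketch of the same loop with a stop flag, with failures handled on the polling thread itself. It uses only the JDK: a BlockingQueue stands in for the Kafka consumer so that the example runs without a broker, and PollLoop, demo and the other names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for the dispatcher: the queue plays the role of the consumer.
class PollLoop {
    private final BlockingQueue<String> source;
    private final Consumer<String> handler;
    private final Thread worker;
    private volatile boolean running = true;

    PollLoop(BlockingQueue<String> source, Consumer<String> handler) {
        this.source = source;
        this.handler = handler;
        this.worker = new Thread(this::run, "poll-loop");
    }

    void start() {
        worker.start();
    }

    private void run() {
        try {
            while (running) {
                // Wait up to 100 ms for the next record, then re-check the flag.
                String record = source.poll(100, TimeUnit.MILLISECONDS);
                if (record != null) {
                    handler.accept(record);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            // Failures are reported on the polling thread, where they occur.
            System.err.println("poll loop failed: " + e);
        }
    }

    /** Asks the loop to stop and waits for the worker thread to exit. */
    void stop() {
        running = false;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Feeds the records through a loop and returns them in the order handled. */
    static List<String> demo(List<String> records) {
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(records.size());
        PollLoop loop = new PollLoop(new LinkedBlockingQueue<>(records), r -> {
            handled.add(r);
            done.countDown();
        });
        loop.start();
        try {
            // Bounded wait so the demo cannot hang if a record is never handled.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        loop.stop();
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        // prints [bar, baz]
        System.out.println(demo(Arrays.asList("bar", "baz")));
    }
}
```

With a real KafkaConsumer, a stop method would typically also call wakeup(), which is safe to call from another thread and makes a blocked poll throw WakeupException. The worker thread would then close the consumer before exiting.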