1. Introduction

Kafka is an open-source stream-processing platform developed by the Apache Software Foundation and written in Scala and Java. It is a high-throughput, distributed publish-subscribe messaging system that can handle all of the action-stream data generated by consumers on a website. These actions (page views, searches and other user activities) are a key ingredient of many social features on the modern web. Because of the throughput involved, this data is usually handled through logging and log aggregation. For log data and offline analysis systems such as Hadoop that also have real-time processing requirements, Kafka is a feasible solution. Kafka aims to unify online and offline message processing through Hadoop's parallel loading mechanism and to provide real-time messaging across a cluster.

2. Deploy ZooKeeper and Kafka with docker-compose

docker-compose.yml

version: '2' 
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    volumes:
      - /etc/localtime:/etc/localtime
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.0.0.171
      KAFKA_ZOOKEEPER_CONNECT: 192.0.0.171:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.0.0.171:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_HEAP_OPTS: "-Xmx256M -Xms128M"
  kafka-manager:
    image: sheepkiller/kafka-manager
    environment:
      ZK_HOSTS: 192.0.0.171:2181
    ports:
      - "9000:9000"


Run docker-compose. The images have to be pulled first, so this may take a while.

docker-compose up -d
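Once the containers are up, you can optionally verify from code that the broker is reachable. Below is a minimal sketch using Kafka's AdminClient (from the kafka-clients library, which the spring-kafka dependency added in the next step pulls in transitively); the class name KafkaConnectivityCheck is just an illustrative choice.

import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class KafkaConnectivityCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Same broker address as KAFKA_ADVERTISED_LISTENERS in docker-compose.yml
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.0.0.171:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Listing topics forces a round trip to the broker
            Set<String> topics = admin.listTopics().names().get();
            System.out.println("Connected, existing topics: " + topics);
        }
    }
}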

3. Add dependencies

<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

4. Add Kafka configuration to application.yml

spring:
  kafka:
    bootstrap-servers: 192.0.0.171:9092 # Kafka server address
    listener:
      concurrency: 10
      ack-mode: MANUAL_IMMEDIATE
      poll-timeout: 1500
      missing-topics-fatal: false
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      bootstrap-servers: 192.0.0.171:9092
      group-id: 0
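With ack-mode: MANUAL_IMMEDIATE the listener has to acknowledge each record itself (shown in the consumer below), and missing-topics-fatal: false keeps the application from failing at startup if topic_log does not exist yet. If you would rather have the topic created automatically, one option is a NewTopic bean that Spring Boot's auto-configured KafkaAdmin picks up; the class name and the partition/replication numbers below are illustrative assumptions, not part of the original setup.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopicConfig {

    // KafkaAdmin (auto-configured by Spring Boot) creates NewTopic beans
    // on the broker if the corresponding topic is missing
    @Bean
    public NewTopic topicLog() {
        // 1 partition, replication factor 1 for the single-broker compose setup
        return new NewTopic("topic_log", 1, (short) 1);
    }
}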

5. Message producer

@Component
public class KafkaProducer {
    private static Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendLog(String log) {
        logger.info("Sending message to Kafka: " + log);
        kafkaTemplate.send("topic_log", log);
    }
}
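Note that kafkaTemplate.send() is asynchronous, so sendLog() returns before the broker has confirmed the write. If delivery confirmation matters, one possible variant is to register callbacks; this is a sketch assuming Spring Kafka 2.x, where send() returns a ListenableFuture<SendResult<K, V>>, and the method would live in the same KafkaProducer class.

    // Sketch: same send, but logging delivery success or failure
    public void sendLogWithCallback(String log) {
        kafkaTemplate.send("topic_log", log).addCallback(
                result -> logger.info("Delivered to partition {} at offset {}",
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset()),
                ex -> logger.error("Failed to send message to Kafka", ex));
    }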

6. Message consumer

@Component
public class KafkaConsumer {
	private static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

	@KafkaListener(topics = {"topic_log"})
	public void listen(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
		try {
			String logStr = (String) record.value();
			acknowledgment.acknowledge();
			logger.info("Kafka received message: {}", logStr);
		} catch (Exception e) {
			logger.error("Kafka message consumption failed: ", e);
		}
	}
}
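A design note: with ack-mode: MANUAL_IMMEDIATE the offset is committed only when acknowledge() is called, and the listener above acknowledges before it does anything with the record. If you want failed records to stay uncommitted so they can be redelivered (for example after a restart, or by an error handler that re-seeks), acknowledging after processing is a common alternative; a rough sketch of such a variant:

	// Sketch: acknowledge only after processing succeeds; an exception leaves
	// the offset uncommitted so the record can be consumed again
	@KafkaListener(topics = {"topic_log"})
	public void listenAckAfterProcessing(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
		String logStr = (String) record.value();
		logger.info("Kafka received message: {}", logStr);
		acknowledgment.acknowledge();
	}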

7. Test method

@Controller
@RequestMapping("/kafka")
public class KafkaTest {
    @Autowired
    KafkaProducer kafkaProducer;


    @ApiOperation(value = "Kafka test message sending", notes = "Kafka test message sending", httpMethod = "GET")
    @ApiImplicitParam(name = "message", value = "Message", dataType = "String", required = true)
    @RequestMapping(value = "/send")
    @ResponseBody
    public void send(String message) {
        kafkaProducer.sendLog(message);
    }
}
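Once the application is running, this endpoint can be called directly, e.g. GET /kafka/send?message=hello, or through the Swagger UI as in the next step.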

8. Test

Start the project and send a test message through Swagger; the producer publishes it to Kafka and the consumer logs it, with no problems.