This post was posted by GodPan on the ScalaCool team blog.

After reading the last article, I believe you have a preliminary understanding of the message system and Kafka’s overall composition, the best way to learn something is to use it, today let us take a look at Kafka, and complete their debut.

The history of messages in Kafka

Although we need to master things step by step, after we have a general understanding of something, it will be beneficial for us to understand and learn it. Therefore, we can first look at what a message goes through from sending to receiving at last.

The diagram above briefly illustrates the entire flow of messages through Kafka (assuming the entire Kafka system has been deployed and the corresponding Topic has been created, partitioning and so on will be covered separately later) :

  • 1. Message producers publish messages to specific topics and distribute them to specific partitions according to certain algorithms or randomly;
  • 2. Whether the message processing logic needs to be implemented based on actual requirements;
  • 3. Implement specific logic and publish the results to output Topic if necessary;
  • 4. Consumers subscribe to relevant topics and consume messages according to their needs;

In general, how the process is still relatively clear and simple, the following with me to practice Kafka basic operation, finally achieve a word count small demo.

Basic operation

The following code and corresponding tests have been tested in the following environment: Mac OS + JDK1.8, Linux should also be able to run, Windows interested students can go to the official website to download the corresponding version of the test exercise.

Download the Kafka

For Mac, brew can be used to install:

brew install kafka
Copy the code

Linux students can download the source code from the official website to decompress, or directly run the following command:

Mkdir test-kafka && CD test-kafka curl -o kafka_2.11-1.0.1. TGZ http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.1/kafka_2.11-1.0.1.tgz tar - XZF kafka_2. 11-1.0.1. TGZ CD Kafka_2. 11-1.0.1Copy the code

Start the

Kafka uses Zookeeper to maintain cluster information, so we need to start Zookeeper first. Kafka and Zookeeper are related to the connection and combination of follow-up in-depth understanding, after all, can not eat a big man.

bin/zookeeper-server-start.sh config/zookeeper.properties
Copy the code

Then we start a Kafka Server node:

bin/kafka-server-start.sh config/server.properties
Copy the code

The Kafka system is already up and running.

Create a Topic

After everything is ready, we will start to do the extremely important step, that is to create Topic, which is the core of the whole system flow. In addition, Topic itself also contains many complex parameters, such as the number of replicators, the number of partitions, etc. In order to simplify, we set the corresponding parameters to 1 for the convenience of testing:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kakfa-test
Copy the code

The specific meanings of parameters are as follows:

attribute function
–create Is for creating a Topic
–zookeeper Zookeeper Cluster information
–replication-factor replicator
–partitions Partition information
–topic The name of the Topic

At this point we have created a Topic called Kakfa-test.

Send messages to topics

After we have a Topic we can send a message to it:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kakfa-test
Copy the code

We then enter some messages to the console:

this is my first test kafka
so good
Copy the code

At this point, the message has been posted on the kakfa-test theme.

Get messages from Topic

Now that there are messages on the Topic, it is now possible to get messages from it to be consumed:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-test --from-beginning
Copy the code

At this point we can see on the console:

this is my first test kafka
so good
Copy the code

So far we have tested the simplest Kafka Demo. I hope you can try it yourself. It is simple, but it will make you more familiar with the whole Kafka process.

WordCount

Let’s use some of the basic actions above to implement a simple WordCount application that does the following:

  • 1. Support phrase continuous input, that is, the producer continuously generates messages;
  • 2. The program automatically obtains the original data from the input Topic, and then processes the data and publishes the processing results in the counting Topic;
  • 3. Consumers can get the result of the corresponding WordCount from the count Topic;

1. Start the kafka

As with the above startup, follow its operation.

2. Create input topics

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic kafka-word-count-input --partitions 1 --replication-factor 1
Copy the code

3. Input messages to Topic

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-word-count-input
Copy the code

4. Flow processing logic

This part is the core of the whole example, this part of the code has Java 8+ and Scala versions, I think the flow processing using functional syntax is more concise and clear, I recommend you to use functional thinking to try to write the following, found that I no longer want to write Java anonymous inner class syntax.

Let’s start with a Java 8 version:

public class WordCount {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-word-count");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.<String, String>stream("kafka-word-count-input");
        Pattern pattern = Pattern.compile("\\W+");
        source
           .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase(Locale.getDefault()))))
           .groupBy((key, value) -> value)
           .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")).mapValues(value->Long.toString(value))
           .toStream()
           .to("kafka-word-count-output");
        final KafkaStreams streams = newKafkaStreams(builder.build(), props); streams.start(); }}Copy the code

Isn’t it surprising that you can write such simple code in Java, so I recommend that you try to write Java code in a functional way if there are scenarios that apply.

Let’s look at the Scala version again:


object WordCount {
  def main(args: Array[String]) {
    val props: Properties = {
      val p = new Properties()
      p.put(StreamsConfig.APPLICATION_ID_CONFIG."kafka-word-count")
      p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG."localhost:9092")
      p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG.Serdes.String.getClass)
      p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG.Serdes.String.getClass)
      p
    }

    val builder: StreamsBuilder = new StreamsBuilder(a)val source: KStream[String.String] = builder.stream("kafka-word-count-input")
    source
      .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
      .groupBy((_, word) => word)
      .count(Materialized.as[String.Long.KeyValueStore[Bytes.Array[Byte]]] ("counts-store")).toStream.to("kafka-word-count-output")
    val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
    streams.start()
  }
}
Copy the code

You can see that code written in the Java 8 functional style is already very similar to Scala.

5. Start the processing logic

Many students do not have SBT installed on their computers, so the Java version built using Maven is shown here. For details, please refer to the instructions in kafka-word-count.

6. Start the consumer process

Finally we start the consumer process and type some words into the producer, such as:

Finally we can see the following output in the consumer process:

bin/kafka-console-consumer.sh --topic kafka-word-count-output --from-beginning --bootstrap-server localhost:9092  --property print.key=true
Copy the code

conclusion

This article mainly explain the basic operation process of Kafka and some basic operations, but this is something we learn a indispensable step, only the good foundation, in order to further to understand it, understand it why so design, I am in the process also encountered a lot of trouble, so I hope you can to practice myself, You end up getting more out of it.