Each time a user likes an article on the page, the ID of the liked article is sent to Kafka. Spark Streaming then reads the data from Kafka, counts the likes, and writes the totals back to MySQL.

The complete example code is available on GitHub: github.com/neatlife/my…

Create the example project

Projects can be created at https://start.spring.io

Add the web and Kafka starters. Spark and the Spark Streaming Kafka integration have no corresponding starter, so they must be added manually.

Then add the Kafka and Spark Streaming client dependencies to pom.xml:

<!-- Spark dependencies -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.2</version>
</dependency>

Send a “like” postId to Kafka

For an introduction to Kafka, see: juejin.cn/post/684490…

Create a Kafka topic, for example mylikes:

kafka-topics --create --topic mylikes --replication-factor 1 --partitions 1 --zookeeper zoo1:2181


Add an endpoint for likes; the core code is as follows:

@RequestMapping("/send-like")
public String sendLike(@RequestParam(value = "post_id", required = true) Integer postId) {
    producer.send(postId);
    return "test1";
}

The core code for sending to Kafka is as follows:

public void send(Integer postId) {
    // Key is the post ID, value is the like count (always "1"); both are sent as strings
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, postId.toString(), "1");
    this.kafkaTemplate.send(producerRecord);
    System.out.println("Sent sample postId [" + postId + "] to " + topic);
}

Note that both the key and the value sent to Kafka are strings, while the post ID and the like count are integers, so the Spark job has to convert them back.
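The conversion described above can be sketched in plain Java, independent of Spark. The class `LikeRecordParser` and its nested `Like` type are hypothetical names used only for illustration and are not part of the original project. The sketch also assumes that a malformed record should be skipped rather than fail the whole batch.

```java
import java.util.Optional;

public class LikeRecordParser {

    /** A parsed like event: which post, and how many likes to add. */
    public static final class Like {
        public final int postId;
        public final int count;

        Like(int postId, int count) {
            this.postId = postId;
            this.count = count;
        }
    }

    /**
     * Converts the string key/value pair read from Kafka into integers.
     * Returns an empty Optional when either part is missing or not a number.
     */
    public static Optional<Like> parse(String key, String value) {
        if (key == null || value == null) {
            return Optional.empty();
        }
        try {
            int postId = Integer.parseInt(key.trim());
            int count = Integer.parseInt(value.trim());
            return Optional.of(new Like(postId, count));
        } catch (NumberFormatException e) {
            // Assumption: bad records are dropped instead of stopping the job
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("42", "1").isPresent());  // a well-formed record
        System.out.println(parse("abc", "1").isPresent()); // a malformed key
    }
}
```

Whether to drop, log, or reject bad records is a design choice; the original code simply lets the parse exception propagate.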

Read data from Kafka in Spark and count likes

Create a Spark client that reads data from Kafka

SparkConf conf = new SparkConf()
        .setAppName("mySparkLikes")
        .setMaster("local[*]")
        .set("spark.default.parallelism", "15")
        .set("spark.streaming.concurrentJobs", "5")
        .set("spark.executor.memory", "1G")
        .set("spark.cores.max", "3")
        .set("spark.local.dir", "/tmp/mySparkLikes")
        .set("spark.streaming.kafka.maxRatePerPartition", "5");

Set<String> topics = Collections.singleton(topic);

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "127.0.0.1:9092");

// One batch every 3 seconds
JavaStreamingContext jsc = new JavaStreamingContext(
        new JavaSparkContext(conf),
        Durations.seconds(3));
jsc.checkpoint("checkpoint");
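For the direct Kafka stream, spark.streaming.kafka.maxRatePerPartition limits how many records per second are read from each partition, so together with the batch interval it bounds the size of a batch. The small sketch below works through that arithmetic with the values used in this article (rate 5, the single partition from the topic creation command, 3-second batches). The class and method names are hypothetical.

```java
public class BatchSizeEstimate {

    /**
     * Upper bound on records per batch for the direct Kafka stream:
     * maxRatePerPartition (records per second per partition)
     * times the number of partitions times the batch interval in seconds.
     */
    public static long maxRecordsPerBatch(int maxRatePerPartition, int partitions, int batchSeconds) {
        return (long) maxRatePerPartition * partitions * batchSeconds;
    }

    public static void main(String[] args) {
        // Values from this article: rate 5, one partition, 3-second batches
        System.out.println(maxRecordsPerBatch(5, 1, 3)); // prints 15
    }
}
```

With these settings a batch carries at most 15 like events, which is convenient for a demo but would likely need raising for real traffic.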

Create a Kafka data stream

// Get the data stream
final JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
        jsc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topics
);
System.out.println("stream started!");
stream.print();

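stream.print() is an output operation: it prints the first few records of each batch and ensures the stream is actually read from Kafka once the context has been started.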

Convert the postId and like count read from Kafka to integers, then sum the likes per post

JavaPairDStream<Integer, Integer> countDStream = stream
        .transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<Integer, Integer>>() {

            @Override
            public JavaPairRDD<Integer, Integer> call(JavaPairRDD<String, String> stringStringJavaPairRDD) throws Exception {
                return stringStringJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, String>, Integer, Integer>() {

                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<String, String> stringStringTuple2) throws Exception {
                        // Kafka delivers key and value as strings; parse both to integers
                        return new Tuple2<>(Integer.valueOf(stringStringTuple2._1), Integer.valueOf(stringStringTuple2._2));
                    }
                });
            }
        })
        // Sum the like values per postId within each batch
        .reduceByKey(Integer::sum);
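To make the effect of reduceByKey(Integer::sum) concrete, here is a plain-Java sketch of the same per-key summation over a single batch, without any Spark dependency. `LikeBatchCounter` and `countByPost` are hypothetical names, and each record is modelled as an int array of the form {postId, likes}, which is an assumption made only for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LikeBatchCounter {

    /** Sums the like values per postId; each int[] is {postId, likes}. */
    public static Map<Integer, Integer> countByPost(List<int[]> batch) {
        // TreeMap keeps the post IDs sorted so the printed result is stable
        Map<Integer, Integer> totals = new TreeMap<>();
        for (int[] record : batch) {
            // Same combining function as reduceByKey(Integer::sum)
            totals.merge(record[0], record[1], Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<int[]> batch = Arrays.asList(
                new int[] {1, 1}, new int[] {2, 1}, new int[] {1, 1});
        System.out.println(countByPost(batch)); // prints {1=2, 2=1}
    }
}
```

Spark performs the same summation, but distributed across partitions and once per batch interval, so each batch yields one (postId, total) pair per post that received likes in that interval.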

Generate the SQL statement that updates the like count

countDStream.foreachRDD(v -> {
    v.foreach(record -> {
        String sql = String.format("UPDATE `post` SET likes = likes + %s WHERE id=%d", record._2, record._1);
        System.out.println(sql);
    });
    log.info("Batch data stream processed: {}", v);
});
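Building the statement with String.format is fine for printing, but when the update is actually executed a parameterized statement is usually the safer choice. The following is a minimal sketch using plain JDBC, which is a swapped-in technique and not what the original project does. `LikeUpdateWriter` is a hypothetical class, the `post` table with `id` and `likes` columns is taken from the SQL above, and the caller is assumed to supply an open `Connection`.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;

public class LikeUpdateWriter {

    // Same update as the String.format version, with placeholders instead of inlined values
    public static final String UPDATE_SQL = "UPDATE `post` SET likes = likes + ? WHERE id = ?";

    /** Applies one batch of per-post like counts (postId -> likes to add). */
    public static void apply(Connection connection, Map<Integer, Integer> counts) throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(UPDATE_SQL)) {
            for (Map.Entry<Integer, Integer> entry : counts.entrySet()) {
                statement.setInt(1, entry.getValue()); // likes to add
                statement.setInt(2, entry.getKey());   // post id
                statement.addBatch();
            }
            // Send all queued updates to the database together
            statement.executeBatch();
        }
    }
}
```

Inside a Spark job the function passed to foreach runs on the executors, so a connection would typically be opened per partition rather than shared from the driver.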

Start the stream computation

jsc.start();

Add an endpoint that invokes the code above

@RequestMapping("/launch")
public String launch() {
    sparkLikeService.launch();
    return "test2";
}

To start the stream computation, visit /launch, then call the /send-like endpoint to generate like data and check the SQL statements printed to the console.

The console shows the SQL generated from the received likes. You can then use JPA to persist the like counts to the database.

Local debugging

The Spark job can be debugged locally, provided that the following conditions are met:

  1. Spark runs on the local machine.
  2. The master address the job connects to is local[*].
