Spark Streaming pulls data from Kafka

The complete Java code

Both versions below use KafkaUtils.createDirectStream, the receiver-free direct approach introduced in Spark 1.3: Spark Streaming reads from the Kafka brokers directly and tracks the consumed offsets itself instead of relying on ZooKeeper.

package com.chb.spark.streaming;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaDirectWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("kafka-wordCount").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(3));

        // Kafka broker list
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "192.168.1.22:9092,192.168.1.225:9092,192.168.1.26:9092");

        // Topics to subscribe to
        Set<String> topics = new HashSet<String>();
        topics.add("wordcount");

        JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
                jssc,
                String.class,        // type of the key
                String.class,        // type of the value
                StringDecoder.class, // decoder for the key
                StringDecoder.class, // decoder for the value
                kafkaParams,
                topics);

        /**
         * lines is a DStream of Tuple2<String, String>; the value is the line.
         * flatMap splits each line into words:
         * val words = lines.flatMap(_._2.split(" "))
         */
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<String> call(Tuple2<String, String> t) throws Exception {
                return Arrays.asList(t._2.split(" "));
            }
        });

        /**
         * Map each word to a (word, 1) pair:
         * val pairs = words.map(x => (x, 1))
         */
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(String t) throws Exception {
                return new Tuple2<String, Integer>(t, 1);
            }
        });

        /**
         * Count the occurrences of each word:
         * val wcs = pairs.reduceByKey(_ + _)
         *
         * public interface Function2<T1, T2, R> extends Serializable {
         *     R call(T1 v1, T2 v2) throws Exception;
         * }
         */
        JavaPairDStream<String, Integer> wcs = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        wcs.print();

        jssc.start();
        jssc.awaitTermination();
        jssc.stop();
    }
}
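The code above targets Spark 1.x with the Kafka 0.8 direct connector; that is where kafka.serializer.StringDecoder and org.apache.spark.streaming.kafka.KafkaUtils live. As a minimal sketch, the matching sbt dependencies might look like the following; the version number is an assumption and should be aligned with your cluster:

// build.sbt (sketch; 1.6.3 is an assumed version, match it to your cluster)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.6.3" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"  // Kafka 0.8 direct connector
)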

Scala code

package com.spark.stream.scala

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

object WordCountKafka {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    val sc = new StreamingContext(sparkConf, Durations.seconds(10))

    // Kafka broker list
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "node1:9092,node2:9092,node3:9092"
    )

    // Topics to subscribe to
    val topics = Set[String]("wordCount20160423")

    // Kafka returns the data as key/value pairs; only the value (the line) needs to be split
    val linerdd = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](sc, kafkaParams, topics)
    val wordrdd = linerdd.flatMap { _._2.split(" ") }
    val resultrdd = wordrdd.map { x => (x, 1) }.reduceByKey { _ + _ }

    // resultrdd.map(x => println(x._1 + "--" + x._2))
    resultrdd.print()

    sc.start()
    sc.awaitTermination()
    sc.stop()
  }
}
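One thing to keep in mind with the direct approach: offsets are not written back to ZooKeeper, so Kafka monitoring tools that read ZooKeeper will not see this consumer's progress. If you need the offsets, each batch's ranges can be read through HasOffsetRanges, as in this sketch (placed before sc.start(), reusing linerdd from the example above):

import org.apache.spark.streaming.kafka.HasOffsetRanges

// Read the Kafka offset range consumed by each batch. The cast is only valid on
// the RDD produced directly by createDirectStream, before any shuffle or repartition.
linerdd.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} partition ${o.partition}: ${o.fromOffset} to ${o.untilOffset}")
  }
}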