This set of technical column is the author (Qin Kaixin) usually work summary and sublimation, through extracting cases from the real business environment to summarize and share, and give business application tuning suggestions and cluster environment capacity planning and other content, please continue to pay attention to this set of blog. Copyright notice: No reprint, welcome to learn. QQ email address: [email protected], if there is any business exchange, can contact at any time.

1 Come straight to the point

1.1 Optimization 1: HA processing – For stateful operations such as updateStateByKey and Window

  • HA High Availability: High Availability in case of data loss or node failure; If you don’t want your real-time computing program to die, you have to make some redundant copies of your data to keep your real-time computing program running 24/7.

  • The checkpoint directory must be set when stateful operations such as updateStateByKey and Window are automatically checked.

    HDFS: SparkStreaming checkpoint (" HDFS: / / 192.168.1.105:9090 / checkpoint "),Copy the code
  • Checkpoint stores data in a fault-tolerant file system in case data is lost in memory. The data can be read directly from the document system without recalculation.

    JavaStreamingContext jssc = new JavaStreamingContext( conf, Durations.seconds(5)); JSSC. Checkpoint (" HDFS: / / 192.168.1.105:9000 / streaming_checkpoint ");Copy the code

1.2 Optimization 2: HA for Driver high availability

  • When StreamingContext is created and started, metadata is written to a fault-tolerant file system (such as HDFS). Ensure that the Spark cluster can restart the driver after the driver fails. When the driver starts, it does not create a streaming context but reads the previous metadata information, including the job execution progress, from a fault-tolerant file system (such as HDFS).

  • To use this mechanism, you must commit in cluster mode to ensure that the driver is running on a worker.

    JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() { @Override public JavaStreamingContext create() { JavaStreamingContext jssc = new JavaStreamingContext(...) ; JavaDStream<String> lines = jssc.socketTextStream(...) ; jssc.checkpoint(checkpointDirectory); return jssc; }}; JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory); context.start(); context.awaitTermination();Copy the code
  • JavaStreamingContext. GetOrCreate based on Function0 < JavaStreamingContext > Driver high availability

    Function0<JavaStreamingContext> createContextFunc = new Function0<JavaStreamingContext>(){ @Override public JavaStreamingContext call() throws Exception { conf = new SparkConf() .setMaster("local[4]") .setAppName("java/RealTimeStreaming") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.default.parallelism", "10") .set("spark.streaming.blockInterval", "50") .set("spark.streaming.receiver.writeAheadLog.enable", "true"); Map<String, Object> kafkaParams = new HashMap<>(); kafkaParams.put("bootstrap.servers", "Master:9092,Worker1:9092,Worker2:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "TestGroup"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit",true); JavaStreamingContext jssc = new JavaStreamingContext( conf, Durations.seconds(30)); jssc.checkpoint("hdfs://Master:9000/checkpoint"); / / build topic set String kafkaTopics = ConfigurationManager. GetProperty (the KAFKA_TOPICS); String[] kafkaTopicsSplited = kafkaTopics.split(","); Set<String> topics = new HashSet<String>(); for(String kafkaTopic : kafkaTopicsSplited) { topics.add(kafkaTopic); } JavaInputDStream<ConsumerRecord<String, String>> adRealTimeLogDStream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topics, kafkaParams)); hostMap = adRealTimeLogDStream.mapToPair(record -> new Tuple2<String, String>(record.key(), record.value())); logPeakDstream = hostMap.mapToPair(new PairFunction<Tuple2<String, String>, String, Long>() { @Override public Tuple2<String,Long> call(Tuple2<String, String> tuple) throws Exception { String log = tuple._2; String[] logSplited = log.split("\\|"); String eventTime= logSplited[1]; String todayDate = DATE_FORMAT.format(new Date()).trim(); String cutTime= eventTime.substring(13,eventTime.length()-7); String ip = logSplited[0].trim(); String host = logSplited[14].trim(); return new Tuple2<String, Long>(host+"-"+ip, 1L); }}); hostReduce = logPeakDstream.reduceByKeyAndWindow(new Function2<Long, Long, Long>() { @Override public Long call(Long v1, Long v2) throws Exception { return v1 + v2; } }, Durations.minutes(10),Durations.seconds(30)); JavaPairDStream<String, Long> topNPairRdd = hostReduce.transformToPair(new Function<JavaPairRDD<String, Long>, JavaPairRDD<String, Long>>() { @Override public JavaPairRDD<String, Long> call(JavaPairRDD<String, Long> rdd) throws Exception { JavaPairRDD<Long, String> sortRDD = (JavaPairRDD<Long, String>) rdd.mapToPair(record -> new Tuple2<Long, String>(record._2, record._1)); JavaPairRDD<String, Long> sortedRdd = (JavaPairRDD<String, Long>) sortRDD.sortByKey(false).mapToPair(record -> new Tuple2<String, Long>(record._2, record._1)); List<Tuple2<String, Long>> topNs = sortedRdd.take(5); // take the first 5 outputs system.out.println (" "); System. The out. Println (" * * * * * * * * * * * * * * * * * peak access statistics window * * * * * * * * * * * * * * * * * * * "); for (Tuple2<String, Long> topN : topNs) { System.out.println(topN); } System.out.println("**********************END***********************"); System.out.println(" "); return sortedRdd; }}); topNPairRdd.foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() { @Override public void call(JavaPairRDD<String, Long> rdd) throws Exception { } }); logDetailDstream = hostMap.map(new Function<Tuple2<String,String>, String>() { @Override public String call(Tuple2<String, String> tuple) throws Exception { String log = tuple._2; String[] logSplited = log.split("\\|"); String eventTime= logSplited[1]; String todayDate = DATE_FORMAT.format(new Date()).trim(); String cutTime= eventTime.substring(13,eventTime.length()-7); String[] urlDetails = logSplited[7].split("/"); String ip = logSplited[0].trim(); String url =""; if(urlDetails.length==4){ url = urlDetails[3]; }else if(urlDetails.length==5){ url = urlDetails[3] + "/" + urlDetails[4]; }else if(urlDetails.length>=6){ url = urlDetails[3] + "/" + urlDetails[4]+ "/" + urlDetails[5]; } String host = logSplited[14].trim(); String dataTime =todayDate +" "+ cutTime; String bytesSent = logSplited[5].trim(); return dataTime+" "+host+" "+ip+" "+url+" "+bytesSent; }}); //logDetailDstream.print(); return jssc; }}; return createContextFunc;Copy the code
  • submission

      spark-submit
              --deploy-mode cluster
              --supervise
    Copy the code

1.3 Optimization 3: Achieving RDD High Availability: Enabling WAL write-ahead logging

  • Spark Streaming, in principle, uses a receiver to receive data. The received data is divided into blocks; Blocks are grouped into a batch; For a batch, an RDD is created.

  • After receiving data, the receiver immediately writes one data file to the checkpoint directory on a fault-tolerant file system (HDFS) and the other data file to the disk file. As a redundant copy of the data. No matter how your program crashes or data is lost, the data is unlikely to be permanently lost; Because there must be a copy.

    SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName("StreamingSpark"); .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); .set("spark.default.parallelism", "1000"); .set("spark.streaming.blockInterval", "50"); .set("spark.streaming.receiver.writeAheadLog.enable", "true"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); JSSC. Checkpoint (" HDFS: / / 192.168.1.164:9000 / checkpoint ");Copy the code

1.4 Optimization 4: InputDStream parallelizes data reception

  • Create multiple InputdStreams to receive the same data source

  • Break down multiple topic data into a single kafkaStream to receive

    1: create kafkaStream <String, String> kafkaParams = new HashMap<String, String>(); KafkaParams. Put (metadata. Broker. "the list", "192.168.1.164:9092192168 1.165:9092192168:1.166 9092"); kafkaParams.put("zookeeper.connect","master:2181,data1:2181,data2:2181"); Build the topic set String kafkaTopics = ConfigurationManager. GetProperty (the KAFKA_TOPICS); String[] kafkaTopicsSplited = kafkaTopics.split(","); Set<String> topics = new HashSet<String>(); for(String kafkaTopic : kafkaTopicsSplited) { topics.add(kafkaTopic); JavaPairInputDStream<String, String> kafkaStream = KafkaUtils.createDirectStream( jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); 2: InputDStream parallel data receive int numStreams = 5; List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String,String>>(numStreams); for (int i = 0; i < numStreams; i++) { kafkaStreams.add(KafkaUtils.createStream(...) ); } JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size())); unifiedStream.print();Copy the code

1.5 Optimization 5: Increase the number of Blocks, the number of Partitions in each Batch RDD, and the parallelism of processing

  • Step 1: The receiver continuously obtains data from the data source. First, it collects the data at a specified time interval into a block according to block interval. The default time is 200ms, and the recommended value is not less than 50ms.
  • Step 2: According to the specified Batch interval, the batch is merged into a BATCH and created into an RDD.
  • Step 3: Start a job to process the batch RDD data.
  • Step 4: Batch RDD number of partitions A batch contains as many blocks as it contains partitions. That means how much parallelism is there; This means how many tasks in each Batch RDD are computed and processed in parallel.
  • Tuning: If you want to have more tasks and more parallelism than the default, you can manually adjust the blockInterval to reduce the block Interval. Each batch can contain more blocks. Therefore, there are more partitions, and therefore more tasks processing each Batch RDD in parallel.

1.6 Optimization 6: Repartitioning: Added the number of partitions in each Batch RDD

  • Inputstream.repartition () : repartition. Increase the number of partitions in each Batch RDD. Repartition the RDD in dstream into a specified number of partitions to improve the computational parallelism of the RDD in the specified Dstream
  • Details of parallelism adjustment:

1.7 Optimization 7: Improve parallelism

Method 1: spark. Default. Parallelism method 2: reduceByKey (numPartitions)Copy the code

JavaPairDStream<String, Long> dailyUserAdClickCountDStream = dailyUserAdClickDStream.reduceByKey( new Function2<Long, Long, Long>() { private static final long serialVersionUID = 1L; @Override public Long call(Long v1, Long v2) throws Exception { return v1 + v2; }}); }, 1000);Copy the code

1.8 Optimization 8: Use Kryo serialization mechanism

  • Spark Streaming: Improves the performance of serialized tasks sent to executor. For example, if there are many tasks, the performance overhead of task serialization and deserialization can be considerable

  • The default StorageLevel of input data is storagelevel. MEMORY_AND_DISK_SER_2. When a receiver receives data, it persists the data by default. First, serialize the data and store it in memory. If the memory resource is not large enough, it is written to disk; In addition, a redundant copy is written to another executor’s Block manager for data redundancy.

    SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName("StreamingSpark"); .set("spark.serializer", "org.apache.spar.serializer.KryoSerializer"); < = optimization point. The set (" spark. Default. Parallelism ", "1000"); .set("spark.streaming.blockInterval", "50"); .set("spark.streaming.receiver.writeAheadLog.enable", "true");Copy the code

1.9 Optimization 9: Batch Interval: The stream processing time must be smaller than the Batch interval

  • Batch interval refers to the number of times at which data is collected from data sources and processed. Do not set the batch interval caused by a long processing time.

      JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));  
    Copy the code

1.10 Optimization 10: Cache frequently used data

Calling rdd.cache() to cache the data also speeds up data processing, but we need more memory resources

1.11 Optimization 11: Periodically clear unnecessary data

  • You can set spark.cleaner. TTL to a reasonable value, but the value should not be too small, because it will cause unnecessary trouble if the data needed for later calculation is cleared.
  • In addition by configuring the spark. Streaming. Unpersist to true (the default is true) to more intelligently to persistent unpersist RDD. This configuration enables the system to find RDD’s that do not need to be kept frequently, and then persist them. This reduces Spark RDD memory usage and may also improve garbage collection behavior.

1.12 Optimization 12: GC Optimization Strategy (temporarily uncertain)

Parallel Mark-sweep garbage collection is recommended, although it consumes more resources, but we recommend enabling it. Used in spark-submit

--driver-java-options "-XX:+UseConcMarkSweepGC"
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"
Copy the code

1.13 Optimization 13: Compression removal

Spark.rdd.press can be set to false if memory is sufficient.

1.14 Optimization 14: Executors and CPU Core Number Settings and Spark On Yarn Dynamic resource allocation

  • Configure NodeManager on YARN to support Spark Shuffle Service.

    ## modify <property> <name> yarn.nodeManager. aux-services</name> <value>mapreduce_shuffle,spark_shuffle</value> </property> ## add <property> <name>yarn.nodemanager.aux-services.spark_shuffle.class</name> <value>org.apache.spark.network.yarn.YarnShuffleService</value> </property> <property> <name>spark.shuffle.service.port</name> <value>7337</value> </property>Copy the code
  • Copy the spark JAR package to the hadoop directory:

    Find the spark-<version>-yarn-shuffle.jar shuffle package of the Spark version and place the package in the classpath of all nodeManagers in the cluster. Such as in HADOOP_HOME/share/hadoop/yarn/libCopy the code
  • Restart all NodeManagers.

  • The configuration example is as follows:

    spark.shuffle.service.enabled=true spark.dynamicAllocation.enabled=true spark.dynamicAllocation.executorIdleTimeout=60s spark.dynamicAllocation.initialExecutors=1 spark.dynamicAllocation.maxExecutors=5 spark.dynamicAllocation.minExecutors=0  spark-submit \ --master yarn \ --deploy-mode cluster \ --executor-cores 3 \ --executor-memory 10G \ --driver-memory 4G \ --conf spark.dynamicAllocation.enabled=true \ --conf spark.shuffle.service.enabled=true \ --conf spark.dynamicAllocation.initialExecutors=5 \ --conf spark.dynamicAllocation.maxExecutors=40 \ --conf spark.dynamicAllocation.minExecutors=0 \ --conf spark.dynamicAllocation.executorIdleTimeout=30s \ --conf spark.dynamicAllocation.schedulerBacklogTimeout=10s \Copy the code

1.15 Optimization 15: Using high-performance operators

Using reduceByKey/aggregateByKey alternative groupByKey mapPartitions substitute the normal map using alternative foreach foreachPartitions to coalesce filter after operation Using repartitionAndSortWithinPartitions alternative repartition and sort operations

1.16 Optimization 16: SparkStreaming and Kafka integration tuning

LocationStrategies:

The new Kafka consumer API will pre-fetch messages into buffers. Therefore it is important for performance 
reasons that the Spark integration keep cached consumers on executors (rather than recreating them for each 
batch), and prefer to schedule partitions on the host locations that have the appropriate consumers.
Copy the code

The new Kafka consumer API can pre-fetch messages and cache them into buffers, so Spark’s integration with Kafka to allow consumers to cache on executors is a great performance benefit, scheduling partitions at consumers’ host locations.

In most cases, you should use LocationStrategies.PreferConsistent as shown above. This will distribute partitions 
evenly across available executors. If your executors are on the same hosts as your Kafka brokers, use PreferBrokers,
which will prefer to schedule partitions on the Kafka leader for that partition. Finally, if you have a significant 
skew in load among partitions, use PreferFixed. This allows you to specify an explicit mapping of partitions to 
hosts (any unspecified partitions will use a consistent location).
Copy the code

Usually, you can use LocationStrategies PreferConsistent, this strategy will partition distribution to all available on the executor. If your executor and Kafkabroker are on the same host, use PreferBrokers so that the Kafka Leader schedules the partition. Finally, you can use PreferFixed if you load data that is slant, which will allow you to map a partition to the host (a PreferConsistent policy will be used if no partition is specified).

The cache for consumers has a default maximum size of 64. If you expect to be handling more than 
(64 * number of executors) Kafka partitions, you can change this setting
via spark.streaming.kafka.consumer.cache.maxCapacity
Copy the code

Consumer default cache size is 64, if you expect to handle larger Kafka partition, you can use a spark. Streaming. Kafka. Consumer. Cache. MaxCapacity set size.

The cache is keyed by topicpartition and group.id, so use a separate group.id for each call to createDirectStream.
Copy the code

The cache uses the key for the topic partition and the group ID, so a different group.id can be used for each call to createDirectStream

public static SparkConf conf = new SparkConf() .setMaster("local[4]") .setAppName("java/RealTimeStreaming") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.default.parallelism", "10") .set("spark.streaming.blockInterval", "50") .set("spark.streaming.receiver.writeAheadLog.enable", "true"); Map<String, Object> kafkaParams = new HashMap<>(); kafkaParams.put("bootstrap.servers", "Master:9092,Worker1:9092,Worker2:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "TestGroup"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit",true); JavaStreamingContext jssc = new JavaStreamingContext( conf, Durations.seconds(30)); jssc.checkpoint("hdfs://Master:9000/checkpoint"); Build the topic set String kafkaTopics = ConfigurationManager. GetProperty (the KAFKA_TOPICS); String[] kafkaTopicsSplited = kafkaTopics.split(","); Set<String> topics = new HashSet<String>(); for(String kafkaTopic : kafkaTopicsSplited) { topics.add(kafkaTopic); } JavaInputDStream<ConsumerRecord<String, String>> adRealTimeLogDStream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topics, kafkaParams));Copy the code

1.17 Optimization 17: SparkStreaming gracefully Exits

public static void main(String[] args) throws Exception{ Logger.getLogger("org").setLevel(Level.ERROR); //String checkpointPath = PropertiesUtil.getProperty("streaming.checkpoint.path"); JavaStreamingContext javaStreamingContext = JavaStreamingContext.getOrCreate("hdfs://Master:9000/streaming_checkpoint", createContext()); javaStreamingContext.start(); Every 20 seconds to monitor whether there is a stop order, if have a graceful exit streaming final Properties serverProps = PropertiesUtil. Properties; Thread thread = new Thread(new MonitorStopThread(javaStreamingContext,serverProps)); thread.start(); javaStreamingContext.awaitTermination(); }}Copy the code

Conclusion 2

Spark Streaming is a very good batch application and I have integrated all my tuning experience, of course there are points that can be optimized in the future, I will add them in time. Hard written, their cherish.

Qin Kaixin 20181119 0:23