Overview

There are many articles explaining the principles of Spark Streaming, so they are not covered here. This article mainly introduces the programming model for using Kafka as the data source, coding practice, and some tuning notes.

Spark Streaming: spark.apache.org/docs/1.6.0/…

Streaming Kafka integration: spark.apache.org/docs/1.6.0/…

Demo environment

  1. Spark: 1.6
  2. Kafka: kafka_2.11-0.9.0.1
  3. Implementation language: Python

Programming model

At present, Spark Streaming programming with Kafka mainly involves two models: 1. Receiver-based 2. Direct (no Receiver)

Receiver-based

This approach uses Receivers to receive data from Kafka and is essentially built on Kafka's high-level consumer API. For all receivers, the data received from Kafka is stored in Spark's executors and is then processed by the jobs submitted by Spark Streaming.

Schematic diagram

Notes

  1. Write Ahead Logs are needed to ensure that data is not lost. If we enable Write Ahead Logs (the received data is copied to a file system such as HDFS), the storage level should be set to StorageLevel.MEMORY_AND_DISK_SER, i.e. KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER); see the sketch after this list.
  2. In the Receiver approach, partitions in Spark and partitions in Kafka are not related, so increasing the number of partitions per topic only adds threads to the single Receiver consuming those topics; it does not increase Spark's parallelism in processing the data.
  3. For different groups and topics, we can use multiple Receivers to create different DStreams that receive data in parallel, and then use union to merge them into a single DStream.
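
The following is a minimal sketch of the Receiver-based setup described above, assuming PySpark 1.6: it enables the Write Ahead Log, passes StorageLevel.MEMORY_AND_DISK_SER to createStream, and unions several receiver streams into one DStream. The host names, checkpoint path, group and topic names are placeholders.

# Receiver-based sketch (assumes PySpark 1.6; hosts, paths and names are placeholders)
from pyspark import SparkConf, SparkContext, StorageLevel
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

conf = SparkConf()
# Enable the Write Ahead Log so received data survives failures
conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
sc = SparkContext(appName="ReceiverWALSketch", conf=conf)
ssc = StreamingContext(sc, 2)
ssc.checkpoint("hdfs:///tmp/receiver_checkpoint")  # the WAL needs a checkpoint directory

zk_quorum = "zk-host1:2181,zk-host2:2181"
num_receivers = 3
streams = [KafkaUtils.createStream(ssc, zk_quorum, "demo_group",
                                   {"spark_streaming_test_topic": 1},
                                   storageLevel=StorageLevel.MEMORY_AND_DISK_SER)
           for _ in range(num_receivers)]
unified = ssc.union(*streams)  # merge the receiver DStreams into a single DStream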

Direct (no Receiver)

The Direct mode was introduced in Spark 1.3. Unlike the Receiver approach, Direct mode has no Receiver: it periodically queries Kafka for the latest offsets in each partition of each topic, and then processes each batch according to the specified maxRatePerPartition.

It also differs from the Receiver mode in how offsets are handled. The Receiver mode reads the offset from ZooKeeper, and ZooKeeper naturally stores the current consumed offset, so after a restart consumption continues from the last recorded offset. In Direct mode, data is read directly from Kafka, so the offset has to be recorded by the application itself: you can use a checkpoint, a database or a file, or write the offset back to ZooKeeper.
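
To illustrate the checkpoint option, here is a minimal sketch, assuming PySpark 1.6: StreamingContext.getOrCreate rebuilds the context, including the consumed Kafka offsets, from the checkpoint directory after a restart. The checkpoint path, broker list and topic name are placeholders.

# Direct-mode offset recovery via checkpoint (assumes PySpark 1.6; values are placeholders)
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

CHECKPOINT_DIR = "hdfs:///tmp/direct_checkpoint"

def create_context():
    sc = SparkContext(appName="DirectCheckpointSketch")
    ssc = StreamingContext(sc, 2)
    ssc.checkpoint(CHECKPOINT_DIR)
    stream = KafkaUtils.createDirectStream(
        ssc, ["spark_streaming_test_topic"],
        kafkaParams={"metadata.broker.list": "broker1:9092,broker2:9092"})
    stream.count().pprint()  # any output operation
    return ssc

# First run: builds a new context; after a restart: restores the context
# (and the Kafka offsets it had consumed) from the checkpoint directory.
ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
ssc.start()
ssc.awaitTermination()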

Schematic diagram

Notes

  1. Simplified parallelism: In the Receiver approach, as mentioned, we create multiple Receivers and union them into one DStream to increase the parallelism of data ingestion. In Direct mode, Kafka partitions and RDD partitions have a one-to-one mapping and Kafka data is read in parallel, which is easier to understand and to tune.
  2. Efficiency: In Receiver mode, achieving zero data loss requires storing data in a Write Ahead Log, so the data is kept twice, once in Kafka and once in the log, which is wasteful. Direct mode does not have this problem: as long as Kafka's data retention is long enough, data can be recovered from Kafka.
  3. Exactly-once semantics: Receiver mode uses Kafka's high-level API and stores offsets in ZooKeeper, which is the traditional way of reading data from Kafka. However, the data consumed by Spark Streaming is not always in sync with the offsets recorded in ZooKeeper, which occasionally leads to data being consumed more than once. Direct mode uses the simple (low-level) Kafka API, and offsets are tracked by Spark Streaming itself in its checkpoints, which removes this inconsistency; a sketch of resuming from self-recorded offsets follows this list.
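
A minimal sketch of resuming a Direct stream from offsets the application recorded itself, assuming PySpark 1.6: the fromOffsets argument of createDirectStream maps each TopicAndPartition to the offset to start from. Where those offsets are loaded from (ZooKeeper, a database, a file) is up to the application; the broker list and offset values below are placeholders.

# Resuming a Direct stream from stored offsets (assumes PySpark 1.6; values are placeholders)
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

sc = SparkContext(appName="DirectFromOffsetsSketch")
ssc = StreamingContext(sc, 2)

topic = "spark_streaming_test_topic"
brokers = "broker1:9092,broker2:9092"

# Offsets previously saved by the application (e.g. in ZooKeeper, a database or a file)
from_offsets = {
    TopicAndPartition(topic, 0): long(1000),  # partition 0 -> start offset
    TopicAndPartition(topic, 1): long(1500),  # partition 1 -> start offset
}

stream = KafkaUtils.createDirectStream(
    ssc, [topic],
    kafkaParams={"metadata.broker.list": brokers},
    fromOffsets=from_offsets)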

Coding practice

Kafka producer

package com.eric.kafka.producer;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/** Hello world! */
public class ProcuderSample {
    private final Producer<String, String> producer;
    public final static String TOPIC = "spark_streaming_test_topic";
    public final static Integer BATCH_SIZE = 2000;

    private ProcuderSample() {
        Properties props = new Properties();
        // Kafka broker list is configured here
        props.put("metadata.broker.list", "server1-2-5-24-138:9092,server1-3-5-24-139:9092,server1:9092");
        // Configure the serialization class for the value
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Configure the serialization class for the key
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    public void deadLoopSendMessage() {
        int recordCount = 0;
        List<KeyedMessage<String, String>> tmpList = new ArrayList<KeyedMessage<String, String>>();
        while (true) {
            Random rand = new Random();
            // Send data in batches
            // String randResult = recordCount + ":" + rand.nextInt(100);
            String randResult = rand.nextInt(10) + "";
            tmpList.add(new KeyedMessage<String, String>(TOPIC, randResult, randResult));
            if (tmpList.size() % BATCH_SIZE == 0) {
                producer.send(tmpList);
                tmpList.clear();
            }
            // producer.send(new KeyedMessage<String, String>(TOPIC, randResult, randResult));
            recordCount += 1;
        }
    }

    public static void main(String[] args) {
        new ProcuderSample().deadLoopSendMessage();
    }
}

Receiving data in Receiver mode

# encoding:utf-8
__author__ = 'eric.sun'

"" Demo Spark Streaming using Kafka Streaming ./spark-submit -- Master Spark :// Server1-1-5-24-137:7077 -- Packages org.apache.spark: Spark-streaming -kafka_2.10:1.6.0.. /examples/kafka_streaming. Py > log Kafka https://github.com/Eric-aihua/practise.git/java_cookbook/cookbook/src/main/java/com/eric/kafka/producer/ProcuderSample.j ava """

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def start():
    sconf=SparkConf()
    # sconf.set('spark.streaming.blockInterval','100')
    sconf.set('spark.cores.max' , 8)
    sc=SparkContext(appName='KafkaWordCount',conf=sconf)
    ssc=StreamingContext(sc,2)

    numStreams = 3
    kafkaStreams = [KafkaUtils.createStream(ssc, "server1-2-5-24-138:2181,server1-3-5-24-139:2181,server1-4-5-24-140:2181", "streaming_test_group", {"spark_streaming_test_topic": 1}) for _ in range(numStreams)]
    unifiedStream = ssc.union(*kafkaStreams)
    print unifiedStream
    # Count the distribution of random numbers generated
    result = unifiedStream.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x + y)
    result.pprint()
    ssc.start()             # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate

if __name__ == '__main__':
    start()

Receiving data in Direct mode

# encoding:utf-8
__author__ = 'eric.sun'

"" Demo Spark Streaming using Kafka Direct Streaming ./spark-submit -- Master Spark :// Server1-1-5-24-137:7077 -- Packages org.apache.spark: Spark-streaming -kafka_2.10:1.6.0.. /examples/kafka_streaming. Py > log Kafka https://github.com/Eric-aihua/practise.git/java_cookbook/cookbook/src/main/java/com/eric/kafka/producer/ProcuderSample.j Advantages of AVA using Direct 1: create a corresponding number of RDD partitions according to the number of partitions in topic by default 2: Write AHead Log is required to ensure that data is not lost in Receiver mode, and Direct mode does not need 3: one time processing: Kafka Simple API is used to read data, and offset is used to record data. The zooKeeper-based Kafka monitoring tool cannot obtain the value of offset. The value of Zookeeper can be set after each Batch processing.

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def start():
    sconf=SparkConf()
    sconf.set('spark.cores.max' , 8)
    sc=SparkContext(appName='KafkaDirectWordCount',conf=sconf)
    ssc=StreamingContext(sc,2)

    brokers="server1-2-5-24-138:9092,server1-3-5-24-139:9092,server1-4-5-24-140:9092"
    topic='spark_streaming_test_topic'
    kafkaStreams = KafkaUtils.createDirectStream(ssc,[topic],kafkaParams={"metadata.broker.list": brokers})
    # Count the distribution of random numbers generated
    result = kafkaStreams.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x + y)
    # print offset, this can also be written to Zookeeper
    #You can use transform() instead of foreachRDD() as your
    # first method call in order to access offsets, then call further Spark methods.
    kafkaStreams.transform(storeOffsetRanges).foreachRDD(printOffsetRanges)
    result.pprint()
    ssc.start()             # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate

offsetRanges = []

def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd

def printOffsetRanges(rdd):
    for o in offsetRanges:
        print "%s %s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset,o.untilOffset-o.fromOffset)

if __name__ == '__main__':
    start()

Tuning summary

Spark Streaming + Kafka works with the default configuration in most cases when the data volume is small. When the data volume is large, some adjustment and optimization is required, and the right settings vary from scenario to scenario.

  1. batchDuration: Almost every Spark Streaming tuning document mentions adjusting batchDuration, the batch interval passed when the StreamingContext is initialized. If this value is set too small, i.e. the jobs created for one batchDuration cannot finish within that time, data will accumulate and eventually block Spark Streaming. In general batchDuration should not be less than 500 ms, because a smaller value causes jobs to be submitted too frequently and adds overhead to the whole pipeline. In ordinary applications, depending on the scenario and hardware, I set batchDuration between 1 and 10 s; it can be adjusted by observing Total Delay on Spark Streaming's monitoring UI. (A configuration sketch covering this and the following points appears at the end of this section.)
  2. Reasonable Kafka pull rate (maxRatePerPartition is important): This configuration is critical when Spark Streaming consumes Kafka data. The parameter is spark.streaming.kafka.maxRatePerPartition. It is not set by default, which means the consumer pulls whatever data Kafka has available. It should be tuned together with batchDuration, based on the rate at which producers write to Kafka and the speed at which consumers process data, so that the data pulled from each partition during each batchDuration can be processed in time, giving the highest throughput. For details on adjusting these parameters, watch Input Rate and Processing Time on the monitoring UI.
  3. Cache DStreams (RDDs) that are reused: If an RDD in Spark or a DStream in Spark Streaming is used repeatedly, cache it to prevent the network overhead caused by rescheduling the computation; watch the Scheduling Delay metric.
  4. Set up appropriate GC: If you have used Java for a long time, you know that the JVM's garbage collection lets you focus on business logic rather than memory allocation, with the JVM handling the rest. If you are familiar with JVMs, you know that memory is divided into the young generation (including Eden), the old generation, and the permanent generation; each GC takes a certain amount of time, especially old-generation GCs, which need to defragment memory, usually with a mark-and-sweep approach. Likewise, in Spark applications the frequency and duration of JVM GC are key factors affecting overall efficiency. A common recommendation: --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"
  5. Set an appropriate amount of CPU resources, i.e. the number of cores per executor: Look at CPU usage to see how computing resources are being used. A common form of waste is an executor occupying multiple cores while total CPU usage stays low, because a single executor does not always make full use of multi-core capabilities. In that case, consider giving each executor fewer cores and running more executors per worker, or adding more workers per host, to increase the number of executors running in parallel and thus CPU utilization. But take memory into account when adding executors: the more executors a machine runs, the less memory each one gets, which can cause excessive data spilling or even out-of-memory errors.
  6. Set reasonable parallelism: partition and parallelism. A partition is a slice of the data, and only one task processes a partition at a time. If this number is too small, each partition holds too much data, causing memory pressure or leaving many executors' compute capacity unused; if it is too large, there are too many small fragments and execution efficiency drops. For shuffle actions (such as reduce operations), the number of partitions defaults to the largest partition count among the parent RDDs. Parallelism is the default number of partitions returned by reduce-side operations on an RDD (for map-side operations the partition count is usually taken from the larger parent RDD and no shuffle is involved, so the parallelism parameter has no effect). The two concepts are closely related; both involve data partitioning and work in the same way. spark.default.parallelism sets the default number of partitions, and many RDD operations also accept an explicit partition-count argument. When using Spark Streaming + Kafka in Direct mode, as described in the previous section, Spark partitions correspond one-to-one to Kafka partitions, so we generally set this to the number of Kafka partitions.
  7. Use high-performance operators (see the sketch at the end of this section): (1) use reduceByKey/aggregateByKey instead of groupByKey; (2) use mapPartitions instead of plain map; (3) use foreachPartition instead of foreach; (4) use coalesce after filter; (5) use repartitionAndSortWithinPartitions instead of a repartition followed by a sort.
  8. Optimize serialization performance with Kryo

    There are three main areas where serialization is involved

    • When an external variable is used in an operator function, it is serialized for network transmission (see “Principle 7: Broadcast Large Variables”).
    • When a custom type is used as the generic type of an RDD (for example JavaRDD<Student>, where Student is a custom type), all objects of the custom type are serialized. Therefore, in this case, the custom class must also implement the Serializable interface.
    • When using a serializable persistence strategy (such as MEMORY_ONLY_SER), Spark serializes each partition in the RDD into a large byte array.

In all three cases we can optimize serialization and deserialization performance by using the Kryo serialization library. By default, Spark uses the Java serialization mechanism, namely the ObjectOutputStream/ObjectInputStream API, for serialization and deserialization. However, Spark also supports the Kryo serialization library, whose performance is much higher than Java serialization's: Kryo is roughly 10 times more efficient. Spark does not use Kryo by default because Kryo requires all custom types that need to be serialized to be registered, which is cumbersome for developers. Below is an example of using Kryo: we simply set the serializer class and register the custom types to be serialized (such as external variable types used in operator functions and custom types used as RDD generic types):

// Create a SparkConf object.
val conf = new SparkConf().setMaster(...).setAppName(...)
// Set the serializer to KryoSerializer.
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Register the custom types to serialize.
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
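
Finally, as a reference for points 1, 2, 4, 5 and 6 of the tuning list, here is a consolidated configuration sketch, assuming PySpark 1.6; the concrete values are only illustrative placeholders and need to be tuned against the monitoring UI for each workload.

# Consolidated tuning sketch (assumes PySpark 1.6; all values are placeholders)
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

sconf = SparkConf()
# Point 2: cap the number of records pulled per Kafka partition per second
sconf.set("spark.streaming.kafka.maxRatePerPartition", "10000")
# Point 4: use the CMS collector to shorten executor GC pauses
sconf.set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")
# Point 5: limit the total number of cores the application may use
sconf.set("spark.cores.max", "8")
# Point 6: default shuffle parallelism, typically the number of Kafka partitions
sconf.set("spark.default.parallelism", "8")

sc = SparkContext(appName="TuningSketch", conf=sconf)
# Point 1: batchDuration (here 2 seconds), adjusted by watching Total Delay
ssc = StreamingContext(sc, 2)

And a small self-contained sketch of the operator substitutions from point 7; the tiny parallelized data set below only stands in for a real pair RDD.

# High-performance operator substitutions (illustrative sketch)
from pyspark import SparkContext

sc = SparkContext(appName="OperatorSketch")
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("c", 4)], 4)

# (1) reduceByKey instead of groupByKey: values are combined map-side before the shuffle
counts = rdd.reduceByKey(lambda a, b: a + b)

# (2) mapPartitions instead of map: per-partition setup happens once, not once per record
def double_partition(records):
    # e.g. open one connection per partition here and reuse it for every record
    for key, value in records:
        yield (key, value * 2)
doubled = rdd.mapPartitions(double_partition)

# (3) foreachPartition instead of foreach: write records out in per-partition batches
def write_partition(records):
    # e.g. open one database connection and write all records of the partition
    for record in records:
        pass
rdd.foreachPartition(write_partition)

# (4) coalesce after a selective filter to avoid many near-empty partitions
filtered = rdd.filter(lambda kv: kv[1] > 1).coalesce(2)

# (5) repartitionAndSortWithinPartitions instead of repartition followed by sortByKey
sorted_rdd = rdd.repartitionAndSortWithinPartitions(2)

print counts.collect()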