1. Spark Streaming overview

Spark Streaming reads data from a variety of input sources and groups the data into small batches. New batches are created at regular intervals: at the start of each interval a new batch is opened, the data received during that interval is appended to it, and the batch stops growing when the interval ends. The length of the interval is set by the developer through the batch interval parameter, typically between about 500 milliseconds and a few seconds. Each input batch forms an RDD, which is processed as a Spark job and produces further RDDs; the results can then be pushed to external systems batch by batch. The programming abstraction of Spark Streaming is the discretized stream, or DStream: a sequence of RDDs in which each RDD holds the data of one time slice of the stream. On top of this, window operations and stateful transformations are available, much like in batch processing.
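
To make the batch interval and the DStream abstraction concrete, here is a minimal sketch (the local master, the socket source on localhost:9999 and the 2-second interval are assumptions made only for this example): every 2 seconds the lines received in that interval become one RDD of the DStream, and a window operation aggregates across several such batches.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // batch interval = 2s: the data received in each 2-second interval forms one RDD of the DStream
    val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamIntro")
    val ssc = new StreamingContext(conf, Seconds(2))

    val words = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))

    // window operation on top of the micro-batches: counts over the last 10s, sliding every 2s
    words.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(10), Seconds(2)).print()

    ssc.start()
    ssc.awaitTermination()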

Difference from Structured Streaming

Structured Streaming, introduced in Spark 2.x, is mainly used for processing structured data. Besides the micro-batch model that Spark Streaming also offers, it supports long-running continuous tasks. The key difference is that processing can be based on the time the data was produced (event time), rather than the time it was received. Take a closer look at the table below:

| Stream processing mode | Spark Streaming | Structured Streaming |
| --- | --- | --- |
| Execution mode | Micro-batch | Micro-batch / continuous streaming |
| API | DStream / StreamingContext | Dataset/DataFrame / SparkSession |
| Job generation | A timer generates jobs periodically | Jobs are fired by a Trigger |
| Supported data sources | Socket, file stream, Kafka, ZeroMQ, Flume, Kinesis | Socket, file stream, Kafka, rate source |
| Execution basis | Executed on the DStream API | Executed on Spark SQL |
| Time semantics | Processing time | Processing time & event time |
| UI | Built-in | None |
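
To make the API and time-semantics rows of the table concrete, here is a minimal Structured Streaming sketch (the socket source on localhost:9999 and the window and watermark durations are assumptions made only for this example): the Dataset/DataFrame API is driven by a SparkSession, and aggregation is expressed over a window on a timestamp column with a watermark, which is how event-time processing is written in this API.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.window

    val spark = SparkSession.builder().master("local[*]").appName("StructuredDemo").getOrCreate()
    import spark.implicits._

    // socket source; includeTimestamp attaches a timestamp column to every line
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .option("includeTimestamp", true)
      .load()

    // windowed count over the timestamp column, tolerating 10 minutes of late data
    val counts = lines
      .withWatermark("timestamp", "10 minutes")
      .groupBy(window($"timestamp", "5 minutes"), $"value")
      .count()

    counts.writeStream
      .outputMode("update")
      .format("console")
      .start()
      .awaitTermination()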

For stream processing, Flink is what is mostly used in production, with Kafka as the main data source. The Spark Streaming scenario in this article is therefore an ETL stream job that processes structured logs and writes the results into a Kafka queue.

2. Spark Streaming operation process

1. After the client submits a Spark Streaming job, the Driver starts the Receiver, and the Receiver receives data from the data source.

2. Each application contains multiple Executors; each Executor runs tasks as threads, and a Spark Streaming application contains at least one receiver task (see the sketch after this list).

3. The Receiver receives the data, assembles it into blocks, reports the block IDs to the Driver, and replicates the blocks to another Executor.

4. The ReceiverTracker on the Driver keeps track of the block IDs reported by the Receiver.

5. The Driver periodically starts the JobGenerator, which generates logical RDDs according to the DStream lineage, creates a JobSet, and sends it to the JobScheduler.

6. The JobScheduler schedules the JobSet and hands it to the DAGScheduler, which generates stages from the logical RDDs; each stage contains one or more tasks, and the resulting TaskSets are submitted to the TaskScheduler.

7. The TaskScheduler schedules the tasks onto Executors and tracks their running status.
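
A practical consequence of the receiver in step 2 running as a long-lived task: a receiver permanently occupies one core of an Executor, so when testing locally the master must be local[n] with n >= 2, otherwise the only core is taken by the receiver and no batch is ever processed. A minimal sketch (host and port are placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // local[2]: one core runs the receiver task, the other runs the batch jobs;
    // with local[1] the receiver would take the only core and nothing would be computed
    val conf = new SparkConf().setMaster("local[2]").setAppName("ReceiverCoreDemo")
    val ssc = new StreamingContext(conf, Seconds(1))

    ssc.socketTextStream("localhost", 9999).count().print()
    ssc.start()
    ssc.awaitTermination()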

Common ways of reading data sources:

Constant data stream:

    val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray)
    val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)

Socket:

    // host and port of the socket source are placeholders for the example
    val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)

RDD queue:

    val queue = new Queue[RDD[Int]]()
    val queueDStream: InputDStream[Int] = ssc.queueStream(queue)
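
The queue stream only emits RDDs that are pushed into the queue while the context is running. A brief usage sketch continuing the snippet above (it assumes the same ssc, and the one-second push interval and the 1-to-100 contents are just example values):

    // print the sum of each RDD taken from the queue (one RDD per batch by default)
    queueDStream.reduce(_ + _).print()
    ssc.start()

    // push a new RDD into the queue every second
    for (_ <- 1 to 5) {
      queue.synchronized {
        queue += ssc.sparkContext.makeRDD(1 to 100, 2)
      }
      Thread.sleep(1000)
    }

    ssc.stop()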

Folders:

    val lines: DStream[String] = ssc.textFileStream("data/log/")


3. Case description

In production, the usual flow is as follows: take raw Kafka logs, such as request logs, use Spark Streaming to clean and transform them into a certain format, and write the result back into Kafka. To approach exactly-once semantics, the offsets are managed by the application itself, here stored in Redis.

Data link: pan.baidu.com/s/1FmFxSrPI… Extraction code: Hell

3.1 Raw Kafka logs

The sample.log format is as follows:

Let’s put it in a file to simulate xx.log in production

3.2 Create two topics, and create a KafkaProducer to write the data to mytopic1

One for raw log data and one for processed logs

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic1 --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic2 --partitions 1 --replication-factor 1  

Start the Redis service:

./redis-server redis.conf

View the mytopic1 data:

kafka-console-consumer.sh --bootstrap-server linux121:9092 --topic mytopic1 --from-beginning

3.3 Code Implementation

The first part reads the raw file data and writes it to mytopic1.

package com.hoult.Streaming.work

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FilerToKafka {
  def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR) val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]") val sc = new SparkContext(conf)
Val lines: RDD[String] = sc.textfile ("data/sample.log")
// define the kafka producer parameter val prop = new Properties() prop.put(producerconfig. BOOTSTRAP_SERVERS_CONFIG, "linux121:9092") prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
ForeachPartition {iter => KafkaProducer val producer = new KafkaProducer[String, String](prop) iter.foreach{line => val record = new ProducerRecord[String, String]("mytopic1", line) producer.send(record) } producer.close() } } }

In the second part, streaming reads the data from mytopic1, processes it, and writes it to mytopic2.

package com.hoult.Streaming.work

import java.util.Properties

import com.hoult.Streaming.kafka.OffsetsWithRedisUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}


/** Process Kafka data every 5 seconds, generate structured data, and write it to another Kafka topic */
object KafkaStreamingETL {
  val log = Logger.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val topics: Array[String] = Array("mytopic1")
    val groupid = "mygroup1"
    val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupid)

    // get the offsets from Redis
    val fromOffsets = OffsetsWithRedisUtils.getOffsetsFromRedis(topics, groupid)

    // create the DStream, reading data from kafka starting at the saved offsets
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
    )

    // transform the data of every batch, then save the new offsets to Redis
    dstream.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition(process)
        OffsetsWithRedisUtils.saveOffsetsToRedis(offsetRanges, groupid)
      }
    }

    // start the streaming job
    ssc.start()
    ssc.awaitTermination()
  }

  def process(iter: Iterator[ConsumerRecord[String, String]]): Unit = {
    iter.map(record => parse(record.value))
      .filter(_.nonEmpty)
      // .foreach(println)
      .foreach(line => sendMsg2Topic(line, "mytopic2"))
  }

  def parse(text: String): String = {
    try {
      val arr = text.replace("<<<!>>>", "").split(",")
      if (arr.length != 15) return ""
      arr.mkString("|")
    } catch {
      case e: Exception =>
        log.error("parse data error!", e)
        ""
    }
  }

  def getKafkaConsumerParameters(groupid: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupid,
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )
  }

  def getKafkaProducerParameters(): Properties = {
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop
  }

  def sendMsg2Topic(msg: String, topic: String): Unit = {
    // one producer per message, kept simple; close it to flush and release resources
    val producer = new KafkaProducer[String, String](getKafkaProducerParameters())
    val record = new ProducerRecord[String, String](topic, msg)
    producer.send(record)
    producer.close()
  }
}


Part three: the utility for reading and writing offsets in Redis.

package com.hoult.Streaming.kafka

import java.util

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

import scala.collection.mutable

object OffsetsWithRedisUtils {
  // define the Redis connection parameters
  private val redisHost = "linux121"
  private val redisPort = 6379

  // Jedis pool configuration (max total connections)
  private val config = new JedisPoolConfig
  config.setMaxTotal(10)

  private val pool = new JedisPool(config, redisHost, redisPort, 10000)
  private def getRedisConnection: Jedis = pool.getResource

  // key format: kafka:topic:TopicName:groupid
  private val topicPrefix = "kafka:topic"
  private def getKey(topic: String, groupid: String) = s"$topicPrefix:$topic:$groupid"

  // read the saved offsets of the given topics from Redis
  def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
    val jedis: Jedis = getRedisConnection

    val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic =>
      val key = getKey(topic, groupId)
      import scala.collection.JavaConverters._

      jedis.hgetAll(key)
        .asScala
        .map { case (partition, offset) =>
          new TopicPartition(topic, partition.toInt) -> offset.toLong
        }
    }

    // return the connection to the pool
    jedis.close()

    offsets.flatten.toMap
  }

  // save the untilOffset of every OffsetRange to Redis, one hash per topic
  def saveOffsetsToRedis(offsets: Array[OffsetRange], groupId: String): Unit = {
    val jedis: Jedis = getRedisConnection

    // organize the data as (topic, (partition, untilOffset)) and group it by topic
    offsets.map { range => (range.topic, (range.partition.toString, range.untilOffset.toString)) }
      .groupBy(_._1)
      .foreach { case (topic, buffer) =>
        val key: String = getKey(topic, groupId)

        import scala.collection.JavaConverters._
        val maps: util.Map[String, String] = buffer.map(_._2).toMap.asJava

        jedis.hmset(key, maps)
      }

    jedis.close()
  }

  def main(args: Array[String]): Unit = {
    val topics = Array("mytopic1")
    val groupid = "mygroup1"
    val x: Map[TopicPartition, Long] = getOffsetsFromRedis(topics, groupid)
    x.foreach(println)
  }
}

3.4 Demonstration

  • Start Redis: ./redis-server ./redis.conf

  • Start Kafka (sh scripts/kafka.sh start), then create the two topics and the KafkaProducer as described in section 3.2, writing the data to mytopic1

  • Start FilerToKafka and KafkaStreamingETL

Full code: github.com/hulichao/bi…

4. Spark Streaming precautions

Spark Streaming may appear unable to read a file. When monitoring a local directory, note that it does not process the files that already exist in the directory; it only picks up files that appear in the directory after the listener has started. In practice, if you want a file that existed before the stream started to be picked up, you have to make it look new: after the listener has started, open the file, type a few spaces or press enter (or make any other change that does not affect the content), save it, and then move it into the monitored directory; the stream will then detect it. (Presumably the intent is that only newly modified files should be monitored.) Reference: www.codeleading.com/article/956… Check my profile for more.
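
A minimal sketch of the behaviour described above (the directory path and the 5-second batch interval are just example values): only files that are moved into the watched directory after ssc.start() are read.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setMaster("local[2]").setAppName("FileStreamDemo")
    val ssc = new StreamingContext(conf, Seconds(5))

    // files already present in data/log/ at start-up are ignored;
    // only files that appear afterwards (e.g. via an atomic mv into the directory) are read
    ssc.textFileStream("data/log/").print()

    ssc.start()
    // from another shell, after the job has started:  mv /tmp/new.log data/log/
    ssc.awaitTermination()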