Spark Streaming can receive streaming data from any data source, not only those it supports out of the box (Kafka, Kinesis, files, sockets, etc.). To do so, the developer must implement a receiver customized for the data source in question. This guide walks through implementing a custom receiver and using it in a Spark Streaming application. Custom receivers can be implemented in Scala or Java.

1. Implement custom receivers

The first step is to implement a Receiver (Scala doc, Java doc). A custom receiver must extend this abstract class and implement two methods:

  • onStart(): what to do to start receiving data
  • onStop(): what to do to stop receiving data

Neither onStart() nor onStop() may block indefinitely. Typically, onStart() starts the threads responsible for receiving data, and onStop() ensures that those threads are stopped. The receiving threads can also call isStopped(), a method of Receiver, to check whether they should stop receiving data.
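The lifecycle contract can be sketched with a minimal receiver. This is only an illustration: the class name CounterReceiver, the counter payload, and the 100 ms pause are assumptions made for the example and do not come from Spark.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver that emits an increasing counter. The name and the
// 100 ms pause are illustrative choices, not part of the Spark API.
class CounterReceiver extends Receiver[Long](StorageLevel.MEMORY_ONLY) {

  override def onStart(): Unit = {
    // Return quickly: hand the actual work to a background thread.
    val worker = new Thread("Counter Receiver") {
      override def run(): Unit = produce()
    }
    worker.setDaemon(true)
    worker.start()
  }

  override def onStop(): Unit = {
    // Nothing to clean up here: the worker polls isStopped() and exits on its own.
  }

  private def produce(): Unit = {
    var n = 0L
    // Keep producing until Spark marks the receiver as stopped.
    while (!isStopped()) {
      store(n)
      n += 1
      Thread.sleep(100)
    }
  }
}
```

Because onStart() only starts a thread and returns, it cannot block the receiver's startup, and because the loop condition is isStopped(), onStop() has nothing left to do.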

Once data is received, it can be stored inside Spark by calling store(data), a method provided by the Receiver class. There are several flavors of store(), which allow the received data to be stored one record at a time or as a whole collection of objects or serialized bytes. Note that the flavor of store() used to implement a receiver affects its reliability and fault-tolerance semantics. This is discussed in more detail later.
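As a rough sketch of the store() flavors mentioned above, the hypothetical receiver below stores one record, then a buffer of records, then records from an iterator. The class name BatchingReceiver, its batchSize parameter, and the record contents are assumptions made for illustration only.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver used only to contrast the store() flavors.
class BatchingReceiver(batchSize: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    new Thread("Batching Receiver") {
      override def run(): Unit = storeSamples()
    }.start()
  }

  override def onStop(): Unit = ()

  private def storeSamples(): Unit = {
    // Flavor 1: a single record.
    store("single record")

    // Flavor 2: a whole buffer of records in one call.
    val buffer = ArrayBuffer.tabulate(batchSize)(i => s"record-$i")
    store(buffer)

    // Flavor 3: records supplied through an iterator.
    store(Iterator("a", "b", "c"))
  }
}
```

The store calls live in a method of the receiver rather than inside the anonymous Thread, so that names such as store and stop always resolve to the Receiver's methods and never to members of Thread.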

Any exception in the receiving threads should be caught and handled properly to avoid silent failures of the receiver. restart(<exception>) restarts the receiver by calling onStop() asynchronously and then calling onStart() after a delay. stop(<exception>) calls onStop() and terminates the receiver. In addition, reportError(<error>) reports an error message to the driver (visible in the logs and UI) without stopping or restarting the receiver.
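One possible way to combine the three calls is sketched below. Which errors count as recoverable is an assumption of this example, not a rule from Spark, and the class ResilientReceiver and its fetch() helper are hypothetical placeholders for a real data source.

```scala
import java.io.IOException

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver; fetch() stands in for a real data source.
class ResilientReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    new Thread("Resilient Receiver") {
      override def run(): Unit = receiveLoop()
    }.start()
  }

  override def onStop(): Unit = ()

  // Placeholder source: always returns the same line.
  private def fetch(): String = "hello"

  private def receiveLoop(): Unit = {
    try {
      while (!isStopped()) {
        val line = fetch()
        if (line.isEmpty) {
          // Bad record: tell the driver, but keep the receiver running.
          reportError("Skipping empty record", new IllegalArgumentException("empty record"))
        } else {
          store(line)
        }
        Thread.sleep(100)
      }
    } catch {
      // Assumed to be transient: ask Spark to restart the receiver after a delay.
      case e: IOException => restart("I/O error while receiving", e)
      // Assumed to be unrecoverable: terminate the receiver for good.
      case e: IllegalStateException => stop("Unrecoverable receiver state", e)
    }
  }
}
```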

The following is a custom receiver that receives a stream of text over a socket. It treats each "\n"-delimited line in the text stream as one record and stores it in Spark. If the receiving thread hits an error while connecting or receiving, the receiver is restarted so that it tries to connect again.

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart(): Unit = {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run(): Unit = { receive() }
    }.start()
  }

  def onStop(): Unit = {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns true
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive(): Unit = {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)

      // Until stopped or connection broken continue reading
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
      userInput = reader.readLine()
      while (!isStopped() && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // restart if could not connect to server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}

2. Use custom receivers in a Spark Streaming application

A custom receiver can be used in a Spark Streaming application by calling streamingContext.receiverStream(<instance of custom receiver>). This creates an input DStream from the data received by the custom receiver instance, as shown below:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

object CustomReceiver {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: CustomReceiver <hostname> <port>")
      System.exit(1)
    }

    // StreamingExamples is a helper from Spark's examples module
    // (org.apache.spark.examples.streaming); drop this call if it is not on the classpath
    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("CustomReceiver")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create an input stream with the custom receiver on target ip:port and count the
    // words in input stream of \n delimited text (e.g. generated by 'nc')
    val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run(): Unit = { receive() }
    }.start()
  }

  def onStop(): Unit = {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns true
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive(): Unit = {
    var socket: Socket = null
    var userInput: String = null
    try {
      println(s"Connecting to $host : $port")
      socket = new Socket(host, port)
      println(s"Connected to $host : $port")
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
      userInput = reader.readLine()
      while (!isStopped() && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()
      println("Stopped receiving")
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        restart(s"Error connecting to $host : $port", e)
      case t: Throwable =>
        restart("Error receiving data", t)
    }
  }
}