Our project needs to consume messages from Kafka, process them, and write the results to ActiveMQ, which serves as the data source for external systems. To do this, we read the Kafka messages through Spark Streaming, which yields a DStream, that is, a sequence of RDDs. DStream provides the foreachRDD(func) method, which gives access to each RDD in the stream; we can then traverse the records of that RDD and send each processed message to ActiveMQ through an ActiveMQ producer.

To send a message to ActiveMQ, you first need to establish a connection to the message queue. In a traditional program, the most intuitive approach is to create the connection once, outside the loop that traverses the records, to avoid wasting time and resources. For example:

dstream.foreachRDD { rdd =>
  val producer = createProducer()
  rdd.foreach { message =>
    producer.send(message)
  }
}

def createProducer(): MessageProducer = {
  val conn = createActiveMQConnection()
  val session = sessionFrom(conn)
  producerFrom(session)
}


However, this does not work in Spark Streaming. The func passed to foreachRDD(func) is executed in the driver process of the Spark Streaming application, while the function passed to rdd.foreach is executed on the workers:

dstream.foreachRDD { rdd =>
  val producer = createProducer() // executes in the driver process
  rdd.foreach { message =>
    producer.send(message) // executes in a worker process
  }
}


For this to work, the objects obtained in the driver (in this example, the Connection, Session, and Producer) would have to be serialized and sent from the driver to the workers. However, objects that wrap a connection or another external resource usually do not support serialization, and even when they do, they often cannot be properly reinitialized on the worker.
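
The serialization problem can be reproduced without a cluster. The following is a minimal sketch that uses plain Java serialization, which is what Spark uses by default to ship task closures. FakeProducer and SendTask are hypothetical stand-ins introduced only for this illustration: the first plays the role of a JMS producer, the second the role of a closure that has captured one. Neither is part of Spark or ActiveMQ.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationDemo {
  // Hypothetical stand-in for a JMS MessageProducer: it does not extend
  // Serializable, much like objects that wrap a socket or a session.
  class FakeProducer {
    def send(message: String): Unit = println(s"sent: $message")
  }

  // Hypothetical stand-in for a task closure that captured the producer.
  // The task itself is marked Serializable, but its captured field is not.
  class SendTask(val producer: FakeProducer) extends Serializable

  // Tries plain Java serialization and reports whether it succeeded.
  def canSerialize(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    println(canSerialize("a plain message"))              // true
    println(canSerialize(new FakeProducer))               // false
    println(canSerialize(new SendTask(new FakeProducer))) // false
  }
}
```

Marking the task as Serializable is not enough: one non-serializable field makes the whole object graph fail, which is why capturing the producer in the closure breaks the job.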

One way to avoid this is to move the call to createProducer() inside rdd.foreach(fn). However, creating a connection takes time and resources, and creating and then closing a connection for every record can significantly reduce overall throughput and performance.

A better solution is to use rdd.foreachPartition(func): create a single connection object per RDD partition, and use it to send all the records in that partition:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    sendToActiveMQ { producer =>
      partitionOfRecords.foreach(record => producer.send(record))
    }
  }
}

// Loan pattern: open a connection, lend the producer to `send`, then close the connection.
def sendToActiveMQ(send: MessageProducer => Unit): Unit = {
  val conn = createActiveMQConnection()
  try {
    val session = sessionFrom(conn)
    val producer = producerFrom(session)
    send(producer)
  } finally {
    conn.close() // runs on the worker, even if sending fails
  }
}
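
To make the difference in connection counts concrete, here is a small Spark-free simulation. It is only a sketch: partitions are modeled as nested Scala sequences, and FakeConnection, Broker, and withConnection are hypothetical names invented for this illustration, not ActiveMQ or Spark APIs.

```scala
import scala.collection.mutable.ArrayBuffer

object PartitionDemo {
  // Hypothetical stand-in for an ActiveMQ connection; it only records state.
  final class FakeConnection {
    private var open = true
    val sent: ArrayBuffer[String] = ArrayBuffer.empty
    def send(message: String): Unit = {
      require(open, "connection is closed")
      sent += message
    }
    def close(): Unit = open = false
    def isOpen: Boolean = open
  }

  // Hypothetical broker that remembers every connection it has handed out.
  final class Broker {
    val connections: ArrayBuffer[FakeConnection] = ArrayBuffer.empty
    def connect(): FakeConnection = {
      val conn = new FakeConnection
      connections += conn
      conn
    }
  }

  // Same shape as sendToActiveMQ: open, lend, always close.
  def withConnection(broker: Broker)(send: FakeConnection => Unit): Unit = {
    val conn = broker.connect()
    try send(conn) finally conn.close()
  }

  // One connection per record, as when the producer is created inside rdd.foreach.
  def sendPerRecord(broker: Broker, partitions: Seq[Seq[String]]): Unit =
    for (partition <- partitions; record <- partition)
      withConnection(broker)(_.send(record))

  // One connection per partition, as with rdd.foreachPartition.
  def sendPerPartition(broker: Broker, partitions: Seq[Seq[String]]): Unit =
    for (partition <- partitions)
      withConnection(broker)(conn => partition.foreach(conn.send))

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq("a", "b", "c"), Seq("d", "e"))

    val perRecord = new Broker
    sendPerRecord(perRecord, partitions)
    println(perRecord.connections.size) // 5

    val perPartition = new Broker
    sendPerPartition(perPartition, partitions)
    println(perPartition.connections.size) // 2
  }
}
```

With five records in two partitions, the per-record version opens five connections and the per-partition version opens two. In a real job the gap grows with the number of records per partition.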


To avoid repeatedly creating and releasing connection objects, an even better solution is to use a connection pool. Because the previous code already isolates connection creation and closing in a dedicated method, only sendToActiveMQ() needs to change:

def sendToActiveMQ(send: MessageProducer => Unit): Unit = {
  val conn = ActiveMQConnectionPool.getConnection()
  try {
    val session = sessionFrom(conn)
    val producer = producerFrom(session)
    send(producer)
  } finally {
    // return the connection for reuse instead of closing it
    ActiveMQConnectionPool.returnConnection(conn)
  }
}
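
The pool itself is not shown above, so here is one minimal way it could look. This is a sketch under assumptions, not the pool used in the project: FakeConnection, FakeConnectionPool, and sendWithPool are hypothetical names, the pool is unbounded, and it never validates or evicts connections.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

object PoolDemo {
  // Hypothetical stand-in for an ActiveMQ connection.
  final class FakeConnection(val id: Int)

  // Minimal unbounded pool: reuse an idle connection if one exists,
  // otherwise create a new one. Safe to call from several task threads.
  final class FakeConnectionPool {
    private val idle = new ConcurrentLinkedQueue[FakeConnection]()
    private val created = new AtomicInteger(0)

    def getConnection(): FakeConnection =
      // poll() returns null when the queue is empty; Option(null) is None.
      Option(idle.poll()).getOrElse(new FakeConnection(created.incrementAndGet()))

    def returnConnection(conn: FakeConnection): Unit = {
      idle.offer(conn)
      ()
    }

    def createdCount: Int = created.get()
  }

  // Same shape as the pooled sendToActiveMQ: borrow, lend, always return.
  def sendWithPool(pool: FakeConnectionPool)(send: FakeConnection => Unit): Unit = {
    val conn = pool.getConnection()
    try send(conn) finally pool.returnConnection(conn)
  }

  def main(args: Array[String]): Unit = {
    val pool = new FakeConnectionPool
    val partitions = Seq(Seq("a", "b", "c"), Seq("d", "e"), Seq("f"))
    partitions.foreach { partition =>
      sendWithPool(pool) { conn =>
        partition.foreach(record => println(s"connection ${conn.id} sent $record"))
      }
    }
    println(pool.createdCount) // 1
  }
}
```

In a Spark job, such a pool would typically be held in a Scala object, so that each executor JVM lazily creates its own instance on first use and nothing connection-related has to be serialized from the driver.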


Spark's distributed architecture, in which the driver and the workers cooperate, differs from the single-node programming model in subtle ways, and problems can easily arise if you are not careful during development. The most fundamental way to deal with them is to understand how Spark is designed; with that understanding, such problems become easy to solve.

  • Author: Zhang Yi
  • Links to this article: Zhangyi. Xyz/foreachrdd -…
  • Copyright Notice: All articles on this blog are licensed under a CC BY-NC-SA 3.0 license unless otherwise stated. Reprint please indicate the source!