I am using Spark Streaming to process data between two Kafka queues, but I cannot seem to find a good way to write to Kafka from Spark. I have tried this:
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    }
  )
)
It works as intended, but instantiating a new KafkaProducer for every message is clearly infeasible in a real context, and I'm trying to work around it.
I would like to keep a reference to a single instance per process and access it whenever I need to send a message. How can I write to Kafka from Spark Streaming?
My first advice would be to try to create a new instance in foreachPartition and measure if that is fast enough for your needs (instantiating heavy objects in foreachPartition is what the official documentation suggests).
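A minimal sketch of that per-partition variant, reusing the brokers value and the "output" topic from the question; note the producer is closed at the end of each partition so buffered records are flushed:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

input.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // One producer per partition/task instead of one per message.
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)

    partition.foreach { x =>
      producer.send(new ProducerRecord[String, String]("output", null, x))
    }
    // Flush buffered records and release sockets/threads before the task ends.
    producer.close()
  }
}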
Another option is to use an object pool as illustrated in this example:
https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala
However, I found it hard to implement when using checkpointing.
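If you do want to try a pool, here is a rough sketch of the idea using Apache Commons Pool 2 (all names here are illustrative, not taken from the linked project): a factory builds producers, and a per-JVM singleton holds the pool so that tasks on the same executor share instances.

import java.util.Properties
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Tells the pool how to create and wrap producer instances.
class ProducerFactory(brokers: String)
    extends BasePooledObjectFactory[KafkaProducer[String, String]] {

  override def create(): KafkaProducer[String, String] = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    new KafkaProducer[String, String](props)
  }

  override def wrap(p: KafkaProducer[String, String]): PooledObject[KafkaProducer[String, String]] =
    new DefaultPooledObject(p)
}

// Created lazily on each executor JVM; never shipped from the driver.
object ProducerPool {
  private var pool: GenericObjectPool[KafkaProducer[String, String]] = _

  def apply(brokers: String): GenericObjectPool[KafkaProducer[String, String]] = synchronized {
    if (pool == null) pool = new GenericObjectPool(new ProducerFactory(brokers))
    pool
  }
}

// Inside the job: borrow a producer for the duration of the partition, then return it.
input.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val pool = ProducerPool(brokers)
    val producer = pool.borrowObject()
    try {
      partition.foreach(x => producer.send(new ProducerRecord[String, String]("output", null, x)))
    } finally {
      pool.returnObject(producer)
    }
  }
}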
Another version that works well for me is a factory, as described in the following blog post; you just have to check whether it provides enough parallelism for your needs (see the comments section):
http://allegro.tech/2015/08/spark-kafka-integration.html
Yes, unfortunately Spark (1.x, 2.x) doesn't make it straightforward to write to Kafka in an efficient manner.
I'd suggest the following approach:
Use a single KafkaProducer instance per executor process/JVM.
Here's the high-level setup for this approach: you wrap the KafkaProducer because, as you mentioned, it is not serializable. Wrapping it allows you to "ship" it to the executors. The key idea here is to use a lazy val so that you delay instantiating the producer until its first use, which is effectively a workaround so that you don't need to worry about KafkaProducer not being serializable.
The code snippets below work with Spark Streaming as of Spark 2.0.
Step 1: Wrapping KafkaProducer
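A minimal sketch of such a wrapper (class and method names here are illustrative, not an exact reproduction of the original snippet):

import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

// Serializable "recipe" for a producer: nothing heavy is created on the driver.
class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  // The lazy val is the workaround for KafkaProducer not being serializable:
  // the producer is built on first use, i.e. once per executor JVM.
  lazy val producer: KafkaProducer[K, V] = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))
}

object MySparkKafkaProducer {
  def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      // Close (and flush) the producer when the executor JVM shuts down.
      sys.addShutdownHook { producer.close() }
      producer
    }
    new MySparkKafkaProducer(createProducerFunc)
  }
}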
Step 2: Use a broadcast variable to give each executor its own wrapped KafkaProducer instance
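Again a sketch only; ssc is assumed to be the StreamingContext and brokers the broker list from the question:

import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

val producerConfig = new Properties()
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

// Broadcasting ships only the lightweight serializable wrapper; each executor
// lazily creates its own real KafkaProducer the first time it sends.
val kafkaProducer = ssc.sparkContext.broadcast(MySparkKafkaProducer[String, String](producerConfig))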
Step 3: Write from Spark Streaming to Kafka, re-using the same wrapped KafkaProducer instance (for each executor)
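And the write path itself, reusing the question's input stream and "output" topic (still a sketch):

input.foreachRDD { rdd =>
  rdd.foreach { record =>
    // kafkaProducer.value is the same wrapper for every task on an executor,
    // so its lazy producer is created only once per executor JVM.
    kafkaProducer.value.send("output", null, record)
  }
}

Hope this helps.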
There is a Streaming Kafka Writer maintained by Cloudera (actually spun off from a Spark JIRA [1]). It basically creates a producer per partition, which amortizes the time spent to create 'heavy' objects over a (hopefully large) collection of elements.
The Writer can be found here: https://github.com/cloudera/spark-kafka-writer
Why is it infeasible? Fundamentally, each partition of each RDD is going to run independently (and may well run on a different cluster node), so you have to redo the connection (and any synchronization) at the start of each partition's task. If the overhead of that is too high, then you should increase the batch size in your StreamingContext until it becomes acceptable (obviously there's a latency cost to doing this). (If you're not handling thousands of messages in each partition, are you sure you need Spark Streaming at all? Would you do better with a standalone application?)
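For reference, the batch interval is the second argument when the context is created; the 10-second value below is just an illustration:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("kafka-to-kafka")
// A larger batch interval gives each partition task more records per batch,
// so per-task setup cost (producer creation, connections) is better amortized.
val ssc = new StreamingContext(sparkConf, Seconds(10))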
This might be what you want to do. You basically create one producer for each partition of records.
Hope that helps.
I was having the same issue and found this post.
The author solves the problem by creating one producer per executor. Instead of sending the producer itself, he sends only a "recipe" for creating a producer on an executor, by broadcasting that recipe.
He uses a wrapper that lazily creates the producer:
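A compact sketch of such a wrapper (the same lazy-val trick as in the step-by-step answer above; the names are illustrative, not necessarily those from the post):

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  // Only the creation function is serialized and shipped; the producer itself
  // is built lazily, on the executor, the first time send() is called.
  lazy val producer = createProducer()

  def send(topic: String, value: String): Unit =
    producer.send(new ProducerRecord[String, String](topic, value))
}

object KafkaSink {
  def apply(config: java.util.Properties): KafkaSink = {
    val f = () => {
      val producer = new KafkaProducer[String, String](config)
      sys.addShutdownHook { producer.close() }
      producer
    }
    new KafkaSink(f)
  }
}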
The wrapper is serializable because the Kafka producer is initialized just before first use on an executor. The driver keeps the reference to the wrapper and the wrapper sends the messages using each executor's producer:
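As a sketch of the driver-side usage (ssc, producerConfig, and the "output" topic are assumptions carried over from the snippets above):

val kafkaSink = ssc.sparkContext.broadcast(KafkaSink(producerConfig))

input.foreachRDD { rdd =>
  rdd.foreach { message =>
    // Each executor resolves the broadcast to its local wrapper and hence
    // to its single, lazily created producer.
    kafkaSink.value.send("output", message)
  }
}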