Spark Streaming - read and write on Kafka topic

Posted 2019-01-10 04:50

I am using Spark Streaming to process data between two Kafka queues, but I cannot find a good way to write to Kafka from Spark. I have tried this:

input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    }
  )
)

This works as intended, but instantiating a new KafkaProducer for every message is clearly not feasible in a real deployment, and I'm trying to work around it.

I would like to keep a reference to a single instance for every process and access it when I need to send a message. How can I write to Kafka from Spark Streaming?

7 answers
叛逆
#2 · 2019-01-10 05:40

With Spark >= 2.2

Both reading from and writing to Kafka are possible with the Structured Streaming API.

Build stream from Kafka topic

// Subscribe to a topic and read messages starting from the earliest offsets
val ds = spark
  .readStream // use `read` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("subscribe", "source-topic1")
  .option("startingOffsets", "earliest")
  // "endingOffsets" applies to batch queries (`read`) only, so it is not set here
  .load()

Read the key and value and give both a type. For simplicity, we cast both of them to String.

import spark.implicits._ // provides the encoder needed by .as[(String, String)]

val dsStruc = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Since dsStruc has a schema, it accepts SQL-like operations such as filter, agg, select, etc.
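As an illustration of such operations, here is a minimal sketch that filters out empty values and counts records per key. It runs on a small in-memory Dataset standing in for dsStruc; the object name, the helper countPerKey, and the sample data are hypothetical and not part of the original answer.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, count}

object DatasetOpsSketch {
  // Keep only non-empty values, then count the remaining records per key.
  def countPerKey(ds: Dataset[(String, String)]): DataFrame =
    ds.toDF("key", "value")
      .filter(col("value").isNotNull && col("value") =!= "")
      .groupBy("key")
      .agg(count("value").as("n"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]") // assumption: local run for illustration only
      .appName("dataset-ops-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data in place of the Kafka-backed dsStruc.
    val sample = Seq(("k1", "a"), ("k1", "b"), ("k2", ""), ("k2", "c")).toDS()
    countPerKey(sample).show()

    spark.stop()
  }
}
```

The same calls apply to a streaming Dataset, but an aggregated stream generally has to be written with the "update" or "complete" output mode rather than the default "append".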

Write stream to Kafka topic

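dsStruc
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  // The Kafka sink needs a checkpoint location; this path is a placeholder
  .option("checkpointLocation", "/path/to/checkpoint/dir")
  .start()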
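The code comments mention using `read` and `write` for batch jobs. A minimal sketch of that batch variant might look like the following, assuming the same placeholder broker list and topic names; the object and helper names (KafkaBatchCopy, readOptions, writeOptions, copy) are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

object KafkaBatchCopy {
  // Options for a bounded batch read: from the earliest to the latest offsets.
  def readOptions(brokers: String, topic: String): Map[String, String] = Map(
    "kafka.bootstrap.servers" -> brokers,
    "subscribe" -> topic,
    "startingOffsets" -> "earliest",
    "endingOffsets" -> "latest" // valid for batch queries only
  )

  // Options for the Kafka sink.
  def writeOptions(brokers: String, topic: String): Map[String, String] = Map(
    "kafka.bootstrap.servers" -> brokers,
    "topic" -> topic
  )

  // Copy everything currently in `source` to `target` as string key/value pairs.
  def copy(spark: SparkSession, brokers: String, source: String, target: String): Unit =
    spark.read
      .format("kafka")
      .options(readOptions(brokers, source))
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .write
      .format("kafka")
      .options(writeOptions(brokers, target))
      .save()
}
```

A call such as KafkaBatchCopy.copy(spark, "brokerhost1:port1,brokerhost2:port2", "source-topic1", "target-topic1") would run it once and return, with no checkpoint directory involved.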

More configuration options for reading from and writing to Kafka are described in the Structured Streaming + Kafka Integration Guide.

Key artifacts to add to the application

 "org.apache.spark" % "spark-core_2.11" % "2.2.0",
 "org.apache.spark" % "spark-streaming_2.11" % "2.2.0",
 "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.2.0",
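
If you need to stay on the DStream API from the question, one common workaround is to hold the producer in a Scala object, so that each executor JVM creates it lazily once and reuses it for every partition. This is only a sketch under assumptions: the names ProducerHolder and DStreamToKafka are hypothetical, and the broker address is a placeholder that would normally come from configuration.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.streaming.dstream.DStream

// A Scala object is initialized once per JVM, and the lazy val defers
// creating the producer until the first send on that executor.
object ProducerHolder {
  // Assumption: placeholder address; replace with your broker list.
  val brokers = "localhost:9092"

  lazy val producer: KafkaProducer[String, String] = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    val p = new KafkaProducer[String, String](props)
    // Close the producer (flushing buffered records) when the JVM exits.
    sys.addShutdownHook(p.close())
    p
  }
}

object DStreamToKafka {
  def writeToKafka(input: DStream[String], topic: String): Unit =
    input.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // Resolved on the executor; the producer itself is never serialized.
        val producer = ProducerHolder.producer
        partition.foreach(x => producer.send(new ProducerRecord[String, String](topic, x)))
        // Wait until the records of this partition have been sent.
        producer.flush()
      }
    }
}
```

With this in place, DStreamToKafka.writeToKafka(input, "output") would replace the foreachRDD block from the question. KafkaProducer is thread-safe, so tasks running concurrently on the same executor can share the single instance.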