How to write a streaming Dataset to Kafka?

Asked 2019-04-08 10:47

I'm trying to do some enrichment of topic data, so I read from Kafka with Spark Structured Streaming and want to write the enriched records back to Kafka.

val ds = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("subscribe", "topicname")
      .load()


// key and value arrive as binary from the Kafka source, so cast them first
val enriched = ds
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic")
      .as[(String, String, String)]
      .map(record => enrich(record._1, record._2, record._3))

val query = enriched.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("topic", "desttopic")
      .start()

But I'm getting an exception:

Exception in thread "main" java.lang.UnsupportedOperationException: Data source kafka does not support streamed writing
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:287)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:266)
    at kafka_bridge.KafkaBridge$.main(KafkaBridge.scala:319)
    at kafka_bridge.KafkaBridge.main(KafkaBridge.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

Any workarounds?

3 Answers
可以哭但决不认输i · 2019-04-08 11:15

Try this:

ds.map(_.toString.getBytes).toDF("value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("checkpointLocation", "/tmp/checkpoint") // the kafka sink needs one; path is illustrative
      .option("topic", topic)
      .start()
      .awaitTermination()
孤傲高冷的网名 · 2019-04-08 11:23

Spark 2.1 (the latest release of Spark at the time of writing) doesn't have it. The next release, 2.2, will ship a Kafka writer; see this commit.

The Kafka sink and the Kafka writer are the same thing.
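
For reference, once 2.2 lands, the write side should look roughly like this. A sketch only: it assumes `enrich` produces `(key, value)` string pairs, and the checkpoint path is illustrative, not from the question.

// Sketch of the Spark 2.2+ Kafka sink API. Assumes `enriched` is a
// Dataset[(String, String)] of key/value pairs.
val query = enriched
      .toDF("key", "value")                          // the columns the kafka sink reads
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("topic", "desttopic")
      .option("checkpointLocation", "/tmp/checkpoints/kafka-bridge") // required by the sink
      .start()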

Animai°情兽 · 2019-04-08 11:28

As T. Gawęda mentioned, there is no kafka format to write streaming datasets to Kafka (i.e. a Kafka sink).

The currently recommended solution in Spark 2.1 is to use the foreach operator.

The foreach operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface ForeachWriter (Scala/Java docs), which has methods that get called whenever there is a sequence of rows generated as output after a trigger.
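
A minimal sketch of that workaround: publish each record with the plain Kafka producer from inside a ForeachWriter. It assumes the enriched stream is a Dataset[(String, String)] of key/value pairs; the class name and topic are illustrative.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.ForeachWriter

// One writer instance runs per partition; the producer is created in open()
// on the executor, so it is never serialized from the driver.
class KafkaForeachWriter(bootstrapServers: String, topic: String)
    extends ForeachWriter[(String, String)] {

  var producer: KafkaProducer[String, String] = _

  override def open(partitionId: Long, version: Long): Boolean = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producer = new KafkaProducer[String, String](props)
    true // process this partition
  }

  override def process(record: (String, String)): Unit =
    producer.send(new ProducerRecord(topic, record._1, record._2))

  override def close(errorOrNull: Throwable): Unit =
    if (producer != null) producer.close()
}

val query = enriched
      .writeStream
      .foreach(new KafkaForeachWriter(bootstrapServers, "desttopic"))
      .start()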
