Pyspark Structured Streaming Kafka configuration e

Posted 2019-06-10 18:58

Question:

I've used PySpark for Spark Streaming (Spark 2.0.2) with Kafka (0.10.1.0) successfully before, but Structured Streaming is a better fit for my use case. I've attempted to use the example online: https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html

with the following analogous code:

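# `spark` is an existing SparkSession, e.g. the one the pyspark shell provides.
# The parentheses let each chained call continue onto the next line.
ds1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic1")
    .load()
)
# Print each micro-batch to the console as new rows arrive.
query = (
    ds1.writeStream
    .outputMode("append")
    .format("console")
    .start()
)
query.awaitTermination()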

However, I always end up with the following error:

: org.apache.kafka.common.config.ConfigException: 
Missing required configuration "partition.assignment.strategy" which has no default value

I also tried adding this to the options when creating ds1:

.option("partition.assignment.strategy", "range")

But even explicitly assigning it a value didn't stop the error, nor did any other value (like "roundrobin") that I could find online or in the Kafka documentation.

I also tried this with the "assign" option and got the same error (our Kafka host is set up for assign: each consumer is assigned only one partition, and we don't have any rebalancing).

Any idea what's going on here? The documentation isn't helpful (probably because the feature is still experimental). Also, is there any way to do Structured Streaming using KafkaUtils, or is this the only gateway?

Answer 1:

  1. The Kafka 0.10.1.* client has a known issue, https://issues.apache.org/jira/browse/KAFKA-4547 , that may cause it to produce wrong answers, so you should not use it with Spark. You can use the 0.10.0.1 client instead; it should work with a 0.10.1.* Kafka cluster.

  2. To pass a Kafka configuration to the Kafka consumer client in Structured Streaming, you need to add the kafka. prefix, for example .option("kafka.partition.assignment.strategy", "range"). However, you don't need to set kafka.partition.assignment.strategy, because it has a default value. My hunch is that you have both Kafka 0.8.* and 0.10.* jars on the classpath and the wrong classes are being loaded.

  3. Which KafkaUtils API do you want to use that is missing from Structured Streaming? Spark 2.2.0 has just been released, and you can run both batch and streaming queries against Kafka in Structured Streaming. Read http://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html for examples.
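
To illustrate point 2, here is a minimal sketch of one way to keep the kafka. prefix consistent. The helper names kafka_source_options and read_kafka_stream are hypothetical and not part of Spark; the sketch assumes `spark` is an existing SparkSession and that the spark-sql-kafka package is on the classpath. The "range" value is simply the one used in the question, not a recommendation.

```python
def kafka_source_options(bootstrap_servers, topic, consumer_overrides=None):
    """Build the option dict for Spark's "kafka" source.

    Keys in consumer_overrides are Kafka consumer settings; each one gets the
    "kafka." prefix unless it already has it. (Hypothetical helper.)
    """
    options = {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,  # a source option, so it takes no "kafka." prefix
    }
    for key, value in (consumer_overrides or {}).items():
        prefixed = key if key.startswith("kafka.") else "kafka." + key
        options[prefixed] = value
    return options


def read_kafka_stream(spark, options):
    """Open a streaming DataFrame on an existing SparkSession."""
    return spark.readStream.format("kafka").options(**options).load()
```

For example, kafka_source_options("host1:port1,host2:port2", "topic1", {"partition.assignment.strategy": "range"}) produces the key kafka.partition.assignment.strategy, which is the form the consumer client receives. As noted above, this particular setting should not be needed at all once the classpath is consistent.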



Answer 2:

Add kafka-clients-*.jar to your Spark jars folder, then restart the Spark master and slave. After that, you don't need to add .option("partition.assignment.strategy", "range").
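
Before copying jars around, it may help to check whether the jars folder already mixes Kafka client versions, which is the conflict Answer 1 suspects. The following is a rough sketch using only the Python standard library; the function names are hypothetical, and the folder to scan depends on your installation (for example the jars directory under your Spark home, which is an assumption here).

```python
import re
from pathlib import Path

# Matches file names such as kafka-clients-0.10.0.1.jar and captures the version.
CLIENT_JAR = re.compile(r"^kafka-clients-(\d+(?:\.\d+)*)\.jar$")


def find_kafka_jars(jar_dir):
    """Return the sorted names of jars in jar_dir whose file name mentions kafka."""
    return sorted(
        path.name
        for path in Path(jar_dir).glob("*.jar")
        if "kafka" in path.name.lower()
    )


def kafka_client_versions(jar_names):
    """Return the distinct kafka-clients versions found among the jar names."""
    versions = set()
    for name in jar_names:
        match = CLIENT_JAR.match(name)
        if match:
            versions.add(match.group(1))
    return sorted(versions)
```

If kafka_client_versions(find_kafka_jars(your_jars_dir)) returns more than one version, or the listing shows both 0.8 and 0.10 integration jars, the classpath likely contains the kind of conflict described in Answer 1.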