Posted 2019-06-10 18:58


I've been using pyspark for Spark Streaming (Spark 2.0.2) with Kafka successfully, but my purposes are better suited to Structured Streaming. I've attempted to adapt an example I found online with the following analogous code:

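ds1 = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load())
query = ds1.writeStream.format("console").start()  # e.g. print rows to the console sink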

However, I always end up with the following error:

org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value

I also tried adding this to my set of options when creating ds1:

.option("partition.assignment.strategy", "range")

But even explicitly assigning it a value didn't stop the error, nor did any other value (like "roundrobin") that I could find online or in the Kafka documentation.

I also tried this with the "assign" option and got the same error. (Our Kafka host is set up for assign: each consumer is assigned only one partition, and we don't have any rebalancing.)
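For reference, the Kafka source's "assign" option takes a JSON string that maps each topic to the list of partition numbers to read. A minimal sketch of building that string; `assign_option` is a hypothetical helper, and "topic1" with partition 0 are placeholders:

```python
import json


def assign_option(partitions_by_topic):
    """Build the JSON string for the Kafka source's "assign" option.

    partitions_by_topic maps a topic name to the partition numbers to read,
    e.g. {"topic1": [0]} for a consumer that owns only partition 0.
    (Hypothetical helper, not part of Spark.)
    """
    # Sort the partition numbers so the output is deterministic.
    return json.dumps(
        {topic: sorted(parts) for topic, parts in partitions_by_topic.items()}
    )


print(assign_option({"topic1": [0]}))  # → {"topic1": [0]}
```

The resulting string would replace the "subscribe" option, e.g. `.option("assign", assign_option({"topic1": [0]}))`; only one of "assign", "subscribe" or "subscribePattern" may be set.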

Any idea what's going on here? The documentation isn't helpful (probably because it's still in the experimental phase). Also, is there any way to do Structured Streaming using KafkaUtils? Or is this the only gateway?


  1. There is a known issue in the Kafka 0.10.1.* client: it may generate wrong answers, so you should not use it with Spark. You can use an unaffected client version instead, and it should still work with a 0.10.1.* Kafka cluster.

  2. To pass a Kafka configuration to the Kafka consumer client in Structured Streaming, you need to add the kafka. prefix, such as .option("kafka.partition.assignment.strategy", "range"). However, you don't need to set kafka.partition.assignment.strategy at all, because it has a default value. My hunch is that you put both Kafka 0.8.* and 0.10.* jars on the classpath and the wrong classes are being loaded.

  3. Which KafkaUtils API do you want to use that is missing in Structured Streaming? Spark 2.2.0 has just been released, and with it you can run both batch and streaming queries against Kafka in Structured Streaming. See the Kafka integration guide in the Spark documentation for examples.
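Point 2 can be illustrated with a small helper that builds the option dictionary for the Kafka source, adding the "kafka." prefix to consumer properties while leaving source options such as "subscribe" untouched. This is a sketch: `kafka_source_options` is a hypothetical helper, and "session.timeout.ms" is only a placeholder consumer property.

```python
def kafka_source_options(bootstrap_servers, subscribe, consumer_conf=None):
    """Build options for the Spark Kafka source (hypothetical helper).

    Source options such as "subscribe" are passed as-is; Kafka consumer
    properties get the "kafka." prefix so they reach the Kafka client.
    """
    opts = {"kafka.bootstrap.servers": bootstrap_servers, "subscribe": subscribe}
    for key, value in (consumer_conf or {}).items():
        # Do not double-prefix keys that already start with "kafka.".
        name = key if key.startswith("kafka.") else "kafka." + key
        opts[name] = value
    return opts


# Placeholder consumer property, shown only to illustrate the prefixing.
opts = kafka_source_options(
    "host1:port1,host2:port2", "topic1", {"session.timeout.ms": "30000"}
)
print(sorted(opts))
```

With a SparkSession named `spark`, the dictionary could be passed as `spark.readStream.format("kafka").options(**opts).load()` for a streaming query, or as `spark.read.format("kafka").options(**opts).load()` for a batch query (Spark 2.2.0 and later).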


Add kafka-clients-*.jar to your Spark jars folder, then restart the Spark master and slaves. After that, you don't need to add .option("partition.assignment.strategy", "range").
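Before copying a jar in, it may help to check the hunch from point 2 above, that both Kafka 0.8.* and 0.10.* client jars ended up in the same jars folder. A minimal sketch; the file-name pattern is an assumption based on the usual naming of Kafka jars (e.g. "kafka-clients-0.10.0.1.jar", "kafka_2.11-0.8.2.1.jar"), and `kafka_client_lines` / `check_jars_dir` are hypothetical helpers, not part of Spark:

```python
import os
import re

# Assumed naming: "kafka-clients-<version>.jar" or "kafka_<scala>-<version>.jar".
# Captures the "major.minor" version line, e.g. "0.10" or "0.8".
KAFKA_JAR = re.compile(r"^kafka(?:-clients|_[\d.]+)-(\d+\.\d+)\.")


def kafka_client_lines(jar_names):
    """Return the set of Kafka version lines found among the given file names."""
    lines = set()
    for name in jar_names:
        match = KAFKA_JAR.match(name)
        if match and name.endswith(".jar"):
            lines.add(match.group(1))
    return lines


def check_jars_dir(jars_dir):
    """List jars_dir and print a warning if more than one Kafka line is present."""
    lines = kafka_client_lines(os.listdir(jars_dir))
    if len(lines) > 1:
        print("Mixed Kafka client versions found:", sorted(lines))
    return lines
```

For a standard Spark 2.x layout this could be called as `check_jars_dir(os.path.join(os.environ["SPARK_HOME"], "jars"))`; a result with more than one entry suggests removing the unneeded version before restarting.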