The data source is from Databricks Notebook demo:Five Spark SQL Helper Utility Functions to Extract and Explore Complex Data Types!
But when I try these code on my own laptop, I always get errors.
First, load JSON data as DataFrame
res2: org.apache.spark.sql.DataFrame = [battery_level: string, c02_level: string]
scala> res2.show
+-------------+---------+
|battery_level|c02_level|
+-------------+---------+
| 7| 886|
| 5| 1378|
| 8| 917|
| 8| 1504|
| 8| 831|
| 9| 1304|
| 8| 1574|
| 9| 1208|
+-------------+---------+
Second, write
data to Kafka:
res2.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "test")
.save()
All of those follows the notebook demo above and official steps
But errors shows:
scala> res2.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "iot-devices")
.save()
org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:72)
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:72)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.kafka010.KafkaWriter$.validateQuery(KafkaWriter.scala:71)
at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:87)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:165)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
... 52 elided
I assumed that it might be the Kafka problem, then I test the DataFrame read
from Kafka to ensure the connectivity:
scala> val kaDF = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "iot-devices")
.load()
kaDF: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
scala> kaDF.show
+----+--------------------+-----------+---------+------+--------------------+-------------+
| key| value| topic|partition|offset| timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|null| [73 73 73 73 73]|iot-devices| 0| 0|2017-09-27 11:11:...| 0|
|null|[64 69 63 6B 20 3...|iot-devices| 0| 1|2017-09-27 11:29:...| 0|
|null| [78 69 78 69]|iot-devices| 0| 2|2017-09-27 11:29:...| 0|
|null|[31 20 32 20 33 2...|iot-devices| 0| 3|2017-09-27 11:30:...| 0|
+----+--------------------+-----------+---------+------+--------------------+-------------+
So, the result shows that reading data in topic "iot-devices" from Kafka bootstrap.servers localhost:9092
does work.
I searched a lot online, but still can't solve it?
Can Anybody with Spark SQL experience tell me what is wrong in my command?
Thanks!
The error message clearly shows the source of the problem:
The
Dataset
to be written has to have at leastvalue
column (and optionallykey
andtopic
) andres2
has onlybattery_level
,c02_level
.You can for example: