I want to receive JSON strings from MQTT and parse them into a DataFrame df. How can I do it?
This is an example of the JSON message that I send to the MQTT queue to be processed in Spark:
{
    "id": 1,
    "timestamp": 1532609003,
    "distances": [2,5,7,8]
}
This is my code:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Test") \
    .master("local[4]") \
    .getOrCreate()

# Custom Structured Streaming receiver
reader = spark\
.option('brokerUrl', 'tcp://')\
.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)")
df = spark.read.json(reader.select("value").rdd)
# Start running the query that prints the running counts to the console
query = df \
.writeStream \
.format('console') \
But this code fails:
py4j.protocol.Py4JJavaError: An error occurred while calling o45.javaToPython.
: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
I tried to add start() as follows:
df = spark.read.json(reader.select("value").rdd) \
    .writeStream \
    .format('console') \
    .start()
But I got the same error. My goal is to get a DataFrame df that I can then pass through further ETL processing.
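Before wiring the payload into Spark, the message shape can be sanity-checked in plain Python. The following is only a minimal sketch using the standard library: the helper name parse_message is hypothetical, and the field types (integer id, integer epoch timestamp, list of integer distances) are assumptions read off the sample message, not something the MQTT source guarantees.

```python
import json

# Sample payload, wrapped in braces so that it is a valid JSON object.
SAMPLE = '{"id": 1, "timestamp": 1532609003, "distances": [2, 5, 7, 8]}'


def parse_message(payload):
    """Parse one MQTT payload and check the assumed field types.

    Returns a dict on success, or None if the payload is not valid JSON
    or does not match the assumed shape (hypothetical helper).
    """
    try:
        msg = json.loads(payload)
    except json.JSONDecodeError:
        return None
    if not isinstance(msg, dict):
        return None
    distances = msg.get("distances")
    if not (
        isinstance(msg.get("id"), int)
        and isinstance(msg.get("timestamp"), int)
        and isinstance(distances, list)
        and all(isinstance(d, int) for d in distances)
    ):
        return None
    return msg


record = parse_message(SAMPLE)
print(record)
```

If a payload fails this check, the same payload is unlikely to parse cleanly against a Spark schema built from the sample, so it is a cheap first thing to rule out.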
The thread marked as an answer has not helped me solve the problem. First of all, it gives a solution for Scala, while I am using PySpark.
Secondly, I tested the solution proposed in that answer, and it returned an empty json column:
from pyspark.sql.functions import col, from_json

reader = spark\
.schema(spark.read.json("mqtt_schema.json").schema) \
.option('brokerUrl', 'tcp://')\
.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)")
json_schema = spark.read.json("mqtt_schema.json").schema
df = reader.withColumn('json', from_json(col('value'), json_schema))
query = df \
.writeStream \
.format('console') \
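One thing that may be worth checking, assuming mqtt_schema.json holds a sample message: by default spark.read.json expects JSON Lines input, that is, one complete JSON object per line, unless the multiLine option is set. A pretty-printed sample spread over several lines may therefore not produce the intended schema. The sketch below writes the sample as a single line using only the standard library; the temporary directory is a hypothetical location, since the post only names the file.

```python
import json
import os
import tempfile

sample = {"id": 1, "timestamp": 1532609003, "distances": [2, 5, 7, 8]}

# Hypothetical location; the post only names the file "mqtt_schema.json".
schema_path = os.path.join(tempfile.mkdtemp(), "mqtt_schema.json")

# json.dumps without an indent argument emits the object on a single line.
with open(schema_path, "w") as f:
    f.write(json.dumps(sample) + "\n")

# Read the file back to confirm it holds exactly one line of JSON.
with open(schema_path) as f:
    lines = f.read().splitlines()

print(lines)
```

Whether this is the cause of the empty column here is an assumption; printing the inferred schema with printSchema() on the static DataFrame would confirm or rule it out.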