I have a Spark Structured Streaming query:
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("subscribe", "topic")
.load()
I want to write the data to the file system using DataStreamWriter:
val query = df
.writeStream
.outputMode("append")
.format("parquet")
.start("data")
But no files are getting created in the data folder; only _spark_metadata is getting created.
However, I can see the data on the console when the format is console:
val query = df
.writeStream
.outputMode("append")
.format("console")
.start()
+--------------------+------------------+------------------+
| time| col1| col2|
+--------------------+------------------+------------------+
|49368-05-11 20:42...|0.9166470338147503|0.5576946794171861|
+--------------------+------------------+------------------+
I cannot understand the reason behind it.
Spark - 2.1.0
I had a similar problem but for different reasons, posting here in case someone has the same issue. When writing your output stream to files in append mode with watermarking, Structured Streaming has an interesting behavior: it won't actually write any data until a time bucket is older than the watermark. If you're testing Structured Streaming and have an hour-long watermark, you won't see any output for at least an hour.
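A minimal sketch of that situation (the topic, column names, and directory names here are just placeholders, not from the question): with a 1-hour watermark and append mode, the Parquet files for a given window only appear once the watermark passes the end of that window.
import org.apache.spark.sql.functions.window
import spark.implicits._

val counts = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic")
.load()
.selectExpr("CAST(value AS STRING) AS value", "timestamp")
.withWatermark("timestamp", "1 hour") // late-data cutoff; also gates when append emits
.groupBy(window($"timestamp", "10 minutes"))
.count()

val query = counts
.writeStream
.outputMode("append") // a window is written only after the watermark passes its end time
.format("parquet")
.option("checkpointLocation", "checkpoint")
.start("data")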
I resolved this issue. When I tried to run the Structured Streaming query in spark-shell, it gave an error that endingOffsets are not valid in streaming queries. So, I removed endingOffsets from the streaming query.
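For reference, this is the reader from the question with only endingOffsets removed (startingOffsets is still allowed in streaming queries):
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("startingOffsets", "earliest") // allowed in streaming queries, unlike endingOffsets
.option("subscribe", "topic")
.load()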
Then I tried to save the streaming query's results in Parquet files, during which I came to know that a checkpoint location must be specified. So, I added checkpointLocation.
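A sketch of the resulting writer (the checkpoint directory name here is just an example):
val query = df
.writeStream
.outputMode("append")
.format("parquet")
.option("checkpointLocation", "checkpoint") // required for file sinks
.start("data")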
After doing these modifications, I was able to save the streaming query's results in Parquet files.
But it is strange that when I ran the same code via an sbt application, it didn't throw any errors, whereas when I ran it via spark-shell, these errors were thrown. I think Apache Spark should throw these errors when running via an sbt/maven app too. It seems like a bug to me!