I want Spark to continuously monitor a directory and read CSV files with spark.readStream
as soon as they appear in that directory.
Please don't suggest a solution based on Spark Streaming. I am looking for a way to do this with Spark Structured Streaming.
Here is a complete solution for this use case.
If you are running in local mode, you can increase the driver memory like this:
bin/spark-shell --driver-memory 4G
There is no need to set the executor memory, because in local mode the executor runs inside the driver.
To complete @T.Gaweda's answer, here is the full solution:
import org.apache.spark.sql.types.StructType

val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema) // Specify schema of the csv files
  .csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory")
// Print every new batch of rows to the console without truncating column values
csvDF.writeStream.format("console").option("truncate", "false").start()
Spark will now continuously monitor the specified directory, and as soon as you add a CSV file to it, the operations defined on csvDF are run on that file.
Note: If you want Spark to infer the schema, you first have to set the following configuration:
spark.sqlContext.setConf("spark.sql.streaming.schemaInference", "true")
where spark is your SparkSession.
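As a minimal sketch of the note above: the snippet below enables schema inference and then defines the stream without calling .schema(...). The temporary directory, the sample file, the header row and the application name are assumptions made only so the example is self-contained. As far as I know, the input directory has to contain at least one file when the stream is defined, otherwise there is nothing to infer the schema from.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// In spark-shell this simply returns the existing session.
val spark = SparkSession.builder()
  .appName("csv-schema-inference-sketch") // hypothetical name
  .master("local[*]")
  .getOrCreate()

// Hypothetical input directory, seeded with one sample file to infer from.
val inputDir = Files.createTempDirectory("csv-input")
Files.write(
  inputDir.resolve("sample.csv"),
  "name;age\nalice;30\n".getBytes(StandardCharsets.UTF_8))

// Enable inference before the streaming DataFrame is defined.
spark.conf.set("spark.sql.streaming.schemaInference", "true")

val inferredDF = spark.readStream
  .option("sep", ";")
  .option("header", "true") // assumption: the files start with a header row
  .csv(inputDir.toString)

inferredDF.printSchema()
```

spark.conf.set is used here instead of spark.sqlContext.setConf; both set the same configuration key.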
As the official documentation says, you should use the "file" source:
File source - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.
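The "atomically placed" requirement in the quote can be met by writing each file somewhere else first and then moving it into the watched directory. The following is only a sketch of that idea using java.nio; publishAtomically, the two temporary directories and the file name are hypothetical, and it assumes both directories are on the same local file system.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Write the file in a staging directory, then move it into the watched
// directory in a single step, so a reader never sees a half-written file.
def publishAtomically(stagingDir: Path, watchedDir: Path,
                      fileName: String, content: String): Path = {
  val staged = stagingDir.resolve(fileName)
  Files.write(staged, content.getBytes(StandardCharsets.UTF_8))
  // ATOMIC_MOVE throws AtomicMoveNotSupportedException when the move
  // cannot be done atomically, e.g. across different file systems.
  Files.move(staged, watchedDir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE)
}

// Hypothetical directories, for illustration only.
val stagingDir = Files.createTempDirectory("staging")
val watchedDir = Files.createTempDirectory("watched")
val published = publishAtomically(stagingDir, watchedDir, "people.csv", "alice;30\nbob;25\n")
```

On HDFS or an object store the equivalent operation would go through that system's own API rather than java.nio.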
Code example taken from documentation:
// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema) // Specify schema of the csv files
  .csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory")
If you don't specify a trigger, Spark uses the default micro-batch mode and picks up new files as soon as the previous micro-batch has finished.
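If you would rather check the directory at a fixed interval, a trigger can be set on the writer. This is a sketch, not part of the documentation example: the 10-second interval, the temporary input directory, the 15-second wait and the application name are arbitrary assumptions.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType

// In spark-shell this simply returns the existing session.
val spark = SparkSession.builder()
  .appName("csv-trigger-sketch") // hypothetical name
  .master("local[*]")
  .getOrCreate()

// Hypothetical input directory; replace with the directory you want to watch.
val inputDir = Files.createTempDirectory("csv-input")

val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark.readStream
  .option("sep", ";")
  .schema(userSchema)
  .csv(inputDir.toString)

// Start a micro-batch every 10 seconds instead of as soon as possible.
val query = csvDF.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Let the query run for up to 15 seconds, then stop it.
// A long-running application would call query.awaitTermination() instead.
query.awaitTermination(15000L)
query.stop()
```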