I am learning Structured Streaming with Databricks and I'm struggling with the DataStreamWriter console mode.
My program:
- Simulates the streaming arrival of files to the folder "monitoring_dir" (one new file is transferred from "source_dir" every 10 seconds).
- Uses a DataStreamReader to populate the Unbounded DataFrame "inputUDF" with the content of each new file.
- Uses a DataStreamWriter to output the new rows of "inputUDF" to a valid sink.
The program works when I use a File sink (the batches are appended to text-format files in "result_dir"), but nothing is displayed when I use the Console sink.
Moreover, when I run the equivalent version of the program on my local machine (with Spark installed on it), it works fine for both the File and Console sinks.
My question is:
- How can I make this program output to the Console sink and display the results when using Databricks?
Thank you very much in advance!
Best regards, Nacho
My Program: myTest.py
import pyspark
import pyspark.sql.functions
import time
#------------------------------------
# FUNCTION get_source_dir_file_names
#------------------------------------
def get_source_dir_file_names(source_dir):
    """Return the names of the entries of source_dir, sorted alphabetically.

    Args:
        source_dir: Path of the folder to list; passed as-is to dbutils.fs.ls.

    Returns:
        A list with the name of every entry returned by dbutils.fs.ls,
        in ascending alphabetic order (empty if the folder has no entries).
    """
    # 1. We take the name of each entry from the `name` attribute of its FileInfo.
    #    Slicing it out of str(item) between "name='" and "'," truncates any name
    #    that contains "'," and raises ValueError if the string layout differs.
    res = [item.name for item in dbutils.fs.ls(source_dir)]
    # 2. We sort the list in alphabetic order
    res.sort()
    # 3. We return res
    return res
#------------------------------------
# FUNCTION streaming_simulation
#------------------------------------
def streaming_simulation(source_dir, monitoring_dir, time_step_interval):
    """Copy the files of source_dir into monitoring_dir, one per time step.

    Args:
        source_dir: Folder holding the files to transfer. It is joined to each
            file name by plain string concatenation, so it is expected to end
            with "/".
        monitoring_dir: Destination folder; joined to each file name the same
            way as source_dir.
        time_step_interval: Seconds between the start of two consecutive
            transfers.
    """
    # 1. We get the names of the files on source_dir
    files = get_source_dir_file_names(source_dir)
    # 2. We pause for a tenth of the interval and then take the starting time
    time.sleep(time_step_interval * 0.1)
    start = time.time()
    # 3. We simulate the arrival of the files from source_dir to monitoring_dir
    #    (i.e., the files are copied one by one, one per time slot).
    #    count is the amount of files transferred so far.
    for count, file in enumerate(files, start=1):
        # 3.1. We copy the file from source_dir to monitoring_dir
        dbutils.fs.cp(source_dir + file, monitoring_dir + file)
        # 3.2. We wait until the next time slot. If the copy overran the slot the
        #      remaining time is negative; time.sleep rejects negative values, so
        #      we clamp it to zero and continue straight away.
        remaining = (start + (count * time_step_interval)) - time.time()
        time.sleep(max(0.0, remaining))
    # 4. We wait a last time_step_interval
    time.sleep(time_step_interval)
#------------------------------------
# FUNCTION my_main
#------------------------------------
def my_main():
    """Run the demo: stream the files arriving at monitoring_dir to a sink.

    The working folders are wiped and recreated, a streaming text reader is
    attached to monitoring_dir, and a streaming query is started that appends
    the new rows either to text files in result_dir or to the console. The
    arrival of files is then simulated and the query is stopped afterwards.
    """
    # 0. We set the mode (True: console sink, False: file sink)
    console_sink = True
    # 1. We set the paths to the folders
    source_dir = "/FileStore/tables/my_dataset/"
    monitoring_dir = "/FileStore/tables/my_monitoring/"
    checkpoint_dir = "/FileStore/tables/my_checkpoint/"
    result_dir = "/FileStore/tables/my_result/"
    # 1.1. We remove (recursively) and recreate the working folders
    working_dirs = (monitoring_dir, result_dir, checkpoint_dir)
    for folder in working_dirs:
        dbutils.fs.rm(folder, True)
    for folder in working_dirs:
        dbutils.fs.mkdirs(folder)
    # 2. We configure the Spark Session
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel('WARN')
    # 3. Operation C1: We create an Unbounded DataFrame reading the new content copied to monitoring_dir
    inputUDF = spark.readStream.format("text").load(monitoring_dir)
    # 4. Operation A1: We create the DataStreamWriter...
    if not console_sink:
        # 4.1. To either save to result_dir in append mode
        myDSW = (
            inputUDF.writeStream.format("text")
            .option("path", result_dir)
            .option("checkpointLocation", checkpoint_dir)
            .trigger(processingTime="10 seconds")
            .outputMode("append")
        )
    else:
        # 4.2. Or to display by console in append mode
        # NOTE(review): on Databricks the console sink presumably prints to the
        # driver's stdout log rather than to the notebook cell -- confirm in the
        # cluster's driver logs; display(inputUDF) or a memory sink may be needed
        # to see the rows in the notebook itself.
        myDSW = (
            inputUDF.writeStream.format("console")
            .trigger(processingTime="10 seconds")
            .outputMode("append")
        )
    # 5. We get the StreamingQuery object derived from starting the DataStreamWriter
    mySQ = myDSW.start()
    try:
        # 6. We simulate the streaming arrival of files (i.e., one by one) from source_dir to monitoring_dir
        streaming_simulation(source_dir, monitoring_dir, 10)
    finally:
        # 7. We stop the StreamingQuery to finish the application, even if the
        #    simulation failed, so the query is not left running
        mySQ.stop()
#-------------------------------
# MAIN ENTRY POINT
#-------------------------------
# We launch the demo only when this file is executed as a script
if __name__ == "__main__":
    my_main()
My Dataset: f1.txt
First sentence.
Second sentence.
My Dataset: f2.txt
Third sentence.
Fourth sentence.
My Dataset: f3.txt
Fifth sentence.
Sixth sentence.