I have the following problem with the Spark Streaming API. I am currently streaming input data via Flume to Spark Streaming, where I plan to do some preprocessing of the data. Then, I'd like to save the data to Hadoop's file system and query it with Impala. However, Spark writes the data files to separate directories, and a new directory is generated for every RDD.
This is a problem because, first of all, the external tables in Impala can only detect files, not subdirectories, inside the directory they point to, unless the table is partitioned. Secondly, Spark generates new directories so quickly that it would be very bad for performance to periodically create a new partition in Impala for every generated directory. On the other hand, if I increase the roll interval of the writes in Spark so that directories are generated less frequently, there is an added delay before Impala can read the incoming data. This is not acceptable, since my system has to support real-time applications.

In Hive, I could configure the external tables to also detect the subdirectories, without the need for partitioning, by using these settings:
set hive.mapred.supports.subdirectories=true;
set mapred.input.dir.recursive=true;
But to my understanding, Impala does not have a feature like this.
I am currently using the following code for reading the data from Flume and writing it to HDFS:
val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8"))).saveAsTextFiles(path)
Here, the variable path determines the prefix of the directory to which the text files (part-00000 and so on) are added, and the rest of the directory name is a timestamp generated by Spark. I could change the code to something like this:
val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
val mapStream = stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8")))
mapStream.foreachRDD(rdd => rdd.saveAsTextFile(path))
In this case the files would be added to the same directory determined by path, but since they are always named part-00000, part-00001, part-00002, etc., the files generated by previous batches would be overwritten. While examining the source code of Spark, I noticed that the names of the files are determined by a line in SparkHadoopWriter's open() method:
val outputName = "part-" + numfmt.format(splitID)
And it seems to me that there is no way to manipulate splitID through the Spark API. To summarize, my questions are the following:
- Is there any method to make the external tables in Impala detect subdirectories?
- If not, is there any method to make Spark write its output files into a single directory or otherwise in a form that is instantly readable by Impala? (I have included a rough sketch of the kind of workaround I have in mind below.)
- If not, is there any kind of update expected in Spark to fix this issue, or should I just branch my own version of Spark in which I can decide the names of the files it writes myself?
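For reference, the workaround I have in mind for the second question (untested so far, and the paths below are just placeholders) is to keep writing each batch to a temporary, per-batch directory and then move the part files into one fixed directory through the Hadoop FileSystem API, prefixing each file name with the batch time so that nothing collides or gets overwritten:

import java.nio.charset.Charset

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils

// Hypothetical single directory that the Impala external table would point to
val targetDir = "hdfs:///data/flume/events"

// ssc, host and port as above
val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
val mapStream = stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8")))

mapStream.foreachRDD((rdd, time) => {
  // Write the batch into a temporary, batch-specific directory first
  val tmpDir = s"$targetDir-tmp-${time.milliseconds}"
  rdd.saveAsTextFile(tmpDir)

  // Then move the part files into the single target directory,
  // prefixing each name with the batch time so nothing is overwritten
  val fs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
  fs.mkdirs(new Path(targetDir))
  fs.listStatus(new Path(tmpDir))
    .filter(_.getPath.getName.startsWith("part-"))
    .foreach { status =>
      val newName = s"part-${time.milliseconds}-${status.getPath.getName.stripPrefix("part-")}"
      fs.rename(status.getPath, new Path(targetDir, newName))
    }
  fs.delete(new Path(tmpDir), true)
})

This feels like a hack, though, and it adds an extra list-and-rename step on HDFS for every batch, which is why I am asking whether there is a cleaner way.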