I have a lot of files, each containing roughly 60,000,000 lines. Every file follows the format {timestamp}#{producer}#{messageId}#{data_bytes}\n
I walk through my files one by one and want to build one output file per input file. Because some lines depend on previous lines, I grouped them by their producer; whenever a line depends on one or more previous lines, those lines always share the same producer. After grouping the lines, I pass them to my Java parser. The parser holds all parsed data objects in memory and outputs them as JSON afterwards.
To visualize how I think my job is processed, I threw together the following "flow graph". Note that I did not visualize the groupByKey shuffling process.
My problems:
- I expected Spark to split up the files, process the splits in separate tasks, and save each task's output to a "part" file.
- However, my tasks run out of memory and get killed by YARN before they can finish:

    Container killed by YARN for exceeding memory limits. 7.6 GB of 7.5 GB physical memory used

- My Parser keeps all parsed data objects in memory. I can't change the code of the Parser.
- Please note that my code works for smaller files (for example, two files with 600,000 lines each as the input to my job).
My questions:
- How can I make sure that Spark creates a result for every file split in my map task? (Maybe it does if my tasks succeed, but as things stand I will never see the output.)
- I thought that my map transformation val lineMap = lines.map(...) (see the Scala code below) produces a partitioned RDD, so I expect the values of the RDD to be split in some way before my second map task is called. Furthermore, I thought that calling saveAsTextFile on this lineMap RDD would produce an output task that runs after each of my map tasks has finished. If my assumptions are correct, why do my executors still run out of memory? Does Spark create several (too) big file splits and process them concurrently, so that the Parser fills up the memory?
- Is repartitioning the lineMap RDD to get more (smaller) inputs for my Parser a good idea? (See the snippet right after this list for what I mean.)
- Is there an additional reducer step somewhere that I am not aware of? For example, results being aggregated before getting written to a file, or similar?
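To make the repartition question concrete, this is roughly what I have in mind; the partition count of 500 is an arbitrary placeholder, not a tuned value:

// Hypothetical sketch: spread the (producer, lines) groups over more partitions
// before the parsing step, so each task handles fewer groups at once.
val repartitionedLineMap = lineMap.repartition(500)
val parsedLines = repartitionedLineMap.map { case (key, values) =>
  parseLinesByKey(key, values, config)
}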
Scala code (I left out irrelevant code parts):
def main(args: Array[String]) {
  val inputFilePath = args(0)
  val outputFilePath = args(1)

  val inputFiles = fs.listStatus(new Path(inputFilePath))
  inputFiles.foreach(filename => {
    processData(filename.getPath, ...)
  })
}

def processData(filePath: Path, ...) {
  val lines = sc.textFile(filePath.toString())
  // the producer is the second #-separated field, so group all lines of one producer together
  val lineMap = lines.map(line => (line.split("#")(1), line)).groupByKey()

  val parsedLines = lineMap.map { case (key, values) => parseLinesByKey(key, values, config) }
  // each output should be saved separately
  parsedLines.saveAsTextFile(outputFilePath.toString() + "/" + filePath.getName)
}

def parseLinesByKey(key: String, values: Iterable[String], config: Config) = {
  val importer = new LogFileImporter(...)
  importer.parseData(values.toIterator.asJava, ...)

  // importer now contains all parsed data objects in memory that could be parsed
  // from the given values.
  val jsonMapper = getJsonMapper(...)
  val jsonStringData = jsonMapper.getValueFromString(importer.getDataObject)

  (key, jsonStringData)
}
I fixed this by removing the groupByKey call and implementing a new FileInputFormat as well as a RecordReader to remove my limitation that lines depend on previous lines. For now, I implemented it so that each split contains a 50,000-byte overhead from the previous split. This ensures that all lines that depend on previous lines can be parsed correctly.
I will now go ahead and still look through the last 50,000 bytes of the previous split, but only copy over lines that actually affect the parsing of the current split. Thus, I minimize the overhead and still get a highly parallelizable task.
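To illustrate the overhead idea, here is a minimal sketch (not my actual implementation; the class name is made up and it ignores details such as seeking into the middle of a line) of a RecordReader that starts reading a fixed number of bytes before its own split:

import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.util.LineReader

class OverlapRecordReader extends RecordReader[LongWritable, Text] {
  private val Overhead = 50000L // bytes to re-read from the end of the previous split

  private var in: FSDataInputStream = _
  private var reader: LineReader = _
  private var pos: Long = 0L
  private var end: Long = 0L
  private val key = new LongWritable(0)
  private val value = new Text()

  override def initialize(genericSplit: InputSplit, context: TaskAttemptContext): Unit = {
    val split = genericSplit.asInstanceOf[FileSplit]
    // Step back into the previous split so lines that earlier lines depend on are visible here.
    val start = math.max(0L, split.getStart - Overhead)
    end = split.getStart + split.getLength
    val fs = split.getPath.getFileSystem(context.getConfiguration)
    in = fs.open(split.getPath)
    in.seek(start)
    reader = new LineReader(in, context.getConfiguration)
    pos = start
  }

  override def nextKeyValue(): Boolean = {
    if (pos >= end) return false
    val bytesRead = reader.readLine(value)
    if (bytesRead == 0) return false // end of file
    key.set(pos)
    pos += bytesRead
    true
  }

  override def getCurrentKey: LongWritable = key
  override def getCurrentValue: Text = value
  override def getProgress: Float =
    if (end == 0L) 1.0f else math.min(1.0f, pos.toFloat / end.toFloat)
  override def close(): Unit = if (in != null) in.close()
}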
The following links pointed me in the right direction. Because the topic of FileInputFormat/RecordReader is quite complicated at first sight (it was for me, at least), it is good to read through these articles and understand whether this approach is suitable for your problem:
Below are the relevant code parts from the ae.be article, in case the website goes down. The author (@Gurdt) uses this to detect whether a chat message contains an escaped line return (the line ends with "\") and appends the escaped lines together until an unescaped \n is found. This allows him to retrieve messages that span two or more lines. The code is written in Scala:
Usage
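A sketch of how such a custom input format is plugged into the job; the class name EscapedLineInputFormat and the input path are assumptions on my side, not the article's exact code:

import org.apache.hadoop.io.{LongWritable, Text}

// Read the file through the custom input format instead of sc.textFile.
val conf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
val records = sc.newAPIHadoopFile(
  "path/to/input",                 // hypothetical input path
  classOf[EscapedLineInputFormat],
  classOf[LongWritable],
  classOf[Text],
  conf)
// Hadoop reuses the Text instances, so convert them to String right away.
val messages = records.map { case (_, text) => text.toString }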
FileInputFormat
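A sketch of the FileInputFormat, which does little more than hand out the custom RecordReader (again, the class names are mine):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

class EscapedLineInputFormat extends FileInputFormat[LongWritable, Text] {
  // Called once per split; the RecordReader does the actual line joining.
  override def createRecordReader(split: InputSplit,
                                  context: TaskAttemptContext): RecordReader[LongWritable, Text] =
    new EscapedLineRecordReader()
}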
RecordReader
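A sketch of the RecordReader along the lines described above: it wraps Hadoop's LineRecordReader and keeps appending lines as long as the current one ends with the escape character "\" (names and details are my own, not a verbatim copy of the article):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader

class EscapedLineRecordReader extends RecordReader[LongWritable, Text] {
  private val lineReader = new LineRecordReader()
  private val key = new LongWritable(0)
  private val value = new Text()

  override def initialize(split: InputSplit, context: TaskAttemptContext): Unit =
    lineReader.initialize(split, context)

  override def nextKeyValue(): Boolean = {
    if (!lineReader.nextKeyValue()) return false
    key.set(lineReader.getCurrentKey.get())
    var record = lineReader.getCurrentValue.toString
    // While the line ends with "\", drop the escape character and glue the next line on.
    while (record.endsWith("\\") && lineReader.nextKeyValue()) {
      record = record.dropRight(1) + lineReader.getCurrentValue.toString
    }
    value.set(record)
    true
  }

  override def getCurrentKey: LongWritable = key
  override def getCurrentValue: Text = value
  override def getProgress: Float = lineReader.getProgress
  override def close(): Unit = lineReader.close()
}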
Note: You might need to implement getCurrentKey, getCurrentValue, close and getProgress in your RecordReader as well.