Huge memory consumption in Map Task in Spark

Published 2019-04-12 07:44

Question:

I have a lot of files that contain roughly 60.000.000 lines. Every line follows the format {timestamp}#{producer}#{messageId}#{data_bytes}\n

I walk through my files one by one and also want to build one output file per input file. Because some of the lines depend on previous lines, I grouped them by their producer: whenever a line depends on one or more previous lines, their producer is always the same. After grouping all of the lines, I hand them to my Java parser. The parser then holds all parsed data objects in memory and outputs them as JSON afterwards.
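
To make the format concrete, here is a small, hedged illustration (the values are made up) of how the producer, i.e. the grouping key, is taken from a single line:

// Hypothetical example line in the described format
val exampleLine = "1554793200#producerA#42#payload"
val producer = exampleLine.split("#")(1)   // "producerA" -- the key I group by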

To visualize how I think my job is processed, I threw together the following "flow graph". Note that I did not visualize the groupByKey shuffling process.

My problems:

  • I expected Spark to split up the files, process the splits with separate tasks and save each task output to a "part"-file.
  • However, my tasks run out of memory and get killed by YARN before they can finish: Container killed by YARN for exceeding memory limits. 7.6 GB of 7.5 GB physical memory used. (See the configuration sketch after this list for where that 7.5 GB limit comes from.)
  • My Parser is throwing all parsed data objects into memory. I can't change the code of the Parser.
  • Please note that my code works for smaller files (for example two files with 600.000 lines each as the input to my Job)
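
For context, the 7.5 GB limit in the error above is the executor heap plus the overhead YARN reserves on top of it. A hedged sketch of where those settings live; the values below are placeholders, not a recommendation, and raising them does not address the root cause:

// Placeholder values for illustration only (6 GB heap + 1536 MB overhead = 7.5 GB)
val sparkConf = new org.apache.spark.SparkConf()
    .setAppName("log-parser")                           // hypothetical app name
    .set("spark.executor.memory", "6g")                 // executor heap
    .set("spark.yarn.executor.memoryOverhead", "1536")  // MB; "spark.executor.memoryOverhead" on newer Spark
val sc = new org.apache.spark.SparkContext(sparkConf)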

My questions:

  1. How can I make sure that Spark creates a result for every file split in my map task? (Maybe it does if my tasks succeed, but as of now I never get to see the output.)
  2. I thought that my map transformation val lineMap = lines.map ... (see Scala code below) produces a partitioned RDD. Thus I expect the values of the RDD to be split up in some way before calling my second map task.

    Furthermore, I thought that calling saveAsTextFile on this RDD lineMap produces an output task that runs after each of my map tasks has finished. If my assumptions are correct, why do my executors still run out of memory? Is Spark creating several (too) big file splits and processing them concurrently, which leads to the parser filling up the memory?

  3. Is repartitioning the lineMap RDD to get more (smaller) inputs for my parser a good idea? (See the sketch after this list.)
  4. Is there an additional reducer step somewhere which I am not aware of, for example results being aggregated before getting written to file?
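
To make question 3 concrete, a hedged sketch of what repartitioning would look like; 200 is an arbitrary partition count, not a recommendation (see the Scala code below for the surrounding context):

// Option A: ask groupByKey for more partitions directly
val lineMap = lines.map(line => (line.split("#")(1), line)).groupByKey(200)

// Option B: repartition the already-grouped RDD before the parsing map
val parsedLines = lineMap.repartition(200)
    .map { case (key, values) => parseLinesByKey(key, values, config) }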

Scala code (I left out the irrelevant code parts):

def main(args: Array[String]) {
    val inputFilePath = args(0)
    val outputFilePath = args(1)

    // fs (Hadoop FileSystem) and sc (SparkContext) are created in the omitted code
    val inputFiles = fs.listStatus(new Path(inputFilePath))
    inputFiles.foreach( filename => {
        processData(filename.getPath, ...)
    }) 
}


def processData(filePath: Path, ...) {
    // read one input file and group its lines by producer (the second #-separated field)
    val lines  = sc.textFile(filePath.toString())
    val lineMap = lines.map(line => (line.split("#")(1), line)).groupByKey()

    val parsedLines = lineMap.map{ case(key, values) => parseLinesByKey(key, values, config) }
    //each output should be saved separately
    parsedLines.saveAsTextFile(outputFilePath.toString() + "/" + filePath.getName)     
}


def parseLinesByKey(key: String, values: Iterable[String], config : Config) = {
    // .asJava requires: import scala.collection.JavaConverters._
    val importer = new LogFileImporter(...)
    importer.parseData(values.toIterator.asJava, ...)

    //importer from now contains all parsed data objects in memory that could be parsed 
    //from the given values.  

    val jsonMapper = getJsonMapper(...)
    val jsonStringData = jsonMapper.getValueFromString(importer.getDataObject)

    (key, jsonStringData)
}

Answer 1:

I fixed this by removing the groupByKey call and implementing a new FileInputFormat as well as a RecordReader to remove my limitation that lines depend on other lines. For now, I implemented it so that each split contains a 50.000 byte overlap with the previous split. This ensures that all lines which depend on previous lines can be parsed correctly.

I will now go ahead and still look through the last 50.000 bytes of the previous split, but only copy over lines that actually affect the parsing of the current split. Thus, I minimize the overhead and still get a highly parallelizable task.
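
The overlap-based reader itself is not shown here; a minimal, hedged sketch of the idea (reusing the RecordReader skeleton from the article below) only changes initialize() so that each split starts a fixed number of bytes early:

// Sketch only, not the actual implementation: extend each split backwards by a
// fixed overlap so that lines depending on the previous split are also visible.
val OverlapBytes = 50000L  // the "50.000 byte" overlap mentioned above

override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
    val split = inputSplit.asInstanceOf[FileSplit]
    start = math.max(0L, split.getStart - OverlapBytes)  // reach back into the previous split
    end   = split.getStart + split.getLength             // stop at the regular split end
    pos   = start

    val stream = split.getPath.getFileSystem(context.getConfiguration).open(split.getPath)
    stream.seek(start)
    reader = new LineReader(stream, context.getConfiguration)
}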

The following links pointed me in the right direction. Because the topic of FileInputFormat/RecordReader is quite complicated at first sight (it was for me, at least), it is good to read through these articles and understand whether this approach is suitable for your problem:

  • https://hadoopi.wordpress.com/2013/05/27/understand-recordreader-inputsplit/
  • http://www.ae.be/blog-en/ingesting-data-spark-using-custom-hadoop-fileinputformat/

Relevant code parts from the ae.be article, just in case the website goes down. The author (@Gurdt) uses this to detect whether a chat message contains an escaped line return (the line ends with "\") and appends the escaped lines together until an unescaped \n is found. This allows him to retrieve messages that span two or more lines. The code, written in Scala:

Usage

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}

val conf = new Configuration(sparkContext.hadoopConfiguration)
val rdd = sparkContext.newAPIHadoopFile("data.txt", classOf[MyFileInputFormat],
    classOf[LongWritable], classOf[Text], conf)
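
The resulting rdd holds (LongWritable, Text) pairs, i.e. a byte offset and the reassembled line. A small follow-up, not from the article, that turns them into plain strings:

// Illustration only: drop the offsets and keep the reassembled lines as Strings
val messages = rdd.map { case (_, text) => text.toString }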

FileInputFormat

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

class MyFileInputFormat extends FileInputFormat[LongWritable, Text] {
    override def createRecordReader(split: InputSplit, context: TaskAttemptContext):
        RecordReader[LongWritable, Text] = new MyRecordReader()
}

RecordReader

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.util.LineReader

class MyRecordReader() extends RecordReader[LongWritable, Text] {
    var start, end, pos = 0L
    var reader: LineReader = null
    var key = new LongWritable
    var value = new Text

    override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
        // split position in data (start one byte earlier to detect if
        // the split starts in the middle of a previous record)
        val split = inputSplit.asInstanceOf[FileSplit]
        start = 0L.max(split.getStart - 1)
        end = start + split.getLength

        // open a stream to the data, pointing to the start of the split
        val stream = split.getPath.getFileSystem(context.getConfiguration)
        .open(split.getPath)
        stream.seek(start)
        reader = new LineReader(stream, context.getConfiguration)

        // if the split starts at a newline, we want to start yet another byte
        // earlier to check if the newline was escaped or not
        val firstByte = stream.readByte().toInt
        if(firstByte == '\n')
            start = 0L.max(start - 1)
        stream.seek(start)

        if(start != 0)
            skipRemainderFromPreviousSplit(reader)
    }

    def skipRemainderFromPreviousSplit(reader: LineReader): Unit = {
        var readAnotherLine = true
        while(readAnotherLine) {
            // read next line
            val buffer = new Text()
            start += reader.readLine(buffer, Integer.MAX_VALUE, Integer.MAX_VALUE)
            pos = start

            // detect if delimiter was escaped
            readAnotherLine = buffer.getLength >= 1 &&                  // something was read
                buffer.charAt(buffer.getLength - 1) == '\\' &&          // newline was escaped
                pos <= end                                              // seek head hasn't passed the split
        }
    }

    override def nextKeyValue(): Boolean = {
        key.set(pos)

        // read newlines until an unescaped newline is read
        var lastNewlineWasEscaped = false
        while (pos < end || lastNewlineWasEscaped) {
            // read next line
            val buffer = new Text
            pos += reader.readLine(buffer, Integer.MAX_VALUE, Integer.MAX_VALUE)

            // append newly read data to previous data if necessary
            value = if(lastNewlineWasEscaped) new Text(value + "\n" + buffer) else buffer

            // detect if delimiter was escaped
            lastNewlineWasEscaped = buffer.charAt(buffer.getLength - 1) == '\\'

            // let Spark know that a key-value pair is ready!
            if(!lastNewlineWasEscaped)
                return true
        }

        // end of split reached?
        return false
    }
}

Note: You might need to implement getCurrentKey, getCurrentValue, close and getProgress in your RecordReader as well.
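
A minimal sketch of those remaining overrides, assuming the fields (key, value, pos, start, end, reader) from the reader above; the exact progress bookkeeping is up to you:

override def getCurrentKey: LongWritable = key
override def getCurrentValue: Text = value

override def getProgress: Float =
    if (end == start) 1.0f
    else math.min(1.0f, (pos - start).toFloat / (end - start))

override def close(): Unit = if (reader != null) reader.close()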