Memory issue with Spark Structured Streaming

Posted 2019-03-29 23:11

Question:

I'm running into memory issues with a Structured Streaming query that uses deduplication and partitioning in Spark 2.2.0:

session
    .readStream()
    .schema(inputSchema)
    .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
    .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
    .csv("s3://test-bucket/input")
    .as(Encoders.bean(TestRecord.class))
    .flatMap(mf, Encoders.bean(TestRecord.class))
    .dropDuplicates("testId", "testName")
    // "yyyy" is the calendar year; "YYYY" is the week-based year and can be off around New Year
    .withColumn("year", functions.date_format(functions.col("testTimestamp").cast(DataTypes.DateType), "yyyy"))
    .writeStream()
    .option("path", "s3://test-bucket/output")
    .option("checkpointLocation", "s3://test-bucket/checkpoint")
    .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
    .partitionBy("year")
    .format("parquet")
    .outputMode(OutputMode.Append())
    .queryName("test-stream")
    .start();

During testing I noticed that memory usage increases every time new data arrives, until the executors eventually exit with code 137:

ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1520214726510_0001_01_000003 on host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal

I created a heap dump and found that most of the memory is used by org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider, which is referenced from StateStore.

At first glance this looks normal, since that is how Spark keeps the deduplication keys in memory. However, I ran my test by renaming files in the source folder so that Spark would pick them up again. Because the input records are identical, all subsequent rows should be rejected as duplicates and memory consumption should not increase, yet it does.

Moreover, GC time accounted for more than 30% of the total processing time.

Here is a heap dump taken from an executor running with less memory than in the screenshots above, because when I tried to dump the larger one, the Java process terminated partway through.

Answer 1:

Migrating my comment from SPARK-23682, the issue the asker also filed for this problem.

The HDFS-backed state store provider caches multiple versions of the state in memory, 100 versions by default, which is excessive. This is addressed by SPARK-24717: after that patch, only two versions of the state are kept in memory (the current one for replay and the new one for updates). The patch will be available in Spark 2.4.0.
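
To make the effect of version caching concrete, here is a toy model in plain Java. It is not Spark's implementation: the class and method names are made up for illustration, and it counts map entries rather than bytes. It only assumes what is described above, namely that each committed batch produces a new version of the state map and that a limited number of versions stay cached. In Spark 2.4.0 and later, my understanding is that the limit is controlled by spark.sql.streaming.maxBatchesToRetainInMemory; please check the documentation for your version.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model only (hypothetical names): every committed batch stores a full
// copy of the state map, and at most maxVersionsInMemory copies are cached.
class StateVersionCacheSketch {
    private final int maxVersionsInMemory;
    private final TreeMap<Long, Map<String, String>> loadedVersions = new TreeMap<>();

    StateVersionCacheSketch(int maxVersionsInMemory) {
        this.maxVersionsInMemory = maxVersionsInMemory;
    }

    // Copy the latest version, apply the updates, then evict the oldest
    // versions that exceed the retention limit.
    void commit(long version, Map<String, String> updates) {
        Map<String, String> next = new HashMap<>();
        if (!loadedVersions.isEmpty()) {
            next.putAll(loadedVersions.lastEntry().getValue());
        }
        next.putAll(updates);
        loadedVersions.put(version, next);
        while (loadedVersions.size() > maxVersionsInMemory) {
            loadedVersions.pollFirstEntry();
        }
    }

    // Total number of map entries held across all cached versions.
    long entriesInMemory() {
        long total = 0;
        for (Map<String, String> m : loadedVersions.values()) {
            total += m.size();
        }
        return total;
    }

    // First batch inserts `keys` distinct keys; later batches contain only
    // duplicates, so they add no keys but still create a new version.
    static long simulate(int maxVersions, int batches, int keys) {
        StateVersionCacheSketch cache = new StateVersionCacheSketch(maxVersions);
        Map<String, String> first = new HashMap<>();
        for (int i = 0; i < keys; i++) {
            first.put("key-" + i, "value");
        }
        cache.commit(1, first);
        for (long v = 2; v <= batches; v++) {
            cache.commit(v, new HashMap<>());
        }
        return cache.entriesInMemory();
    }

    public static void main(String[] args) {
        System.out.println("100 versions cached: " + simulate(100, 100, 1000));
        System.out.println("2 versions cached:   " + simulate(2, 100, 1000));
    }
}
```

In this model, memory keeps growing for the first 100 batches even when every incoming row is a duplicate, which matches the behaviour described in the question. With a limit of two versions the entry count stays flat after the second batch.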



Answer 2:

I think the root cause is that you use dropDuplicates without a watermark, so all of the deduplication state is kept forever and never dropped. Have a look at: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#streaming-deduplication
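
In the query from the question, this would roughly mean calling .withWatermark("testTimestamp", "1 hour") before dropDuplicates and including the event-time column in the deduplication keys, e.g. .dropDuplicates("testId", "testName", "testTimestamp"). The "1 hour" delay is only an assumed value; pick whatever bound on late duplicates fits your data. The plain-Java sketch below is a simplified model of why a watermark bounds the state. It is not Spark code, the names are hypothetical, and unlike Spark it deduplicates on the key alone and advances the watermark on every record rather than per batch.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model (hypothetical names): deduplication state that is
// evicted once entries fall behind a watermark.
class WatermarkDedupSketch {
    private final long delayMillis;
    private final Map<String, Long> seen = new HashMap<>(); // key -> event time
    private long maxEventTime = Long.MIN_VALUE;

    WatermarkDedupSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Watermark = largest event time seen so far minus the allowed delay.
    long currentWatermark() {
        return maxEventTime == Long.MIN_VALUE ? Long.MIN_VALUE : maxEventTime - delayMillis;
    }

    // Returns true if the record should be emitted: it is not older than the
    // watermark and its key is not already in the state.
    boolean accept(String key, long eventTime) {
        if (eventTime < currentWatermark()) {
            return false; // too late, dropped without touching the state
        }
        maxEventTime = Math.max(maxEventTime, eventTime);
        return seen.putIfAbsent(key, eventTime) == null;
    }

    // Drop state for keys whose event time is older than the watermark.
    void evictExpired() {
        long watermark = currentWatermark();
        seen.values().removeIf(t -> t < watermark);
    }

    int stateSize() {
        return seen.size();
    }

    public static void main(String[] args) {
        WatermarkDedupSketch dedup = new WatermarkDedupSketch(10);
        System.out.println(dedup.accept("a", 0));   // first occurrence of "a"
        System.out.println(dedup.accept("a", 5));   // duplicate of "a"
        System.out.println(dedup.accept("b", 100)); // advances the watermark to 90
        dedup.evictExpired();                       // "a" is now older than the watermark
        System.out.println(dedup.stateSize());
    }
}
```

Without the eviction step the map of seen keys can only grow, which is the situation this answer describes. The trade-off is that a duplicate arriving later than the watermark allows is no longer recognised as a duplicate; in this model, and as far as I understand in Spark too, such late records are simply dropped.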