PySpark reduceByKey causes out of memory

Posted 2019-04-16 13:57

I'm trying to run a job in YARN mode that processes a large amount of data (2TB) read from Google Cloud Storage. My pipeline works just fine with 10GB of data. The specs of my cluster and the beginning of my pipeline are detailed here: PySpark Yarn Application fails on groupBy

Here is the rest of the pipeline:

      input.groupByKey()\
      [...] processing on sorted groups for each key shard
      .mapPartitions(sendPartition)\
      .map(mergeShardsbyKey)\
      .reduceByKey(lambda list1, list2: list1 + list2).take(10)
      [...] output
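
If explicit partitioning turns out to matter, this is the kind of change I have in mind: the same chain, but with an explicit numPartitions passed to the shuffle steps. This is only a sketch with a placeholder partition count, not what I'm currently running:

    # Sketch only: same pipeline, with an explicit partition count on the
    # shuffle operations. NUM_PARTITIONS = 512 is a placeholder value.
    NUM_PARTITIONS = 512

    sample = (input
              .groupByKey(numPartitions=NUM_PARTITIONS)
              # [...] processing on sorted groups for each key shard, as above
              .mapPartitions(sendPartition)
              .map(mergeShardsbyKey)
              .reduceByKey(lambda list1, list2: list1 + list2,
                           numPartitions=NUM_PARTITIONS)
              .take(10))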

The function that is applied over the partitions is the following:

def sendPartition(iterator):
    pool = external_service_connection_pool()
    # builds the full list of tagged records for this partition in memory
    return [make_request(record, pool) for record in iterator]

def make_request(record, pool):
    # [...] tags a record based on query results from the external service
    return key, taggedrecord
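
In case the list materialization matters for memory, I also considered a lazier variant of sendPartition that yields tagged records one at a time instead of building the whole list for the partition (just a sketch, with make_request and the connection pool unchanged; sendPartitionLazy is a name I made up):

    def sendPartitionLazy(iterator):
        # same idea as sendPartition, but yields results lazily instead of
        # materializing the entire partition as a list in memory
        pool = external_service_connection_pool()
        for record in iterator:
            yield make_request(record, pool)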

On the whole dataset, the execution fails with:

java.lang.OutOfMemoryError: Java heap space

I tried to get a bit more information and saw that it fails on the reduceByKey. However, from the mapPartitions onward, the job only runs on one executor until it fails on the reduce (at least, only one executor shows up in the Spark web UI, and the job is not split into multiple tasks until the reduce).

My question is the following: why does it run on only one executor? The documentation describing mapPartitions (http://spark.apache.org/docs/latest/programming-guide.html) seems to match my understanding of it, so is this a failure, or is it supposed to behave this way after the groupByKey?

EDIT: I tried on a smaller cluster with a smaller dataset, and even though it succeeds, only one executor is used to process all of the data after the groupByKey. Moreover, there are multiple partitions after each phase, and when I launch the stages one by one, every stage after the groupByKey is shown in the UI as "pending" on it.
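
For what it's worth, this is roughly how I check the partition counts between stages (grouped and tagged are just placeholder names for the intermediate RDDs):

    grouped = input.groupByKey()
    print(grouped.getNumPartitions())   # partitions right after the groupByKey

    tagged = grouped.mapPartitions(sendPartition)
    print(tagged.getNumPartitions())    # partitions going into the reduce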
