PySpark + Google Cloud Storage (wholeTextFiles)

Posted 2019-04-12 08:04

Question:

I am trying to parse about 1 million HTML files using PySpark (Google Dataproc) and write the relevant fields out to a condensed file. Each HTML file is about 200KB. Hence, all the data is about 200GB.

The code below works fine on a subset of the data, but when run on the whole dataset it runs for hours and then crashes. Furthermore, the worker nodes are barely utilized (<5% CPU), so I know something is wrong.

I believe the system is choking on ingesting the data from GCS. Is there a better way to do this? Also, when I use wholeTextFiles in this fashion, does the master attempt to download all the files and then send them to the executors, or does it let the executors download them?

def my_func(keyval):
    # wholeTextFiles yields (file name, file content) pairs
    file_name, file_str = keyval
    return parser(file_str).__dict__

data = sc.wholeTextFiles("gs://data/*")
output = data.map(my_func)
output.saveAsTextFile("gs://results/a")

Answer 1:

To answer your question: the master won't read all of the file contents itself, but it will fetch the status of every input file before beginning work. Dataproc sets the property "mapreduce.input.fileinputformat.list-status.num-threads" to 20 by default to help improve the time of this lookup, but an RPC is still performed per file in GCS.

It seems you've found a case where even adding threads isn't helping very much and is just leading the driver to OOM faster.
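
For reference, that listing property can also be overridden per job via Spark's spark.hadoop.* passthrough. This is only a minimal sketch (the value 40 is an arbitrary illustration, not a recommendation), and as noted above, more listing threads may not buy you much here:

from pyspark import SparkConf, SparkContext

# Forward the Hadoop listing property through Spark's "spark.hadoop." prefix;
# 40 threads is an illustrative value only.
conf = SparkConf().set(
    "spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads", "40")
sc = SparkContext(conf=conf)
data = sc.wholeTextFiles("gs://data/*")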

Expanding on how to parallelize the read, I have two ideas.

But first, a word of warning: neither of these solutions, as written, is very robust to directories being included in the glob. You will probably want to guard against directories appearing in the list of files to read (one way to do so is sketched after the first example below).

The first uses Python and the Hadoop command-line tools (this could also be done with gsutil). The example below performs the file listing on the workers, reads each file's content into a (name, content) pair, and finally computes (file name, file length) pairs:

from __future__ import print_function

from pyspark.rdd import RDD
from pyspark import SparkContext

import sys
import subprocess


def hadoop_ls(file_glob):
  # check_output returns bytes, so decode before splitting; skip blank lines and
  # any "Found N items" header, which do not have a path in the eighth column.
  lines = subprocess.check_output(["/usr/bin/hadoop", "fs", "-ls", file_glob]).decode("utf-8").split("\n")
  files = [line.split()[7] for line in lines if len(line.split()) >= 8]
  return files

def hadoop_cat(file):
  return subprocess.check_output(["/usr/bin/hadoop", "fs", "-cat", file]).decode("utf-8")

if __name__ == "__main__":
  if len(sys.argv) < 2:
    print("Provide a list of path globs to read.")
    exit(-1)

  sc = SparkContext()
  # This is just for testing. You'll want to generate a list 
  # of prefix globs instead of having a list passed in from the 
  # command line.
  globs = sys.argv[1:]
  # Desired listing partition count
  lpc = 100
  # Desired 'cat' partition count, should be less than total number of files
  cpc = 1000
  files = sc.parallelize(globs).repartition(lpc).flatMap(hadoop_ls)
  files_and_content = files.repartition(cpc).map(lambda f: [f, hadoop_cat(f)])
  files_and_char_count = files_and_content.map(lambda p: [p[0], len(p[1])])
  local = files_and_char_count.collect()
  for pair in local:
    print("File {} had {} chars".format(pair[0], pair[1]))

I would start with this subprocess solution, play with the partitioning of the hadoop_ls and hadoop_cat calls, and see whether you can get something that is acceptable.
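
Regarding the directory caveat above, one way to guard against directories is to drop any listing entry whose permission string marks it as a directory. This is a minimal sketch, not part of the original solution, and hadoop_ls_files_only is a hypothetical name:

import subprocess

def hadoop_ls_files_only(file_glob):
  # Variant of hadoop_ls above: "hadoop fs -ls" marks directories with a
  # leading "d" in the permission string (first column), so keep only entries
  # that are not directories.
  lines = subprocess.check_output(["/usr/bin/hadoop", "fs", "-ls", file_glob]).decode("utf-8").split("\n")
  entries = [line.split() for line in lines if len(line.split()) >= 8]
  return [fields[7] for fields in entries if not fields[0].startswith("d")]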

The second solution is more complicated, but will probably yield a pipeline that is more performant by avoiding many, many exec calls.

In this second solution, we'll compile a special-purpose helper jar, use an initialization action to copy that jar to all workers, and finally make use of the helper from our driver.

The final directory structure of the Scala jar project will look something like this:

helper/src/main/scala/com/google/cloud/dataproc/support/PysparkHelper.scala
helper/build.sbt

In the PysparkHelper.scala file we will have a small Scala class that functions much like the pure Python solution above. First we create an RDD of file globs, then an RDD of file names, and finally an RDD of (file name, file content) pairs.

package com.google.cloud.dataproc.support

import collection.JavaConversions._

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}

import java.util.ArrayList
import java.nio.charset.StandardCharsets

class PysparkHelper extends Serializable {
  def wholeTextFiles(
    context: JavaSparkContext,
    paths: ArrayList[String],
    partitions: Int): JavaPairRDD[String, String] = {

    val globRDD = context.sc.parallelize(paths).repartition(partitions)
    // map globs to file names:
    val filenameRDD = globRDD.flatMap(glob => {
      val path = new Path(glob)
      val fs: FileSystem = path.getFileSystem(new Configuration)
      val statuses = fs.globStatus(path)
      statuses.map(s => s.getPath.toString)
    })
    // Map file name to (name, content) pairs:
    // TODO: Consider adding a second partition count parameter to repartition before
    // the below map.
    val fileNameContentRDD = filenameRDD.map(f => {
      Pair(f, readPath(f, new Configuration))
    })

    new JavaPairRDD(fileNameContentRDD)
  }

  def readPath(file: String, conf: Configuration) = {
    val path = new Path(file)
    val fs: FileSystem = path.getFileSystem(conf)
    val stream = fs.open(path)
    try {
      IOUtils.toString(stream, StandardCharsets.UTF_8)
    } finally {
      stream.close()
    }
  }
}

The helper/build.sbt file would look something like this:

organization := "com.google.cloud.dataproc.support"
name := "pyspark_support"
version := "0.1"
scalaVersion := "2.10.5"
libraryDependencies +=  "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"
libraryDependencies +=  "org.apache.hadoop" % "hadoop-common" % "2.7.1" % "provided"
exportJars := true

We can then build the helper with sbt:

$ cd helper && sbt package

The output helper jar should be target/scala-2.10/pyspark_support_2.10-0.1.jar

We now need to get this jar onto our cluster. To do this, we need to do two things: 1) upload the jar to GCS, and 2) create an initialization action in GCS that copies the jar onto the cluster nodes.

For purposes of illustration, let's assume your bucket is named MY_BUCKET (insert appropriate walrus-related meme here).

$ gsutil cp target/scala-2.10/pyspark_support_2.10-0.1.jar gs://MY_BUCKET/pyspark_support.jar

Create an initialization action (let's call it pyspark_init_action.sh, replacing MY_BUCKET as needed):

#!/bin/bash

gsutil cp gs://MY_BUCKET/pyspark_support.jar /usr/lib/hadoop/lib/

and finally upload the initialization action to GCS:

$ gsutil cp pyspark_init_action.sh gs://MY_BUCKET/pyspark_init_action.sh

A cluster can now be started by passing the following flag to gcloud:

--initialization-actions gs://MY_BUCKET/pyspark_init_action.sh

After building, uploading, and installing our new library, we can finally make use of it from PySpark:

from __future__ import print_function

from pyspark.rdd import RDD
from pyspark import SparkContext
from pyspark.serializers import PairDeserializer, UTF8Deserializer

import sys

class DataprocUtils(object):

  @staticmethod
  def wholeTextFiles(sc, glob_list, partitions):
    """
    Read whole text file content from GCS.
    :param sc: Spark context
    :param glob_list: List of globs, each glob should be a prefix for part of the dataset.
    :param partitions: number of partitions to use when creating the RDD
    :return: RDD of filename, filecontent pairs.
    """
    helper = sc._jvm.com.google.cloud.dataproc.support.PysparkHelper()
    return RDD(helper.wholeTextFiles(sc._jsc, glob_list, partitions), sc,
               PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))

if __name__ == "__main__":
  if len(sys.argv) < 2:
    print("Provide a list of path globs to read.")
    exit(-1)

  sc = SparkContext()
  globs = sys.argv[1:]
  partitions = 10
  files_and_content = DataprocUtils.wholeTextFiles(sc, globs, partitions)
  files_and_char_count = files_and_content.map(lambda p: (p[0], len(p[1])))
  local = files_and_char_count.collect()
  for pair in local:
    print("File {} had {} chars".format(pair[0], pair[1]))


Answer 2:

Thanks! I tried the first method. It works, but is not very performant due to the exec calls and the per-file RPC/auth overhead. It takes about 10 hours to run on a 32-node cluster. I was able to run the same job in 30 minutes on a 4-node cluster using Databricks on AWS with the Amazon S3 connector. There seems to be much less overhead there. I wish Google would provide a better way to ingest data from GCS into Spark.