Saving a Spark dataframe in multiple parts without

Posted 2019-08-02 18:00

Question:

I want to query my database and store only the top 1 million records sorted by some column, say column1. I have tried two approaches:

  1. I load parquet files from HDFS into a dataframe, apply the SQL query to it, and then save the complete dataframe (10 million records) as text files on HDFS.

    df = sqlContext.sql("SELECT * FROM table order by column1")
    df.rdd.saveAsTextFile("<path>")
    

    I then read the text files back and fetch the first 1 million records from them.

  2. I limit the SQL query to 1 million records.

    df = sqlContext.sql("SELECT * FROM table order by column1 LIMIT 1000000")
    df.rdd.saveAsTextFile("<path>")
    

    But the second approach is much slower. I found that in the second case the dataframe returned by the SQL query (df) contains only 1 partition, so it is written by a single task. Repartitioning the dataframe before the write (as sketched below) improved the performance in the second case, but it was still slower than the first.
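
    For reference, this is roughly what I mean by repartitioning before the write; the partition count of 100 is just an arbitrary value I picked, not a tuned number:

    df = sqlContext.sql("SELECT * FROM table order by column1 LIMIT 1000000")
    # LIMIT leaves the result in a single partition; spread it out again
    # so that the write is performed by multiple tasks
    df.repartition(100).rdd.saveAsTextFile("<path>")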

Can anybody please suggest an approach to save the dataframe faster in case 2, or any other way to achieve the same result?

Answer 1:

Assuming that column1 is numeric, one possible solution is to estimate the distribution and filter instead of sorting and limiting. Assuming your data looks like this:

from pyspark.mllib.random import RandomRDDs

# 1,000,000 standard normal values in 10 partitions, with seed 323
df = RandomRDDs.normalRDD(sc, 1000000, 10, 323).map(lambda x: (x, )).toDF(["x"])

and you want to take the top n records:

n = 1000  # number of records to take, let's say 1000

you can estimate the percentile rank of the cut-off from the fraction of records you want to take:

# percentile rank of the cut-off; note that df.count() is itself a full action over the data
q = 100 * (1 - n / float(df.count()))

and estimate the corresponding quantile (here, with 1,000,000 rows and n = 1000, q = 99.9, i.e. the 99.9th percentile):

import numpy as np
from pyspark.sql.functions import col

# estimate the q-th percentile of "x" from a 5% sample collected to the driver
threshold = np.percentile(
    df.sample(False, 0.05).select("x").rdd.flatMap(lambda x: x).collect(),
    [q]
)[0]

# keep only the records above the estimated threshold (approximately the top n) and write them out
result = df.where(col("x") > threshold)
result.write.format(...).save(...)

This doesn't shuffle at all and keeps the initial record distribution, but it doesn't guarantee an exact number of records, and it requires additional actions (the count and the sample collect) with the related network IO.
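
If an exact top-n is required, one possible follow-up (my own sketch, not part of the original answer) is to sort and limit only the filtered subset, which is small enough for the sort to be cheap; if the threshold estimate turns out too aggressive and result ends up with fewer than n rows, the percentile can be lowered slightly and the filter re-run:

# continuing from the snippet above: `result` holds roughly the top n records
# sorting this small subset is cheap compared to sorting the full dataframe
exact_top_n = result.orderBy(col("x").desc()).limit(n)
exact_top_n.rdd.saveAsTextFile("<path>")  # or exact_top_n.write.format(...).save(...), as above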