Spark CLUSTERED BY / bucketBy dataset not using memory

Posted 2019-05-27 02:14

I recently came across Spark's bucketBy/CLUSTERED BY here.

I tried to mimic this for a 1.1 TB source file from S3 (already in parquet). The plan is to avoid the shuffle entirely, since most of the datasets are always joined on the "id" column. Here is what I am doing:

myDf.repartition(20)
  .write
  .partitionBy("day")
  .option("mode", "DROPMALFORMED")
  .option("compression", "snappy")
  .option("path", "s3://my-bucket/folder/1year_data_bucketed/")
  .mode("overwrite")
  .format("parquet")
  .bucketBy(20, "id")
  .sortBy("id")
  .saveAsTable("myTable1YearBucketed")
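To make sure the bucket spec actually ends up in the metastore, I check the table description afterwards (a minimal sketch; it assumes the saveAsTable call above registered the table in the current session's metastore):

// Sketch: inspect the bucket spec Spark recorded for the table saved above.
import org.apache.spark.sql.functions.col

spark.sql("DESCRIBE FORMATTED myTable1YearBucketed")
  .filter(col("col_name").isin("Num Buckets", "Bucket Columns", "Sort Columns"))
  .show(truncate = false)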

On a different EMR cluster, I create a table over the same path and access it:

CREATE TABLE newtable_on_diff_cluster (id string, day date, col1 double, col2 double)
USING PARQUET
OPTIONS (
  path "s3://my-bucket/folder/1year_data_bucketed/"
)
CLUSTERED BY (id) INTO 20 BUCKETS

Then I create a Scala DataFrame and join it with another table that is bucketed into the same 20 buckets on the id column:

val myTableBucketedDf = spark.table("newtable_on_diff_cluster")
val myDimTableBucketedDf = spark.table("another_table_with_same_bucketing")
val joinedOutput = myTableBucketedDf.join(myDimTableBucketedDf, "id")
joinedOutput.show()
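The way I am checking for the shuffle mentioned in question 1 below is by printing the physical plan of the join (a minimal sketch; if the bucketing is picked up on both sides, I would expect no Exchange nodes feeding the SortMergeJoin):

// Print the physical plan for the bucketed join above. With bucketing in
// effect on both tables, there should be no Exchange (shuffle) stages
// before the SortMergeJoin.
joinedOutput.explain()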

Here are my questions:

  1. I see that even with the repartition, the shuffle is removed from the explain plan, which is good. Is there any issue with using repartition, partitionBy, and bucketBy together in the above fashion?
  2. Judging from Ganglia, the above join does not appear to be using memory on my EMR cluster. When joining regular parquet files without bucketing, the jobs seem to run faster in memory for a small number of day partitions; I haven't tested it for more days. How exactly is the join processed here? Also, is there any way to avoid the CREATE TABLE SQL statement and instead use the parquet metadata to define the table schema from Scala? I don't want to repeat the column names and data types when they are already available in the parquet files (see the sketch after this list for the kind of thing I mean).
  3. What is the ideal number of buckets, or individual file size after bucketBy, in terms of the memory available on the executors? If the number of unique values in the id column is in the ~100 MM range, then, if I understand correctly, 20 buckets puts roughly 5 MM unique ids in each bucket. I understand that the sortBy here is not fully respected because Spark produces multiple files per bucket. What is the recommendation for repartition / final file sizes / number of buckets in this case?
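For question 2, what I have in mind is something like the following (a hypothetical sketch, assuming a Spark version where StructType.toDDL is available; I have not verified that the bucketing behaviour is preserved when the table is created this way):

// Hypothetical sketch: infer the schema from the parquet files instead of
// typing the columns out by hand, then build the CREATE TABLE statement
// from the inferred schema.
val inferredSchema = spark.read
  .parquet("s3://my-bucket/folder/1year_data_bucketed/")
  .schema

// e.g. "`id` STRING,`day` DATE,`col1` DOUBLE,`col2` DOUBLE"
val columnsDdl = inferredSchema.toDDL

spark.sql(s"""
  CREATE TABLE newtable_on_diff_cluster ($columnsDdl)
  USING PARQUET
  OPTIONS (path "s3://my-bucket/folder/1year_data_bucketed/")
  CLUSTERED BY (id) INTO 20 BUCKETS
""")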
