Spark partition data writing by timestamp

Posted 2020-06-16 06:09

Question:

I have some data with a timestamp column that is a long in Unix epoch format. I need to save that data split into a yyyy/mm/dd/hh directory layout using Spark with Scala.

data.write.partitionBy("timestamp").format("orc").save("mypath") 

This just splits the data by the raw timestamp value, like below:

timestamp=1458444061098
timestamp=1458444061198

But I want it to be laid out as:

└── YYYY
    └── MM
        └── DD
            └── HH

Answer 1:

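You can leverage Spark SQL's date/time functions for this. First, add a new column derived from the Unix timestamp column. Note that from_unixtime expects seconds, while the sample values above are in milliseconds, so divide by 1000 first. The result is a string in the default yyyy-MM-dd HH:mm:ss format, which the year, month, dayofmonth and hour functions can parse.

import org.apache.spark.sql.functions._

val withDateCol = data
  // epoch milliseconds -> seconds -> "yyyy-MM-dd HH:mm:ss" string
  .withColumn("date_col", from_unixtime((col("timestamp") / 1000).cast("long")))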

After this, you can add year, month, day and hour columns to the DF and then partition by these new columns for the write.

withDateCol
  .withColumn("year", year(col("date_col")))
  .withColumn("month", month(col("date_col")))
  .withColumn("day", dayofmonth(col("date_col")))
  .withColumn("hour", hour(col("date_col")))
  .drop("date_col")
  .write // partitionBy is a DataFrameWriter method, so .write must come first
  .partitionBy("year", "month", "day", "hour")
  .format("orc")
  .save("mypath")

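The columns included in the partitionBy clause won't be part of the file schema. Their values are encoded in the directory names instead, in Spark's column=value form (for example year=2016/month=3).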
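
The year, month, dayofmonth and hour functions return integers, so the directory values come out unpadded (month=3 rather than month=03). If zero-padded values closer to the yyyy/mm/dd/hh layout in the question are wanted, one possible variation is to build the partition columns as strings with date_format. The following is a minimal sketch under assumptions that are not in the original answer: a local SparkSession, the session time zone set to UTC, the timestamp column holding epoch milliseconds, and a hypothetical helper named withPaddedParts.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, date_format}

// Local session for illustration only; UTC is an assumption so results are reproducible.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("padded-partitions-sketch")
  .config("spark.sql.session.timeZone", "UTC")
  .getOrCreate()
import spark.implicits._

// Hypothetical helper: adds zero-padded string columns derived from epoch milliseconds.
def withPaddedParts(df: DataFrame): DataFrame = {
  // Casting a numeric value to timestamp interprets it as seconds since the epoch.
  val ts = (col("timestamp") / 1000).cast("timestamp")
  df.withColumn("year", date_format(ts, "yyyy"))
    .withColumn("month", date_format(ts, "MM"))
    .withColumn("day", date_format(ts, "dd"))
    .withColumn("hour", date_format(ts, "HH"))
}

// Sample values taken from the question.
val sample = Seq(1458444061098L, 1458444061198L).toDF("timestamp")
val parts = withPaddedParts(sample)
```

Writing parts with .write.partitionBy("year", "month", "day", "hour") should then give directories such as year=2016/month=03/day=20/hour=03 for the sample values, assuming UTC. The directory names still carry the column= prefix, since that is how Spark lays out partitioned output.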



Answer 2:

First, I would caution you against over-partitioning. That is, make sure you have enough data to make partitioning by hour worthwhile; otherwise you could end up with many partition folders containing small files. The second caution is about using a partition hierarchy (year/month/day/hour), since it requires recursive partition discovery.
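
One rough way to check whether hourly partitions would be worthwhile is to count how many records fall into each hour before writing. The sketch below is plain Scala with no Spark dependency, assuming a small sample of epoch-millisecond values is already available on the driver; countPerHour is a hypothetical helper name.

```scala
// Milliseconds in one hour.
val MillisPerHour: Long = 60L * 60L * 1000L

// Hypothetical helper: maps each hour bucket (hours since the epoch) to its record count.
def countPerHour(epochMillis: Seq[Long]): Map[Long, Int] =
  epochMillis
    // floorDiv keeps values before 1970 in the correct bucket as well
    .groupBy(ms => Math.floorDiv(ms, MillisPerHour))
    .map { case (hourBucket, values) => hourBucket -> values.size }

// The two sample timestamps from the question land in the same hour bucket.
val counts = countPerHour(Seq(1458444061098L, 1458444061198L))
```

If most buckets hold only a handful of records, a coarser scheme (by day, for example) is likely to produce fewer small files.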

Having said that, if you definitely want to partition by hour segments, I would suggest truncating your timestamp to the hour into a new column and partitioning by that. Spark will then be smart enough to recognize the format as a timestamp when you read it back in, and you can filter on it as needed.

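input
  // assumes 'timestamp is a TimestampType column; for epoch milliseconds convert first,
  // e.g. (col("timestamp") / 1000).cast("timestamp")
  .withColumn("ts_trunc", date_trunc("HOUR", 'timestamp)) // date_trunc added in Spark 2.3.0
  .write
  .partitionBy("ts_trunc")
  .save("/mnt/warehouse/part-test")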

spark.read.load("/mnt/warehouse/part-test").where("hour(ts_trunc) = 10")
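
To illustrate what truncating to the hour means for the sample values in the question, here is a plain-Scala sketch that uses java.time rather than Spark. It assumes the values are epoch milliseconds and works in UTC; truncateToHour is a hypothetical helper name, not part of the answer's code.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

// Hypothetical helper: drops minutes, seconds and milliseconds from an epoch-millisecond value.
def truncateToHour(epochMillis: Long): Instant =
  Instant.ofEpochMilli(epochMillis).truncatedTo(ChronoUnit.HOURS)

// Both sample timestamps from the question truncate to the same hour.
val first = truncateToHour(1458444061098L)
val second = truncateToHour(1458444061198L)
```

Every record within the same hour shares one truncated value, which is why a single truncated column is enough to get one partition per hour.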

The other option would be to partition by date and hour of day, like so:

input
  .withColumn("date", to_date('timestamp))
  .withColumn("hour", hour('timestamp))
  .write
  .partitionBy("date", "hour")
  .save("/mnt/warehouse/part-test")