I have a DataFrame with the following columns.
scala> show_times.printSchema
root
|-- account: string (nullable = true)
|-- channel: string (nullable = true)
|-- show_name: string (nullable = true)
|-- total_time_watched: integer (nullable = true)
This is data about how long each customer has watched a particular show. I'm supposed to categorize the customers for each show based on total time watched.
The dataset has 133 million rows in total with 192 distinct show_names.
For each individual show I'm supposed to bin the customer into 3 categories (1,2,3).
I use Spark MLlib's QuantileDiscretizer.
Currently I loop through every show and run QuantileDiscretizer sequentially, as in the code below.
What I'd like to have in the end is for the following sample input to get the sample output.
Sample Input:
account,channel,show_name,total_time_watched
acct1,ESPN,show1,200
acct2,ESPN,show1,250
acct3,ESPN,show1,800
acct4,ESPN,show1,850
acct5,ESPN,show1,1300
acct6,ESPN,show1,1320
acct1,ESPN,show2,200
acct2,ESPN,show2,250
acct3,ESPN,show2,800
acct4,ESPN,show2,850
acct5,ESPN,show2,1300
acct6,ESPN,show2,1320
Sample Output:
account,channel,show_name,total_time_watched,Time_watched_bin
acct1,ESPN,show1,200,1
acct2,ESPN,show1,250,1
acct3,ESPN,show1,800,2
acct4,ESPN,show1,850,2
acct5,ESPN,show1,1300,3
acct6,ESPN,show1,1320,3
acct1,ESPN,show2,200,1
acct2,ESPN,show2,250,1
acct3,ESPN,show2,800,2
acct4,ESPN,show2,850,2
acct5,ESPN,show2,1300,3
acct6,ESPN,show2,1320,3
Is there a more efficient and distributed way to do it using some groupBy-like operation instead of looping through each show_name and binning it one after another?
I know nothing about QuantileDiscretizer, but think you're mostly concerned with the dataset to apply QuantileDiscretizer to. I think you want to figure out how to split your input dataset into smaller datasets per show_name (you said that there are 192 distinct show_name values in the input dataset).
Solution 1: Partition Parquet Dataset
I've noticed that you use parquet as the input format. My understanding of the format is very limited, but I've noticed that people use a partitioning scheme to split large datasets into smaller chunks that they can then process however they like.
In your case the partitioning scheme could include show_name.
That would make your case trivial, as the splitting would be done at writing time (aka not my problem anymore).
See How to save a partitioned parquet file in Spark 2.1?
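A minimal sketch of what such a partitioned layout could look like, assuming a DataFrame with the schema from the question. The object name PartitionedShows, its two helper methods, and the output path are hypothetical and only there for illustration; partitionBy itself is a standard DataFrameWriter method.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helper object, not part of Spark.
object PartitionedShows {
  // Write one sub-directory per show, e.g. <path>/show_name=show1/.
  def writeByShow(showTimes: DataFrame, path: String): Unit =
    showTimes.write
      .mode(SaveMode.Overwrite)
      .partitionBy("show_name")
      .parquet(path)

  // Read back a single show. Filtering on the partition column lets Spark
  // skip the directories of all other shows (partition pruning).
  def readShow(spark: SparkSession, path: String, show: String): DataFrame =
    spark.read.parquet(path).filter(col("show_name") === show)
}
```

With this in place, something like PartitionedShows.writeByShow(show_times, "/tmp/show_times_by_show") (the path is made up) would be run once, and every later per-show read would only touch that show's directory.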
Solution 2: Scala's Future
Given your iterative solution, you could wrap every iteration into a Future that you'd submit to process in parallel.
Spark SQL's SparkSession (and Spark Core's SparkContext) are thread-safe.
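Here is a rough sketch of the Future-based idea, under a few assumptions: the column names come from the question's schema, the output column Time_watched_bin comes from the sample output, and the names ParallelBinning, binOneShow, binAllShows and the parallelism parameter are made up for this example. It is not the code from the question, just one way the loop could be parallelised. QuantileDiscretizer labels bins from 0.0, so the sketch shifts them to 1, 2, 3.

```scala
import java.util.concurrent.Executors

import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Hypothetical helper object, not part of Spark.
object ParallelBinning {
  // Fit and apply a QuantileDiscretizer to the rows of a single show.
  def binOneShow(showTimes: DataFrame, show: String, buckets: Int): DataFrame = {
    val showRows = showTimes.filter(col("show_name") === show)
    val discretizer = new QuantileDiscretizer()
      .setInputCol("total_time_watched")
      .setOutputCol("Time_watched_bin")
      .setNumBuckets(buckets)
    // fit() runs a Spark job to find the split points; transform() is lazy.
    discretizer.fit(showRows).transform(showRows)
      // Bins come back as 0.0, 1.0, ...; shift them to 1, 2, ... as integers.
      .withColumn("Time_watched_bin", (col("Time_watched_bin") + 1).cast("int"))
  }

  // Run the per-show fits concurrently and union the results.
  // Assumes the input contains at least one show (reduce fails on empty input).
  def binAllShows(showTimes: DataFrame, buckets: Int = 3, parallelism: Int = 8): DataFrame = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val shows = showTimes.select("show_name").distinct().collect().map(_.getString(0)).toSeq
      val futures = shows.map(show => Future(binOneShow(showTimes, show, buckets)))
      val binned = Await.result(Future.sequence(futures), Duration.Inf)
      binned.reduce(_ union _)
    } finally pool.shutdown()
  }
}
```

Only the fit jobs overlap here; the unioned result is still evaluated when an action is finally called on it. Caching the input before calling binAllShows would probably help, since every show filters the same dataset.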
Solution 3: Dataset's filter and union operators
I would think twice before following this solution since it puts a burden on your shoulders which I think could easily be sorted out by solution 1.
Given you've got one large 133-million-row parquet file, I'd first build the 192 datasets per show_name using the filter operator (as you did to build show_rdd, which is against the name as it's a DataFrame, not an RDD) and union them (again, as you did).
See Dataset API.
Solution 4: Use Window Functions
That's something I think could work, but didn't check it out myself.
You could use window functions (see WindowSpec and Column's over operator).
Window functions would give you partitioning (windows) while over would somehow apply QuantileDiscretizer to a window/partition. That would however require "destructuring" QuantileDiscretizer into an Estimator to train a model and somehow fit the result model to the window again.
I think it's doable, but haven't done it myself. Sorry.
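If equal-sized buckets per show are acceptable, a simpler window-based variant might avoid QuantileDiscretizer altogether by using Spark SQL's built-in ntile function. Note that this swaps in a different technique: ntile splits the ordered rows of each window into buckets of (nearly) equal size, whereas QuantileDiscretizer computes approximate quantile split points, so the two can assign different bins, particularly when there are ties. The names NtileBinning and binPerShow are made up for this sketch; the column names are taken from the question.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, ntile}

// Hypothetical helper object, not part of Spark.
object NtileBinning {
  // Add a 1-based bin per row, computed independently within each show_name.
  def binPerShow(showTimes: DataFrame, buckets: Int = 3): DataFrame = {
    // One window per show, rows ordered by time watched (ascending).
    val perShow = Window.partitionBy("show_name").orderBy(col("total_time_watched"))
    // ntile(n) numbers the buckets 1..n, lowest values first.
    showTimes.withColumn("Time_watched_bin", ntile(buckets).over(perShow))
  }
}
```

For the sample input in the question, NtileBinning.binPerShow(show_times) assigns bins 1, 1, 2, 2, 3, 3 within each show, and all shows are handled in a single distributed job rather than a loop.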