I am trying to build random forest models by group (School_ID, more than 3,000 groups) on a large model-input CSV file using the Spark Scala API. Each group contains about 3,000-4,000 records. The resources I have at my disposal are 20-30 AWS m3.2xlarge instances.
In R, I can construct models by group and save them to a list like this -
library(dplyr); library(randomForest)
Rf_model <- train %>%
  group_by(School_ID) %>%
  do(school = randomForest(formula = Rf_formula, data = ., importance = TRUE))
The list can be stored somewhere, and I can load it and call the individual models when I need them, like below -
save(Rf_model, file = paste0(Modelpath, "Rf_model.dat"))
load(file = paste0(Modelpath, "Rf_model.dat"))
pred <- predict(Rf_model$school[[school_index]], newdata = test)
I was wondering how to do that in Spark: whether I need to split the data by group first and, if so, how to do it efficiently.
I was able to split up the file by School_ID with the code below, but it seems to create one individual job to subset each group, so it takes a long time to finish. Is there a way to do it in one pass?
// Cache the input, since it is scanned once per school below
model_input.cache()

// Collect the distinct School_IDs to the driver
val schools = model_input.select("School_ID").distinct.collect.flatMap(_.toSeq)

// One filtered DataFrame per school
val bySchoolArray = schools.map(School_ID => model_input.where($"School_ID" <=> School_ID))

// Write each subset separately -- this launches one job per school
for (i <- schools.indices) {
  bySchoolArray(i).
    write.format("com.databricks.spark.csv").
    option("header", "true").
    save("model_input_bySchool/model_input_" + schools(i))
}
Source: How can I split a dataframe into dataframes with same column values in SCALA and SPARK
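One direction that might avoid the per-school jobs (a sketch I haven't verified on this data, assuming Spark 1.4+ where DataFrameWriter.partitionBy is available): write all groups in a single pass, producing one sub-directory per School_ID. Parquet is used here because partitioned output may not be supported by the spark-csv package.

// Single job: Spark shuffles the rows into one directory per School_ID.
model_input.write.
  partitionBy("School_ID").
  parquet("model_input_bySchool")

// A single school's data can then be read back directly, e.g.:
// val oneSchool = sqlContext.read.parquet("model_input_bySchool/School_ID=12345")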
Edit 8/24/2015 I'm trying to convert my dataframe into a format that is accepted by the random forest model. I'm following the instructions in this thread: How to create correct data frame for classification in Spark ML
Basically, I create a new variable "label" and store my class as a Double. Then I combine all my features using the VectorAssembler transformer and transform my input data as follows -
import org.apache.spark.ml.feature.VectorAssembler

// Combine the feature columns into a single vector column
val assembler = new VectorAssembler().
  setInputCols(Array("COL1", "COL2", "COL3")).
  setOutputCol("features")

val model_input = assembler.transform(model_input_raw).
  select("SCHOOL_ID", "label", "features")
Partial error message (let me know if you need the complete log) -
scala.MatchError: StringType (of class org.apache.spark.sql.types.StringType$) at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:57)
This was resolved by converting all the variables to numeric types.
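For reference, a minimal sketch of that conversion (COL1-COL3 are the placeholder column names from above): VectorAssembler only accepts numeric and vector input columns, so string-typed feature columns need a cast first.

// Cast string-typed feature columns to Double before assembling them.
val model_input_numeric = model_input_raw.
  withColumn("COL1", $"COL1".cast("double")).
  withColumn("COL2", $"COL2".cast("double")).
  withColumn("COL3", $"COL3".cast("double"))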
Edit 8/25/2015 The ml model doesn't accept the label I coded manually, so I need to use StringIndexer to work around the problem, as indicated here. According to the official documentation, the most frequent label gets index 0, which makes the label encoding inconsistent across School_IDs. I was wondering if there's a way to create the labels without resetting the order of the values.
import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer().
  setInputCol("label_orig").
  setOutputCol("label")
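One workaround I'm considering (a sketch, not verified): fit the indexer once on the full dataset and reuse that single fitted model on every per-school subset, so the same original label always maps to the same index regardless of per-school frequencies.

// Fit once on the full data so the label -> index mapping is global
// (assumes the DataFrame still carries the label_orig column).
val indexerModel = indexer.fit(model_input)
val bySchoolIndexed = bySchoolArray.map(df => indexerModel.transform(df))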
Any suggestions or directions would be helpful, and feel free to ask questions. Thanks!