Run 3000+ Random Forest Models By Group Using Spar

Posted 2020-02-24 02:04

Question:

I am trying to build random forest models by group (School_ID, more than 3,000 groups) from a large model-input CSV file using the Spark Scala API. Each group contains about 3,000-4,000 records. The resources at my disposal are 20-30 AWS m3.2xlarge instances.

In R, I can construct the models by group and save them to a list like this:

library(dplyr); library(randomForest)
Rf_model.school <- train %>% group_by(School_ID) %>%
                do(school = randomForest(formula = Rf_formula, data = ., importance = TRUE))

The list can be stored somewhere, and I can load it and call a model when I need it, like this:

save(Rf_model.school,file=paste0(Modelpath,"Rf_model.dat"))
load(file=paste0(Modelpath,"Rf_model.dat"))
pred <-  predict(Rf_model.school$school[school_index][[1]], newdata=test)

I was wondering how to do that in Spark, whether or not I need to split the data by group first and how to do it efficiently if it's necessary.

I was able to split the file by School_ID with the code below, but it seems to create a separate job to subset the data on each iteration, and the jobs take a long time to finish. Is there a way to do it in one pass?

model_input.cache()

val schools = model_input.select("School_ID").distinct.collect.flatMap(_.toSeq)
val bySchoolArray = schools.map(School_ID => model_input.where($"School_ID" <=> School_ID))

for (i <- schools.indices) {
  bySchoolArray(i).
    write.format("com.databricks.spark.csv").
    option("header", "true").
    save("model_input_bySchool/model_input_"+ schools(i))
}

Source: How can I split a dataframe into dataframes with same column values in SCALA and SPARK
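
On the one-pass question: the sketch below shows one possible way to write every school's subset in a single job by using DataFrameWriter.partitionBy. It assumes Spark 2.0 or later with the built-in CSV source rather than the com.databricks.spark.csv package used above; the object name, function names, and outputDir are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SplitBySchool {
  // Writes one sub-directory per distinct School_ID in a single Spark job.
  // Assumption: Spark 2.0+; outputDir is a hypothetical location.
  def writeBySchool(modelInput: DataFrame, outputDir: String): Unit =
    modelInput
      // Shuffle so that all rows of a school land in the same partition,
      // which keeps the number of files per school directory small.
      .repartition(modelInput.col("School_ID"))
      .write
      .partitionBy("School_ID")
      .option("header", "true")
      .mode("overwrite")
      .csv(outputDir)

  // Reads one school's records back. Partition directories are named
  // School_ID=<value>, and the School_ID column itself is not stored in the files.
  def readSchool(spark: SparkSession, outputDir: String, schoolId: String): DataFrame =
    spark.read.option("header", "true").csv(s"$outputDir/School_ID=$schoolId")
}
```

Calling SplitBySchool.writeBySchool(model_input, "model_input_bySchool") would replace the loop above with a single write, at the cost of one shuffle.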

Edit 8/24/2015: I'm trying to convert my data frame into a format that the random forest model accepts. I'm following the instructions in this thread: How to create correct data frame for classification in Spark ML

Basically, I create a new variable "label" and store my class in it as a Double. Then I combine all my features with VectorAssembler and transform my input data as follows:

val assembler = new VectorAssembler().
  setInputCols(Array("COL1", "COL2", "COL3")).
  setOutputCol("features")

val model_input = assembler.transform(model_input_raw).
  select("SCHOOL_ID", "label", "features")

Partial error message (let me know if you need the complete log):

scala.MatchError: StringType (of class org.apache.spark.sql.types.StringType$) at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:57)

This is resolved after converting all the variables to numeric types.
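
A minimal sketch of that conversion, assuming the offending columns hold numeric values stored as strings. NumericCast and castToDouble are hypothetical names, not part of the original code.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DoubleType

object NumericCast {
  // Casts each listed column to Double so that VectorAssembler accepts it.
  // Columns not listed are left untouched.
  def castToDouble(df: DataFrame, columns: Seq[String]): DataFrame =
    columns.foldLeft(df) { (acc, name) =>
      acc.withColumn(name, col(name).cast(DoubleType))
    }
}
```

For example, NumericCast.castToDouble(model_input_raw, Seq("COL1", "COL2", "COL3")) could be applied before assembler.transform. It is worth checking the result for nulls, since under Spark's default settings a string that cannot be parsed as a number typically becomes null rather than raising an error.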

Edit 8/25/2015: The ml model doesn't accept the label I coded manually, so I need to use StringIndexer to work around the problem, as indicated here. According to the official documentation, the most frequent label gets index 0. When the indexer is fitted separately for each School_ID, this produces inconsistent labels across schools. Is there a way to create the labels without resetting the order of the values?

val indexer = new StringIndexer().
  setInputCol("label_orig").
  setOutputCol("label")

Any suggestions or directions would be helpful and feel free to raise any questions. Thanks!

Answer 1:

Since you already have a separate data frame for each school, there is not much left to do here. Since you use data frames, I assume you want to use ml.classification.RandomForestClassifier. If so, you can try something like this:

  1. Extract pipeline logic. Adjust RandomForestClassifier parameters and transformers according to your requirements

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.ml.classification.RandomForestClassifier
    import org.apache.spark.ml.{Pipeline, PipelineModel}
    
    def trainModel(df: DataFrame): PipelineModel = {
       val rf  = new RandomForestClassifier()
       val pipeline = new Pipeline().setStages(Array(rf))
       pipeline.fit(df)
    }
    
  2. Train models on each subset

    val bySchoolArrayModels = bySchoolArray.map(df => trainModel(df))
    
  3. Save models

    import java.io._
    
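    def saveModel(name: String, model: PipelineModel): Unit = {
      val oos = new ObjectOutputStream(new FileOutputStream(s"/some/path/$name"))
      // Close the stream even if serialization fails
      try oos.writeObject(model) finally oos.close()
    }
    
    schools.zip(bySchoolArrayModels).foreach {
      // schools holds values of type Any, so convert the id to a String for the file name
      case (name, model) => saveModel(name.toString, model)
    }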
    
  4. Optional: Since the individual subsets are rather small, you can try an approach similar to the one I've described here to submit multiple tasks at the same time.

  5. If you use mllib.tree.model.RandomForestModel, you can omit step 3 and use model.save directly. Since there seem to be some problems with deserialization (see How to deserialize Pipeline model in spark.ml?; as far as I can tell it works just fine, but better safe than sorry, I guess), this could be the preferred approach.
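
For step 4, here is a rough sketch of submitting several training jobs at once from the driver with Scala Futures. It is plain Scala and is not taken from the linked answer; ParallelTraining, trainAll, and the parallelism parameter are hypothetical names. It relies on Spark accepting jobs submitted concurrently from multiple driver threads.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ParallelTraining {
  // Runs `train` on every (key, data) pair using at most `parallelism`
  // driver threads and returns the results keyed by group.
  def trainAll[K, D, M](groups: Seq[(K, D)], parallelism: Int)(train: D => M): Map[K, M] = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // Each Future blocks one pool thread while its training job runs.
      val futures = groups.map { case (key, data) => Future(key -> train(data)) }
      // Wait for all groups; a failure in any group is rethrown here.
      Await.result(Future.sequence(futures), Duration.Inf).toMap
    } finally pool.shutdown()
  }
}
```

With the names from steps 1 and 2, a call might look like ParallelTraining.trainAll(schools.zip(bySchoolArray).toSeq, parallelism = 8)(trainModel). The value 8 is arbitrary; a sensible setting depends on how many cores a single small training job can actually keep busy.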

Edit

According to the official documentation:

VectorAssembler accepts the following input column types: all numeric types, boolean type, and vector type.

Since the error indicates that your column is a String, you should transform it first, for example using StringIndexer.
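
Regarding the inconsistent labels across School_ID mentioned in the question, one option is to fit the StringIndexer once on the complete data set and reuse the fitted model for every school. The sketch below assumes the label_orig and label column names from the question; SharedLabels and its function names are hypothetical.

```scala
import org.apache.spark.ml.feature.{StringIndexer, StringIndexerModel}
import org.apache.spark.sql.DataFrame

object SharedLabels {
  // Fit once on all schools so that every subset shares the same
  // label-to-index mapping (the globally most frequent label gets 0.0).
  def fitLabelIndexer(allData: DataFrame): StringIndexerModel =
    new StringIndexer()
      .setInputCol("label_orig")
      .setOutputCol("label")
      .fit(allData)

  // Apply the shared mapping to one school's subset.
  def indexLabels(indexer: StringIndexerModel, subset: DataFrame): DataFrame =
    indexer.transform(subset)
}
```

Because the mapping is computed from the full data, a label keeps the same index in every school even when its local frequency rank differs.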