Optimal way to create an ML pipeline in Apache Spark

Posted 2019-02-02 03:03

Question:

I am working with Spark 2.1.1 on a dataset with ~2000 features and trying to create a basic ML Pipeline, consisting of some Transformers and a Classifier.

Let's assume for the sake of simplicity that the Pipeline I am working with consists of a VectorAssembler, StringIndexer and a Classifier, which would be a fairly common usecase.

// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("featuresRaw")

val labelIndexer: StringIndexer = new StringIndexer()
  .setInputCol("TARGET")
  .setOutputCol("indexedLabel")

// Configure the RandomForest classifier (it is trained later by the CrossValidator).
val rf: RandomForestClassifier = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("featuresRaw")
  .setMaxBins(30)

// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(5))
  .addGrid(rf.maxDepth, Array(5))
  .build()

// Evaluate candidate models by area under the ROC curve, computed against the indexed label.
val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setLabelCol("indexedLabel")

If the pipeline steps are separated into a transformer pipeline (VectorAssembler + StringIndexer) and a second classifier pipeline, and the unnecessary columns are dropped between the two, training succeeds. To reuse the models, however, two PipelineModels have to be saved after training, and an intermediate preprocessing step has to be introduced.

// Split indexers and forest in two Pipelines.
val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain)
// Transform data and drop all columns, except those needed for training 
val dfTrainT = prePipeline.transform(dfTrain)
val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*)

val mainPipeline = new Pipeline().setStages(Array(rf))

val cv = new CrossValidator()
  .setEstimator(mainPipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]

The (imho) much cleaner solution would be to merge all pipeline stages into one pipeline.

val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, assmbleFeatures, rf))

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

// This will fail! 
val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]

However, putting all PipelineStages into one Pipeline leads to the following exception, probably due to the issue this PR will eventually solve:

ERROR CodeGenerator: failed to compile: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection has grown past JVM limit of 0xFFFF

The reason for this is that the VectorAssembler effectively doubles (in this example) the amount of data in the DataFrame, as there is no transformer that could drop the unnecessary columns. (See spark pipeline vector assembler drop other columns)

The example works on the golub dataset, and the following preprocessing steps are necessary:

import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100)

// Those steps are necessary, otherwise training would fail either way
val colsToDrop = df.columns.take(5000)
val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*)

// Split df in train and test sets
val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3))

// Feature columns are columns except "TARGET"
val featureColumns = dfTrain.columns.filter(col => col != "TARGET")

As I am new to Spark, I am not sure what would be the best way to solve this issue. Would you suggest...

  1. creating a new transformer that drops columns and can be incorporated into the pipeline?
  2. splitting the pipeline in two and introducing the intermediate step?
  3. anything else? :)

Or am I missing anything important (pipeline steps, PR, etc.) that would solve this issue?


Edit:

I implemented a new Transformer, DroppingVectorAssembler, which drops unnecessary columns; however, the same exception is thrown.

Besides that, setting spark.sql.codegen.wholeStage to false does not solve the issue.
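
For reference, the setting mentioned above can be changed at runtime roughly as sketched below. The local SparkSession and the app name are assumptions for illustration only; as noted, disabling whole-stage codegen did not avoid the exception in this case.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("codegen-config-sketch")
  .getOrCreate()

// Disable whole-stage code generation for this session.
spark.conf.set("spark.sql.codegen.wholeStage", "false")
```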

Answer 1:

The Janino error is due to the number of constants in the class generated during the optimizer process. The JVM allows at most (2^16) - 1 constant pool entries per class. If this limit is exceeded, you get the error "Constant pool for class ... has grown past JVM limit of 0xFFFF".
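
As a quick check of the arithmetic, the snippet below shows that (2^16) - 1 is the same value as the 0xFFFF in the error message. The variable name is made up for illustration; the bound follows from the constant pool count being stored as an unsigned 16-bit field in the class file.

```scala
// The class file stores the constant pool count in an unsigned 16-bit field,
// so the pool cannot grow past 2^16 - 1 entries.
val constantPoolLimit: Int = (1 << 16) - 1

// Print the limit in decimal and in the hex form used by the error message.
println(s"$constantPoolLimit == 0x${constantPoolLimit.toHexString.toUpperCase}")
```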

The JIRA that will fix this issue is SPARK-18016, but it's still in progress at this time.

Your code is most likely failing during the VectorAssembler stage, when it has to operate on thousands of columns within a single optimization task.

The workaround that I developed for this problem is to create a "vector of vectors": assemble subsets of the columns into sub-vectors first, then bring the results together at the end to create a single feature vector. This prevents any single optimization task from exceeding the JVM constant limit. It's not elegant, but I've used it on datasets reaching into the 10k columns range.

This method also allows you to still keep a single pipeline, though it requires some additional steps to make it work (creating the sub-vectors). After you've created the feature vector from the sub-vectors, you can drop the original source columns if desired.

Example Code:

// IMPORT DEPENDENCIES
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SQLContext, Row, DataFrame, Column}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}

// Create first example dataframe
val exampleDF = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5, 1, 3, 2, 0, 4, 2, 8, 1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8, 2, 3, 3, 2, 6, 5, 4, 2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6, 3, 8, 5, 1, 2, 3, 5, 3, 6, 1, 9, 2, 3, 6),
  (4, 7, 8, 6, 9, 4, 5, 4, 9, 8, 2, 4, 9, 2, 4, 7, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 8, 7, 3, 5, 3, 4, 8, 0, 6, 2, 5, 9, 2, 7, 8, 7, 3),
  (6, 1, 1, 4, 2, 8, 4, 6, 3, 9, 8, 8, 9, 3, 6, 1, 1, 4, 2, 8, 4)
)).toDF("uid", "col1", "col2", "col3", "col4", "col5", 
        "col6", "col7", "col8", "col9", "colA", "colB", 
        "colC", "colD", "colE", "colF", "colG", "colH", 
        "colI", "colJ", "colK")

// Split the 20 feature columns into four lists of five (sliding with window size 5 and step 5)
val Array(colList1, colList2, colList3, colList4) = exampleDF.columns.filter(_ != "uid").sliding(5,5).toArray

// Create a vector assembler for each column list
val colList1_assembler = new VectorAssembler().setInputCols(colList1).setOutputCol("colList1_vec")
val colList2_assembler = new VectorAssembler().setInputCols(colList2).setOutputCol("colList2_vec")
val colList3_assembler = new VectorAssembler().setInputCols(colList3).setOutputCol("colList3_vec")
val colList4_assembler = new VectorAssembler().setInputCols(colList4).setOutputCol("colList4_vec")

// Create a vector assembler using column list vectors as input
val features_assembler = new VectorAssembler().setInputCols(Array("colList1_vec","colList2_vec","colList3_vec","colList4_vec")).setOutputCol("features")

// Create the pipeline with column list vector assemblers first, then the final vector of vectors assembler last
val pipeline = new Pipeline().setStages(Array(colList1_assembler,colList2_assembler,colList3_assembler,colList4_assembler,features_assembler))

// Fit and transform the data
val featuresDF = pipeline.fit(exampleDF).transform(exampleDF)

// Get the number of features in "features" vector
val featureLength = (featuresDF.schema(featuresDF.schema.fieldIndex("features")).metadata.getMetadata("ml_attr").getLong("num_attrs"))

// Print number of features in "features" vector
print(featureLength)

(Note: The column lists should really be created programmatically, but I've kept this example simple for the sake of understanding the concept.)
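
One possible way to build the column lists and assemblers programmatically, as the note above suggests, is sketched here. The helper name chunkedAssemblerStages, the "_part" suffix and the group size are assumptions for illustration, not part of the original answer; the synthetic column names stand in for a real DataFrame's columns.

```scala
import org.apache.spark.ml.PipelineStage
import org.apache.spark.ml.feature.VectorAssembler

// Hypothetical helper: one VectorAssembler per group of `groupSize` columns,
// followed by a final assembler that merges the group vectors into `outputCol`.
def chunkedAssemblerStages(
    featureCols: Array[String],
    groupSize: Int,
    outputCol: String = "features"): Array[PipelineStage] = {
  require(groupSize > 0, "groupSize must be positive")

  // grouped(n) yields consecutive chunks of at most n columns;
  // the last chunk may be shorter.
  val subAssemblers = featureCols.grouped(groupSize).toSeq.zipWithIndex.map {
    case (cols, i) =>
      new VectorAssembler()
        .setInputCols(cols)
        .setOutputCol(s"${outputCol}_part$i")
  }

  // The final stage takes the sub-vector columns as its inputs.
  val finalAssembler = new VectorAssembler()
    .setInputCols(subAssemblers.map(_.getOutputCol).toArray)
    .setOutputCol(outputCol)

  (subAssemblers :+ finalAssembler).toArray[PipelineStage]
}

// Synthetic column names for illustration: 23 columns in groups of 5
// give 5 sub-assemblers (the last with 3 columns) plus the final assembler.
val featureCols = (1 to 23).map(i => s"col$i").toArray
val stages = chunkedAssemblerStages(featureCols, groupSize = 5)
```

The resulting array can be passed to new Pipeline().setStages(stages), optionally followed by further stages such as a classifier.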



Answer 2:

The Janino error you are getting occurs because the generated code grows with the size of the feature set.

I'd separate the steps into different pipelines, drop the unnecessary features, save the intermediate models such as StringIndexer and OneHotEncoder, and load them during the prediction stage. This also helps because the transformations run faster on the data that has to be predicted.
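
A minimal sketch of saving and reloading such a preprocessing pipeline follows. The tiny DataFrame, the column names f1 and f2, the temp-directory path and the local SparkSession are all assumptions for illustration; on a cluster you would save to a shared file system instead.

```scala
import java.nio.file.Files
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("save-load-sketch")
  .getOrCreate()

// Tiny stand-in for the training data.
val dfTrain = spark.createDataFrame(Seq(
  (1.0, 2.0, "a"),
  (3.0, 4.0, "b"),
  (5.0, 6.0, "a")
)).toDF("f1", "f2", "TARGET")

// Fit the preprocessing stages on their own.
val prePipelineModel = new Pipeline().setStages(Array[PipelineStage](
  new StringIndexer().setInputCol("TARGET").setOutputCol("indexedLabel"),
  new VectorAssembler().setInputCols(Array("f1", "f2")).setOutputCol("featuresRaw")
)).fit(dfTrain)

// Hypothetical location for the saved model.
val modelPath = Files.createTempDirectory("pre-pipeline").resolve("model").toString
prePipelineModel.write.overwrite().save(modelPath)

// At prediction time: reload the fitted stages and keep only the model inputs.
val reloaded = PipelineModel.load(modelPath)
val prepared = reloaded.transform(dfTrain).select("featuresRaw", "indexedLabel")
```

The classifier pipeline can be saved and loaded the same way, so prediction needs only the two saved models and the select in between.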

Finally, you don't need to keep the original feature columns after running the VectorAssembler stage: it combines the features into a single feature vector, and the feature vector plus the label column are all you need to run predictions.
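
If you would rather drop the source columns inside one pipeline, a SQLTransformer stage is one option, sketched below. The sample data, the column names f1 and f2 and the local SparkSession are assumptions for illustration. Note that the question reports the same exception with a custom dropping transformer, so this may not avoid the Janino error on its own.

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{SQLTransformer, StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("drop-columns-sketch")
  .getOrCreate()

// Tiny stand-in for the real dataset.
val df = spark.createDataFrame(Seq(
  (1.0, 2.0, "a"),
  (3.0, 4.0, "b")
)).toDF("f1", "f2", "TARGET")

val labelIndexer = new StringIndexer()
  .setInputCol("TARGET")
  .setOutputCol("indexedLabel")

val assembler = new VectorAssembler()
  .setInputCols(Array("f1", "f2"))
  .setOutputCol("featuresRaw")

// __THIS__ is the placeholder SQLTransformer uses for its input dataset;
// selecting two columns drops everything else.
val keepModelInputs = new SQLTransformer()
  .setStatement("SELECT featuresRaw, indexedLabel FROM __THIS__")

val pipeline = new Pipeline()
  .setStages(Array[PipelineStage](labelIndexer, assembler, keepModelInputs))

val out = pipeline.fit(df).transform(df)
```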

Example of a Pipeline in Scala with saving of intermediate steps (older Spark API)

Also, if you are using an older version of Spark such as 1.6.0, you need to check for a patched version, i.e. 2.1.1, 2.2.0 or 1.6.4; otherwise you will hit the Janino error even with around 400 feature columns.