Can I create a model in spark batch and use it on Spark streaming for real-time processing?
I have seen various examples on the Apache Spark site where training and prediction are both built on the same type of processing (linear regression).
Can I create a model in Spark batch and use it in Spark Streaming for real-time processing?
Yes, of course. In the Spark community this is called offline training with online prediction. Many training algorithms in Spark let you save the model to a file system such as HDFS or S3. The same model can then be loaded by a streaming application, and you simply call the model's predict method to make predictions.
See the section Streaming + MLlib in this link.
For example, if you want to train a DecisionTree offline and do predictions online...
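In the batch application:
import org.apache.spark.mllib.tree.DecisionTree
// Train offline, then persist the model so another application can load it.
val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")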
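In the streaming application:
import org.apache.spark.mllib.tree.model.DecisionTreeModel
// Load the persisted model (sc can be ssc.sparkContext) and score incoming data.
val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")
sameModel.predict(newData)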
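To make the save/load round trip concrete, here is a minimal, Spark-free sketch of the same pattern in plain Java. It is only an illustration under stated assumptions: SimpleLinearModel, train, save, load and trainSaveLoadPredict are hypothetical names invented for this example, and ordinary Java serialization to a temporary file stands in for Spark's model.save and Model.load on HDFS/S3. The training data follows y = 2x + 1, so the reloaded model should reproduce that line.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class OfflineOnlineSketch {

    /** Hypothetical stand-in for a trained Spark model: y = slope * x + intercept. */
    static final class SimpleLinearModel implements Serializable {
        private static final long serialVersionUID = 1L;
        final double slope;
        final double intercept;

        SimpleLinearModel(double slope, double intercept) {
            this.slope = slope;
            this.intercept = intercept;
        }

        double predict(double x) {
            return slope * x + intercept;
        }
    }

    /** "Batch" step: ordinary least squares on a single feature. */
    public static SimpleLinearModel train(double[] xs, double[] ys) {
        int n = xs.length;
        double meanX = 0.0;
        double meanY = 0.0;
        for (int i = 0; i < n; i++) {
            meanX += xs[i];
            meanY += ys[i];
        }
        meanX /= n;
        meanY /= n;
        double sxy = 0.0;
        double sxx = 0.0;
        for (int i = 0; i < n; i++) {
            sxy += (xs[i] - meanX) * (ys[i] - meanY);
            sxx += (xs[i] - meanX) * (xs[i] - meanX);
        }
        double slope = sxy / sxx;
        return new SimpleLinearModel(slope, meanY - slope * meanX);
    }

    /** Persist the model; this plays the role of model.save(sc, path) in Spark. */
    public static void save(SimpleLinearModel model, Path path) {
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(path))) {
            out.writeObject(model);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reload the model; this plays the role of the static load(sc, path) call in Spark. */
    public static SimpleLinearModel load(Path path) {
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(path))) {
            return (SimpleLinearModel) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Trains offline, saves, reloads, then scores each "incoming" value. */
    public static double[] trainSaveLoadPredict(double[] incoming) {
        Path path;
        try {
            path = Files.createTempFile("linear-model", ".bin");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        path.toFile().deleteOnExit();

        // Offline part: fit on data generated from y = 2x + 1 and persist the result.
        SimpleLinearModel trained = train(new double[] {0, 1, 2, 3}, new double[] {1, 3, 5, 7});
        save(trained, path);

        // Online part: a separate consumer only needs the file, not the training data.
        SimpleLinearModel sameModel = load(path);
        double[] predictions = new double[incoming.length];
        for (int i = 0; i < incoming.length; i++) {
            predictions[i] = sameModel.predict(incoming[i]);
        }
        return predictions;
    }

    public static void main(String[] args) {
        for (double p : trainSaveLoadPredict(new double[] {3.0, 10.0})) {
            System.out.println(p);
        }
    }
}
```

In a real Spark job the two halves would live in separate applications that share only the storage path, and the loop over incoming values would be replaced by a map over the DStream.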
Here is one more solution, which I just implemented.
I created a model in Spark batch. Suppose the final model object is named regmodel:
final LinearRegressionModel regmodel = algorithm.run(JavaRDD.toRDD(parsedData));
and the Spark context is named sc:
JavaSparkContext sc = new JavaSparkContext(sparkConf);
Now, in the same code, I create a Spark streaming context using the same sc:
final JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(Integer.parseInt(conf.getWindow().trim())));
and do the prediction like this:
JavaPairDStream<Double, Double> predictvalue = dist1.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<Double, Double> call(LabeledPoint v1) throws Exception {
        Double p = v1.label();
        Double q = regmodel.predict(v1.features());
        return new Tuple2<Double, Double>(p, q);
    }
});