我有一个Spark流工作,其目标是:
- 读一批消息
- 预测变量Y使用预训练ML管道给这些消息
问题是,我希望能够更新由遗嘱执行人所用的模型,无需重新启动应用程序。
简单地说,这里是什么样子:
model = #model initialization
def preprocess(keyValueList):
#do some preprocessing
def predict(preprocessedRDD):
if not preprocessedRDD.isEmpty():
df = #create df from rdd
df = model.transform(df)
#more things to do
stream = KafkaUtils.createDirectStream(ssc, [kafkaTopic], kafkaParams)
stream.mapPartitions(preprocess).foreachRDD(predict)
在这种情况下,模型只是使用。 没有更新。
我已经考虑过几种可能性,但现在我已经越过他们都出来:
- 广播它改变了模型每次(不能更新,只读)
- 阅读从HDFS模型的执行者(它需要SparkContext所以不可能)
任何想法 ?
非常感谢 !