Spark Streaming - Best way to split an input stream based on parameters

Posted 2019-05-22 21:20

Question:

I am currently trying to build a kind of monitoring solution: some data is written to Kafka, and I read this data with Spark Streaming and process it.

To preprocess the data for machine learning and anomaly detection, I would like to split the stream based on some filter parameters. So far I have learned that a DStream itself cannot be split into several streams.

The main problem I am facing is that many algorithms (like KMeans) only take continuous data, not discrete data such as a URL or some other string.

My requirements would ideally be:

  • Read data from Kafka and generate a list of strings based on what I read (a rough ingestion sketch follows this list)
  • Generate multiple streams based on that list of strings (split the stream, filter the stream, or whatever is best practice)
  • Use those streams to train a different model per stream to get a baseline, and afterwards compare everything that comes in later against that baseline
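
For context, the ingestion side I have in mind is roughly the following. This is only a minimal sketch assuming the spark-streaming-kafka-0-10 connector; the broker address, group id and topic name are placeholders:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

val conf = new SparkConf().setAppName("monitoring").setMaster("local[2]") // master only for local testing
val ssc  = new StreamingContext(conf, Seconds(10))

// Placeholder broker, group id and topic name.
val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "localhost:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "monitoring-group"
)
val topics = Set("monitoring-events")

// originalDStream: DStream[String] holding the raw Kafka record values
val originalDStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
).map(_.value)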

I would be happy to get any suggestions on how to approach this problem. I cannot imagine that this scenario is not covered in Spark - however, so far I have not found a working solution.

Answer 1:

I think it should be sufficient to create derived DStreams from the original one, using filter and map:

val numericFeaturesDStream = originalDStream.filter(e => predicate(e)).map(e => extractFeatures(e))
val otherNumericFeaturesDStream = originalDStream.filter(e => predicate2(e)).map(e => extractOtherFeatures(e))
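
Here, predicate, predicate2, extractFeatures and extractOtherFeatures are stand-ins for your own logic. A minimal sketch of what they could look like, assuming each record is a comma-separated line whose first field identifies the metric (this record layout is just an assumption):

import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Hypothetical record layout: "metricName,value1,value2,..."
def predicate(e: String): Boolean  = e.startsWith("cpu,")
def predicate2(e: String): Boolean = e.startsWith("memory,")

// Keep only the continuous fields and turn them into an mllib Vector.
def extractFeatures(e: String): Vector =
  Vectors.dense(e.split(",").drop(1).map(_.toDouble))

def extractOtherFeatures(e: String): Vector =
  extractFeatures(e)

Each of the resulting DStream[Vector]s can then be fed to its own model, as shown further below.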

Note that these filter and map steps could be combined into a single collect step (not to be confused with the parameterless RDD.collect that brings the data to the driver!):

val featuresStream = originalDStream.transform(rdd => 
  rdd.collect{case (a,b,c,d,e) if c=="client" => Vectors.parse(a)}
)
streamingKMeans.trainOn(featuresStream)
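
For completeness, here is a sketch of how the streamingKMeans instance above might be created and how later batches could be compared against the learned baseline; k, the decay factor and the feature dimensionality are placeholder values:

import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors

val streamingKMeans = new StreamingKMeans()
  .setK(5)                    // placeholder
  .setDecayFactor(1.0)
  .setRandomCenters(3, 0.0)   // 3 = assumed feature dimensionality

// After trainOn(featuresStream), score incoming vectors against the baseline,
// e.g. by their distance to the nearest cluster center.
val anomalyScores = featuresStream.transform { rdd =>
  val model = streamingKMeans.latestModel() // evaluated on the driver, once per batch
  rdd.map { v =>
    val center = model.clusterCenters(model.predict(v))
    (v, Vectors.sqdist(v, center))          // large distance -> potential anomaly
  }
}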

We can also keep a dynamic set of filtered DStreams in some collection. Here we build a map keyed by the value we used to filter:

originalDStream.cache() // important for performance when a DStream is branched out.
// filterKeys: Set[String] 
val dstreamByFilterKey = filterKeys.map(key => key -> originalDStream.filter(e => getKey(e) == key)).toMap
// do something with the different DStreams in this structure ...
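
As one example of what to do with that structure, a sketch that trains one StreamingKMeans baseline per key; toVector is a hypothetical helper that turns a record into a Vector of its continuous features, and the model parameters are placeholders as above:

import org.apache.spark.mllib.clustering.StreamingKMeans

// One model per filter key.
val modelByKey = dstreamByFilterKey.map { case (key, stream) =>
  val model = new StreamingKMeans()
    .setK(5)
    .setDecayFactor(1.0)
    .setRandomCenters(3, 0.0)
  model.trainOn(stream.map(toVector)) // toVector: hypothetical record -> Vector
  key -> model
}

Each model can then be queried via latestModel() for scoring, as in the earlier snippet.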

These snippets are code examples to be completed with the actual logic.