I am currently trying to build a monitoring solution: data is written to Kafka, and I read and process it with Spark Streaming.
To preprocess the data for machine learning and anomaly detection, I would like to split the stream based on some filter parameters. So far I have learned that a DStream itself cannot be split into several streams.
The main problem I am facing is that many algorithms (like KMeans) only accept continuous data, not discrete data such as a URL or some other string.
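For context, one common way to make a string such as a URL usable by KMeans is the hashing trick, which maps it onto a fixed-size numeric vector; a minimal sketch in plain Scala (the function name and the bucket count of 16 are illustrative assumptions):

```scala
// Hashing trick: map a string (e.g. a URL) onto a fixed-size numeric vector.
def hashFeature(s: String, numBuckets: Int = 16): Array[Double] = {
  val features = Array.fill(numBuckets)(0.0)
  // Normalize hashCode into [0, numBuckets) even for negative hashes.
  val bucket = ((s.hashCode % numBuckets) + numBuckets) % numBuckets
  features(bucket) = 1.0 // one-hot entry in the hashed bucket
  features
}
```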
My requirements would ideally be:
- Read data from Kafka and generate a list of strings based on what I read
- Generate multiple streams based on that list of strings (split the stream, filter the stream, or whatever is best practice)
- Use those streams to train a different model per stream to establish a baseline, and afterwards compare everything that arrives later against that baseline
I would be happy to get any suggestions on how to approach this problem. I cannot imagine that this scenario is not covered in Spark; however, so far I have not found a working solution.
I think it should be sufficient to create derived DStreams from the original, using filter and map:
val numericFeaturesDStream = originalDStream.filter(e => predicate(e)).map(e => extractFeatures(e))
val otherNumericFeaturesDStream = originalDStream.filter(e => predicate2(e)).map(e => extractOtherFeatures(e))
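Here predicate and extractFeatures are placeholders; a minimal sketch of what they might look like, assuming a hypothetical Event case class for the records read from Kafka (wrap the result in mllib's Vectors.dense before feeding it to a model):

```scala
// Hypothetical event shape; adjust to whatever you actually read from Kafka.
case class Event(source: String, url: String, value: Double, count: Double)

// Keep only the events this particular stream is interested in.
def predicate(e: Event): Boolean = e.source == "client"

// Turn an event into the continuous features KMeans expects.
def extractFeatures(e: Event): Array[Double] = Array(e.value, e.count)
```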
Note that these filter and map steps could be combined into one collect step (not to be confused with the parameterless RDD.collect, which brings the data to the driver!):
val featuresStream = originalDStream.transform(rdd =>
  rdd.collect { case (a, b, c, d, e) if c == "client" => Vectors.parse(a) }
)
streamingKMeans.trainOn(featuresStream)
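The streamingKMeans instance above needs to be configured first; a sketch using spark.mllib's StreamingKMeans, where the number of clusters, decay factor, and feature dimension are assumptions to tune for your data:

```scala
import org.apache.spark.mllib.clustering.StreamingKMeans

val streamingKMeans = new StreamingKMeans()
  .setK(5)                         // number of clusters: an assumption, tune for your data
  .setDecayFactor(1.0)             // 1.0 = weigh all batches equally when updating centers
  .setRandomCenters(2, 0.0, 42L)   // dim must match the feature vectors, here 2

// After training, compare incoming data against the learned baseline, e.g.:
// streamingKMeans.predictOn(featuresStream) yields the nearest cluster per point.
```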
We can also keep a dynamic set of filtered DStreams in a collection. Here we build a Map keyed by the value we used to filter:
originalDStream.cache() // important for performance when a DStream is branched out.
// filterKeys: Set[String]
val dstreamByFilterKey = filterKeys.map(key => key -> originalDStream.filter(e => getKey(e) == key)).toMap
// do something with the different DStreams in this structure ...
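For instance, to train one baseline model per key, iterate over that structure; a sketch assuming a hypothetical toFeatureVector helper that turns an event into an mllib Vector:

```scala
import org.apache.spark.mllib.clustering.StreamingKMeans

// One independent baseline model per filter key.
val modelByKey: Map[String, StreamingKMeans] = dstreamByFilterKey.map {
  case (key, stream) =>
    val model = new StreamingKMeans()
      .setK(3)                        // assumption: tune per key
      .setDecayFactor(1.0)
      .setRandomCenters(2, 0.0, 42L)  // dim must match toFeatureVector's output
    model.trainOn(stream.map(toFeatureVector)) // toFeatureVector is an assumption
    key -> model
}.toMap
```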
These snippets are code examples to be completed with the actual logic.