We have a fairly standard Spark job which reads log files from S3 and then does some processing on them. Very basic Spark stuff...
val logs = sc.textFile(somePathTos3)
val mappedRows = logs.map(log => OurRowObject.parseLog(log.split("\t")))
val validRows = mappedRows.filter(log => log._1._1 != "ERROR")
// ...and continue processing
Here OurRowObject.parseLog takes the split log fields and maps them to some (key, value) pair, e.g. ( (1,2,3,4), (5,6,7) ), that we can then do processing on. If parseLog encounters a "problem" log (malformed, empty, etc.), it returns some sentinel value, e.g. ( ("ERROR", ...), (...) ), which the filter step then removes.
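For concreteness, a rough sketch of the sentinel approach (the key/value shapes and field positions here are simplified for illustration):

// simplified sketch: the real code builds richer keys/values from the fields
object OurRowObject {
  def parseLog(fields: Array[String]): ((String, String), (String, String)) = {
    if (fields.length < 4) (("ERROR", ""), ("", ""))      // sentinel for bad rows
    else ((fields(0), fields(1)), (fields(2), fields(3))) // normal (key, value) pair
  }
}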
What I have been trying to find is a way to simply not emit the problem row(s) during the map: some way to tell Spark "hey, this is an empty/malformed row, skip it and don't include a pair for it", instead of needing that additional filter step.
I have not yet been able to find a way to do this, and find it interesting that this functionality does not (as far as I can find) exist.
Thank you
You could make the parser return an Option[Value] instead of a Value. That way you could use flatMap to map the lines to rows and remove those that were invalid.
In rough outline, something like this:
def parseLog(line: String): Option[Array[String]] = {
  val fields = line.split("\t")
  // validate is whatever well-formedness check makes sense for your format
  if (validate(fields)) Some(fields) else None
}
val validRows = logs.flatMap(OurRowObject.parseLog(_))
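Since Scala implicitly converts an Option into an Iterable, flatMap unwraps the Somes and silently drops the Nones. A quick local sanity check, with made-up sample lines and assuming validate rejects rows with fewer than three fields:

val sample = sc.parallelize(Seq("a\tb\tc", "", "x\ty\tz"))
sample.flatMap(OurRowObject.parseLog).collect()
// => Array(Array(a, b, c), Array(x, y, z)); the empty line is gone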
One approach is to use the one-parameter overload of collect (instead of map or flatMap) and a PartialFunction. This is a little tricky if the partial function you need isn't completely trivial. In fact yours probably won't be, because you need to parse and validate, which I'll model below with two partial functions (although the first one happens to be defined for all inputs).
// this doesn't really need to be a partial function but we'll
// want to compose it with one and end up with a partial function
val split: PartialFunction[String, Array[String]] = {
  case log => log.split("\t")
}

// this really needs to be a partial function
val validate: PartialFunction[Array[String], Array[String]] = {
  case lines if lines.length > 2 => lines
}
val splitAndValidate = split andThen validate
val logs = sc.parallelize(Seq("a\tb", "u\tv\tw", "a", "x\ty\tz"), 4)
// only accept the logs with more than two entries
val validRows = logs.collect(splitAndValidate)
This is perfectly good Scala, but it doesn't work because splitAndValidate isn't serializable and we're using Spark: the job fails with Spark's "Task not serializable" exception. (Note that split and validate are serializable; the problem lies with the composition! A further wrinkle: before Scala 2.13, andThen accepts a plain function, so the composed isDefinedAt only consults split.) So, we need to make a PartialFunction that is serializable and checks both steps:
class LogValidator extends PartialFunction[String, Array[String]] with Serializable {

  private val validate: PartialFunction[Array[String], Array[String]] = {
    case lines if lines.length > 2 => lines
  }

  // note: collect calls isDefinedAt and then apply, so each accepted
  // line is split twice; cache the split if parsing gets expensive
  override def apply(log: String): Array[String] = {
    validate(log.split("\t"))
  }

  override def isDefinedAt(log: String): Boolean = {
    validate.isDefinedAt(log.split("\t"))
  }
}
Then we can call
val validRows = logs.collect(new LogValidator())
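Running this against the small sample RDD above, only the logs with more than two tab-separated fields should survive:

validRows.collect().map(_.mkString(","))
// => Array("u,v,w", "x,y,z"): "a\tb" and "a" were filtered out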