I am trying to use the MaxMind GeoIP API for Scala with Spark, which can be found at https://github.com/snowplow/scala-maxmind-iplookups. I load the database file in the standard way:
val ipLookups = IpLookups(geoFile = Some("GeoLiteCity.dat"), memCache = false, lruCache = 20000)
I have a basic CSV file containing times and IP addresses, which I load and process like this:
val sweek1 = week1.map { line => IP(parse(line)) }.collect {
  case Some(ip) =>
    val ipadress = ipdetect(ip.ip)
    (ip.time, ipadress)
}
The function ipdetect is basically defined by:
def ipdetect(a: String) = {
  ipLookups.performLookups(a)._1 match {
    case Some(value) => value.toString
    case _ => "Unknown"
  }
}
When I run this program, it fails with "Task not serializable". I read a few posts, and there seem to be a few ways around this:
1. a wrapper
2. using SparkContext.addFile (which distributes the file across the cluster)
However, I cannot work out how either of them works. I tried the wrapper, but I don't know how or where to call it. I tried addFile, but it returns Unit instead of a String, so I assume the binary file has to be passed to the lookup in some other way. I am not sure what to do now. Any help is much appreciated.
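One common reading of the "wrapper" idea is a small serializable holder that ships only a factory function to the executors and builds the real object there on first use. The sketch below is a minimal illustration under that assumption; `Lazily` is a hypothetical name, not part of Spark or of the MaxMind library, and the commented usage assumes GeoLiteCity.dat is readable at the same path on every worker.

```scala
// Hypothetical helper (not part of Spark or the MaxMind library).
// Only the factory function is serialized with the task closure. The wrapped
// value is @transient, so it is never serialized and is built lazily on
// first access, on whichever JVM is using the (deserialized) wrapper.
class Lazily[T](factory: () => T) extends Serializable {
  @transient lazy val value: T = factory()
}

// Intended usage with the names from the question (commented out because it
// needs Spark, the lookup library, and the database file):
//
// val lookups = new Lazily(() =>
//   IpLookups(geoFile = Some("GeoLiteCity.dat"), memCache = false, lruCache = 20000))
//
// val sweek1 = week1.map(line => IP(parse(line))).collect {
//   case Some(ip) =>
//     (ip.time, lookups.value.performLookups(ip.ip)._1.map(_.toString).getOrElse("Unknown"))
// }
```

The closure then captures `lookups` rather than the `IpLookups` instance itself, so the wrapper is "called" simply by reading `lookups.value` inside the transformation. The factory must not itself capture anything non-serializable.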
Update: I have been able to work around the serialization error by using mapPartitions and iterating over each local partition, but I wonder whether there is a more efficient way to do this, as my dataset has millions of rows.
Assume that your CSV file contains one IP address per line and that you want, for example, to map each IP address to a city.
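A minimal sketch of that scenario, combining SparkContext.addFile with mapPartitions, might look like the following. It is not a definitive implementation: the object name `IpToLocation`, the helper `lookupPartition`, the app name, and the input path `ips.csv` are all hypothetical, the import path for `IpLookups` is assumed to match the library version used in the question, and the location is rendered with `toString` as in the question rather than by extracting a city field.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}
import com.snowplowanalytics.maxmind.iplookups.IpLookups // assumed package for this library version

object IpToLocation {
  // Pure helper (hypothetical): applies a lookup function to every IP in one partition.
  def lookupPartition(ips: Iterator[String], lookup: String => String): Iterator[(String, String)] =
    ips.map(ip => (ip, lookup(ip)))

  def main(args: Array[String]): Unit = {
    // Master URL is expected to come from spark-submit.
    val sc = new SparkContext(new SparkConf().setAppName("ip-lookup"))

    // addFile returns Unit; each executor finds its local copy via SparkFiles.get.
    sc.addFile("GeoLiteCity.dat")

    val ips = sc.textFile("ips.csv") // hypothetical input, one IP address per line

    val located = ips.mapPartitions { iter =>
      // Created on the executor, once per partition, so the non-serializable
      // IpLookups instance is never captured by the task closure.
      val ipLookups = IpLookups(
        geoFile = Some(SparkFiles.get("GeoLiteCity.dat")),
        memCache = false,
        lruCache = 20000)
      lookupPartition(iter, ip =>
        ipLookups.performLookups(ip)._1.map(_.toString).getOrElse("Unknown"))
    }

    located.take(10).foreach(println)
    sc.stop()
  }
}
```

With this layout the cost of opening the database is paid once per partition rather than once per row, which is usually acceptable for datasets with millions of rows as long as the number of partitions stays moderate.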
For other IP transformations, please refer to Scala MaxMind IP Lookups. Furthermore, mapWith seems to be deprecated. Use mapPartitionsWithIndex instead.