I have two CSV files. One contains IP addresses:
76.83.179.64
76.83.179.64
187.42.62.209
89.142.219.5
and the other contains IP ranges and country names, as follows:
|ip_from|ip_to|country_name|
|16777216|16777471|Australia|
What I have done so far is as follows:
Load the range data (ip_from, ip_to and country name):
val rdd1 = sqlContext.read.format("csv")
  .option("inferSchema", "true")
  .load("/FileStore/tables/locations.CSV")
val df2 = rdd1.toDF()
Load the IP addresses and convert each one from a dotted string to a Long:
val rdd2 = sc.textFile("/FileStore/tables/ipaddress.csv")
def ipToLong(ipAddress: String): Long = {
  ipAddress.split("\\.").reverse.zipWithIndex
    .map(a => a._1.toInt * math.pow(256, a._2).toLong).sum
}
val df1 = rdd2.map(x=>ipToLong(x)).toDF()
Now, what user-defined function should I write to join both DataFrames (or do a lookup) and retrieve the country name for each IP address?
For your case you can simply use the following logic, a left join on a range condition:
df1.join(df2, df1("value") >= df2("ip_from") && df1("value") <= df2("ip_to"), "left")
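To make the range condition concrete, here is a minimal plain-Scala sketch (no Spark) of what the join evaluates for each row: convert the address to a number, then test `ip_from <= value <= ip_to`. The names `IpRange` and `lookupCountry` are hypothetical and used only for illustration; the single range is the Australia row from the question.

```scala
// Hypothetical helper type, for illustration only.
final case class IpRange(ipFrom: Long, ipTo: Long, countryName: String)

// Convert a dotted-quad IPv4 string to its numeric value,
// e.g. "1.0.0.0" -> 1 * 256^3 = 16777216.
def ipToLong(ip: String): Long =
  ip.trim.split("\\.").foldLeft(0L)((acc, octet) => acc * 256 + octet.toLong)

// Same predicate as the join condition: both ends of the range are inclusive.
def lookupCountry(ip: String, ranges: Seq[IpRange]): Option[String] = {
  val value = ipToLong(ip)
  ranges.find(r => value >= r.ipFrom && value <= r.ipTo).map(_.countryName)
}

val ranges = Seq(IpRange(16777216L, 16777471L, "Australia"))

val hit  = lookupCountry("1.0.0.17", ranges)     // falls inside 1.0.0.0 - 1.0.0.255
val miss = lookupCountry("76.83.179.64", ranges) // no matching range
```

An address with no matching range yields `None` here; in the left join the same case shows up as a null country_name.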
You can use a left_outer join along with a UDF that does the IP-to-long conversion, as in the following example:
val dfIP = Seq(
  ("76.83.179.64"),
  ("76.83.179.64"),
  ("187.42.62.209"),
  ("89.142.219.5")
).toDF("ip")
val dfRange = Seq(
  (1000000000L, 1500000000L, "Country A"),
  (1500000000L, 3000000000L, "Country B"),
  (3000000000L, 4000000000L, "Country C")
).toDF("ip_from", "ip_to", "country_name")
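import org.apache.spark.sql.functions.udf
// Convert a dotted IPv4 string to its numeric (Long) value
val ipToLong = udf(
  (ip: String) =>
    ip.split("\\.").reverse.zipWithIndex.map(
      a => a._1.toInt * math.pow(256, a._2).toLong
    ).sum
)
// ip_to is treated as exclusive here because the sample ranges share boundaries
val dfJoined = dfIP.join(
  dfRange,
  ipToLong($"ip") >= $"ip_from" && ipToLong($"ip") < $"ip_to",
  "left_outer"
)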
dfJoined.show
+-------------+----------+----------+------------+
| ip| ip_from| ip_to|country_name|
+-------------+----------+----------+------------+
| 76.83.179.64|1000000000|1500000000| Country A|
| 76.83.179.64|1000000000|1500000000| Country A|
|187.42.62.209|3000000000|4000000000| Country C|
| 89.142.219.5|1500000000|3000000000| Country B|
+-------------+----------+----------+------------+
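As a variation on the example above, the conversion can be materialized once as a column and then reused in the join condition, which keeps the predicate shorter. This is only a sketch under assumptions not stated in the original: the column name `ip_long` is made up for illustration, the local `SparkSession` setup is there just to make the snippet self-contained, and the `broadcast` hint assumes the range table is small enough to fit in memory on each executor.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{broadcast, udf}

// Local session for a self-contained run; in spark-shell or a notebook `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("ip-lookup-sketch").getOrCreate()
import spark.implicits._

// Dotted IPv4 string -> numeric value
val ipToLong = udf((ip: String) =>
  ip.split("\\.").foldLeft(0L)((acc, octet) => acc * 256 + octet.toLong))

val dfIP = Seq("76.83.179.64", "187.42.62.209", "89.142.219.5").toDF("ip")
val dfRange = Seq(
  (1000000000L, 1500000000L, "Country A"),
  (1500000000L, 3000000000L, "Country B"),
  (3000000000L, 4000000000L, "Country C")
).toDF("ip_from", "ip_to", "country_name")

// Compute the numeric address once ("ip_long" is a hypothetical column name).
val dfWithLong = dfIP.withColumn("ip_long", ipToLong($"ip"))

// Assumption: dfRange is small, so hint Spark to broadcast it for the range join.
val dfJoined = dfWithLong.join(
  broadcast(dfRange),
  $"ip_long" >= $"ip_from" && $"ip_long" < $"ip_to",
  "left_outer"
)

// Collect ip -> country pairs to inspect the result on the driver.
val countries = dfJoined.select("ip", "country_name").as[(String, String)].collect().toMap
```

If the ranges in the real data are inclusive on both ends, as in the question, the upper bound would use `<=` instead of `<`.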