Replace strings with zipWithIndex/zipWithUniqueId

Posted 2019-08-26 19:54

Question:

I am trying to replace certain strings with numbers using zipWithIndex or zipWithUniqueId.

Let's say I have data in this format:

("u1",("name", "John Sam"))
("u2",("age", "twinty Four"))
("u3",("name", "sam Blake"))

I want this result:

(0,(3,4))
(1,(5,6))
(2,(3,8))

What I did was extract the first element of each key-value pair:

val first = file.map(line => line._1).distinct()

and then apply zipWithIndex:

val z1 = first.zipWithIndex()

I got a result like this:

("u1",0)
("u2",1)
("u3",2)

Now I need to take these ids and substitute them into my original file. I also need to keep all the distinct ids in a hash table so that I can look them up later. Is there any way to do that? Any suggestions?

I hope my question is clear.

Answer 1:

With

val rdd = spark.sparkContext.parallelize(Seq(
  ("name", "John"), ("age", "twinty"), ("name", "sam")
))

Flatten the data:

val flat = rdd.flatMap { case (x, y) => Seq(x, y) }

Get the unique values:

val unique = flat.distinct

Index them and collect the result to the driver as a map:

val map = unique.zipWithIndex.collectAsMap

Go back to the original RDD and replace each string with its index:

val indexed = rdd.map { case (x, y) => (map(x), map(y)) }

Enjoy the result (the exact indices depend on how the data is partitioned):

indexed.toLocalIterator.foreach(println)
(2,4)
(3,0)
(2,1)
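
The collected map above is captured in the closure of the final map step. If the dictionary grows large, one option is to ship it to the executors as a broadcast variable instead. The following is only a sketch of that variation, not part of the original answer: the object name BroadcastIndexSketch, the helper indexPairs and the local[*] master are assumptions made for illustration.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical wrapper object, used only to make the sketch self-contained.
object BroadcastIndexSketch {

  // Replace every string in the pairs with its index from a broadcast dictionary.
  def indexPairs(rdd: RDD[(String, String)]): RDD[(Long, Long)] = {
    // Same steps as above: flatten, deduplicate, index, collect to the driver.
    val dict = rdd
      .flatMap { case (x, y) => Seq(x, y) }
      .distinct()
      .zipWithIndex()
      .collectAsMap()

    // Broadcast the dictionary so it is sent to each executor once.
    val dictB = rdd.sparkContext.broadcast(dict)

    rdd.map { case (x, y) => (dictB.value(x), dictB.value(y)) }
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, for trying the sketch outside a cluster.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("broadcast-index-sketch")
      .getOrCreate()

    val rdd = spark.sparkContext.parallelize(Seq(
      ("name", "John"), ("age", "twinty"), ("name", "sam")
    ))

    indexPairs(rdd).collect().foreach(println)
    spark.stop()
  }
}
```

Either way the dictionary still has to fit in driver memory, because collectAsMap brings it there first.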

Edit:

For the rewritten question, replace the first step with:

val flat = rdd.flatMap { case (x, (y, z)) => Seq(x, y, z) }

and the last step with:

val indexed = rdd.map { case (x, (y, z)) => (map(x), (map(y), map(z))) }
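
Putting the edited steps together for the nested data from the question, and keeping a reverse map so the ids can be looked up later, could look roughly like the sketch below. The object name NestedIndexSketch, the helper buildDict, the Row alias and the local[*] master are assumptions for illustration. Note that this assigns one global numbering to all strings, so the concrete ids will generally differ from the ones listed in the question.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical wrapper object, used only to make the sketch self-contained.
object NestedIndexSketch {
  type Row = (String, (String, String))

  // Build a single dictionary over every string in the nested records.
  def buildDict(rdd: RDD[Row]): scala.collection.Map[String, Long] =
    rdd
      .flatMap { case (x, (y, z)) => Seq(x, y, z) }
      .distinct()
      // zipWithIndex gives consecutive ids 0..n-1; zipWithUniqueId() would
      // also give unique ids, but they are not guaranteed to be consecutive.
      .zipWithIndex()
      .collectAsMap()

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, for trying the sketch outside a cluster.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("nested-index-sketch")
      .getOrCreate()

    val rdd = spark.sparkContext.parallelize(Seq(
      ("u1", ("name", "John Sam")),
      ("u2", ("age", "twinty Four")),
      ("u3", ("name", "sam Blake"))
    ))

    val dict = buildDict(rdd)
    val indexed = rdd.map { case (x, (y, z)) => (dict(x), (dict(y), dict(z))) }

    // Reverse lookup table: id -> original string.
    val reverse = dict.map(_.swap)

    indexed.collect().foreach { case (a, (b, c)) =>
      println(s"($a,($b,$c)) was (${reverse(a)},(${reverse(b)},${reverse(c)}))")
    }
    spark.stop()
  }
}
```

The dict map answers "which id does this string have", and reverse answers "which string does this id stand for", which covers the hash table the question asks about.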