Add a new calculated column from 2 values in RDD

Posted 2019-07-22 20:08

Question:

I have two pair RDDs that I joined on the same key, and now I want to add a new calculated column using two columns from the values part. The type of the joined RDD is:

RDD[((String, Int), Iterable[((String, DateTime, Int, Int), (String, DateTime, String, String))])]

I want to add another field to the new RDD that shows the delta between the two DateTime fields.

How can I do this?

Answer 1:

You should be able to do this using map to extend the 2-tuples into 3-tuples, roughly as follows:

joined.map{ case (key, values) =>
  val delta = computeDelta(values)
  (key, values, delta)
}

Or, more concisely:

joined.map{ case (k, vs) => (k, vs, computeDelta(vs)) }

Your computeDelta function can then take a pair from the iterable, extract the DateTime (the second field) from each of its two tuples, (String, DateTime, Int, Int) and (String, DateTime, String, String), and compute the delta using whatever DateTime functions are convenient.
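
As a rough sketch of what such a function could look like: the version below assumes java.time.LocalDateTime as a stand-in for the question's DateTime type (which the question does not identify), measures the delta in milliseconds, and takes the delta for a key from the first joined pair. The names DeltaSketch, LeftRow, RightRow and pairDelta are hypothetical and only used for illustration.

```scala
import java.time.{Duration, LocalDateTime}

object DeltaSketch {
  // Stand-ins for the two tuple shapes in the question. LocalDateTime replaces
  // the question's DateTime type (an assumption made for this sketch).
  type LeftRow  = (String, LocalDateTime, Int, Int)
  type RightRow = (String, LocalDateTime, String, String)

  // Milliseconds from the left timestamp to the right timestamp of one pair.
  // The result is negative when the right timestamp is earlier.
  def pairDelta(pair: (LeftRow, RightRow)): Long =
    Duration.between(pair._1._2, pair._2._2).toMillis

  // One delta per key: the delta of the first pair in the group, or 0 for an
  // empty group (an arbitrary choice for this sketch).
  def computeDelta(values: Iterable[(LeftRow, RightRow)]): Long =
    values.headOption.map(pairDelta).getOrElse(0L)
}
```

Which pair should represent the group (first, last, minimum, maximum) depends on what the delta is meant to express, so the headOption choice here is only a placeholder.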

If you want your output RDD to still be a paired RDD, then you will need to wrap the new delta field into a tuple, roughly as follows:

joined.mapValues{ values =>
  val delta = computeDelta(values)
  (values, delta)
}

which will preserve the original pair RDD keys and give you values of type (Iterable[((String, DateTime, Int, Int), (String, DateTime, String, String))], Long)

(assuming you are calculating deltas of type Long).
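
If a delta is wanted for every joined pair rather than one per key, the same mapValues approach could map over the Iterable and append a delta to each element. The sketch below shows only the function that would be passed to mapValues. It again assumes java.time.LocalDateTime in place of the question's DateTime and a delta in milliseconds, and PerPairDelta, LeftRow, RightRow and withDeltas are hypothetical names.

```scala
import java.time.{Duration, LocalDateTime}

object PerPairDelta {
  // Stand-ins for the two tuple shapes in the question. LocalDateTime replaces
  // the question's DateTime type (an assumption made for this sketch).
  type LeftRow  = (String, LocalDateTime, Int, Int)
  type RightRow = (String, LocalDateTime, String, String)

  // Append the millisecond delta between the two timestamps to each pair,
  // turning every 2-tuple in the group into a 3-tuple.
  def withDeltas(values: Iterable[(LeftRow, RightRow)]): Iterable[(LeftRow, RightRow, Long)] =
    values.map { case (left, right) =>
      (left, right, Duration.between(left._2, right._2).toMillis)
    }
}
```

On the RDD from the question this would be called roughly as joined.mapValues(PerPairDelta.withDeltas), once the type aliases are changed to use the real DateTime type. The keys stay untouched, so the result is still a pair RDD.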