I have a delicate Spark problem that I just can't wrap my head around.

We have two RDDs (coming from Cassandra). RDD1 contains Actions and RDD2 contains Historic data. Both have an id on which they can be matched/joined. But the problem is that the two tables have an N:N relationship: Actions contains multiple rows with the same id, and so does Historic. Here is some example data from both tables.
Actions (time is actually a timestamp):
id | time | valueX
1 | 12:05 | 500
1 | 12:30 | 500
2 | 12:30 | 125
Historic (set_at is actually a timestamp):
id | set_at | valueY
1 | 11:00 | 400
1 | 12:15 | 450
2 | 12:20 | 50
2 | 12:25 | 75
How can we join these two tables so that we get a result like this?

1 | 100 # 500 - 400 for Actions#1 at 12:05, because Historic's value at that time was 400
1 | 50  # 500 - 450 for Actions#2 at 12:30, because Historic's value at that time was 450
2 | 50  # 125 - 75  for Actions#3 at 12:30, because Historic's value at that time was 75
I can't come up with a good solution that feels right without making a lot of iterations over huge datasets. I keep coming back to the idea of building ranges from the Historic set and then somehow checking whether an Action fits into a range, e.g. (11:00 - 12:15), to make the calculation. But that seems pretty slow to me. Is there a more efficient way to do this? It seems to me that this kind of problem could be common, but I couldn't find any hints on it yet. How would you solve this problem in Spark?
My current attempt so far (in half-finished code):
case class Historic(id: String, set_at: Long, valueY: Int)

val historicRDD = sc.cassandraTable[Historic](...)

historicRDD
  .map( row => ( row.id, row ) )
  .reduceByKey(...)
  // transform into another case class, resulting in something like this; code not finished yet:
  // (List((Range(0, 12:25), 400), (Range(12:25, NOW), 450)))
  // From here we could join with Actions,
  // and then maybe some .filter to select the right tuple from the List
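For concreteness, the range idea could be sketched end-to-end like this (Action, Ranged and diffByRanges are placeholder names of mine; this still scans each id's ranges per action, which is the part that feels slow to me):

import org.apache.spark.rdd.RDD

case class Action(id: String, time: Long, valueX: Int)   // assumed shape of the Actions rows
case class Ranged(from: Long, until: Long, valueY: Int)  // validity range for one valueY

def diffByRanges(actions: RDD[Action], historic: RDD[Historic]): RDD[(String, Int)] = {
  // Collapse each id's Historic rows into validity ranges: every valueY is
  // valid from its own set_at until the next set_at (or "forever" for the last).
  val rangesById = historic
    .map(h => (h.id, List((h.set_at, h.valueY))))
    .reduceByKey(_ ++ _)
    .mapValues { entries =>
      val sorted = entries.sortBy(_._1)
      sorted.zipAll(sorted.drop(1).map(_._1), (0L, 0), Long.MaxValue).map {
        case ((from, y), until) => Ranged(from, until, y)
      }
    }
  // Join per id, then keep only the range that contains the action's timestamp.
  actions.map(a => (a.id, a)).join(rangesById).flatMap { case (id, (a, ranges)) =>
    ranges.find(r => r.from <= a.time && a.time < r.until)
      .map(r => (id, a.valueX - r.valueY))
  }
}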
After a few hours of thinking, trying and failing, I came up with this solution. I am not sure if it is any good, but due to the lack of other options, this is my solution.
First we expand our case class Historic.
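A sketch of what this expansion could look like (the entries field and the valueAt helper are my own names, not confirmed code): one Historic per id, holding all of that id's (set_at, valueY) entries sorted by set_at, plus a lookup helper.

case class Historic(id: String, entries: List[(Long, Int)]) {
  // entries are (set_at, valueY) pairs, sorted ascending by set_at.
  // valueAt returns the valueY in effect at `timestamp`: the entry with
  // the greatest set_at that is still <= timestamp, if any exists.
  def valueAt(timestamp: Long): Option[Int] =
    entries.takeWhile(_._1 <= timestamp).lastOption.map(_._2)
}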
The case class is ready, and now we bring it into action.
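A minimal, self-contained sketch of the driver side, with sc.parallelize standing in for the Cassandra tables and the example times converted to seconds (the Action shape is my assumption):

case class Action(id: String, time: Long, valueX: Int)

val rawHistoric = sc.parallelize(Seq(
  ("1", 39600L, 400), ("1", 44100L, 450),   // 11:00, 12:15
  ("2", 44400L, 50),  ("2", 44700L, 75)))   // 12:20, 12:25

val actions = sc.parallelize(Seq(
  Action("1", 43500L, 500),                 // 12:05
  Action("1", 45000L, 500),                 // 12:30
  Action("2", 45000L, 125)))                // 12:30

// Collapse each id's rows into one expanded Historic, sorted by set_at.
val historicById = rawHistoric
  .map { case (id, setAt, y) => (id, List((setAt, y))) }
  .reduceByKey(_ ++ _)
  .map { case (id, entries) => (id, Historic(id, entries.sortBy(_._1))) }

// Join per id and compute valueX - valueY for every action.
val report = actions
  .map(a => (a.id, a))
  .join(historicById)
  .flatMap { case (id, (a, h)) => h.valueAt(a.time).map(y => (id, a.valueX - y)) }

report.collect().foreach(println)   // (1,100), (1,50), (2,50)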
I am totally new to Scala, so please let me know if this code can be improved in some places.
It's an interesting problem. I also spent some time figuring out an approach. This is what I came up with:
Given case classes for Action(id, time, x) and Historic(id, time, y):
In Spark:
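What follows is a hedged sketch of one way to express this, using cogroup (the sample data is inlined and times are in seconds; the exact structure is my reconstruction, not confirmed code):

case class Action(id: String, time: Long, x: Int)
case class Historic(id: String, time: Long, y: Int)

val actionsRDD = sc.parallelize(Seq(
  Action("1", 43500L, 500), Action("1", 45000L, 500), Action("2", 45000L, 125)))
val historicRDD = sc.parallelize(Seq(
  Historic("1", 39600L, 400), Historic("1", 44100L, 450),
  Historic("2", 44400L, 50),  Historic("2", 44700L, 75)))

// Group both RDDs by id; per id, match every action with the latest
// historic entry at or before the action's timestamp.
val report = actionsRDD.keyBy(_.id)
  .cogroup(historicRDD.keyBy(_.id))
  .flatMap { case (id, (actions, historic)) =>
    val sortedHistoric = historic.toSeq.sortBy(_.time)
    actions.flatMap { a =>
      sortedHistoric.takeWhile(_.time <= a.time).lastOption
        .map(h => (id, a.x - h.y))
    }
  }

report.collect().foreach(println)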
Using the data provided above (I transformed the times to seconds to have a simplistic timestamp), the report looks like:
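Going by the expected result stated in the question, the output should come out along these lines:

(1,100)
(1,50)
(2,50)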
I know that this question has been answered, but I want to add another solution that worked for me. Your data: Actions and Historic, as above.
Write a custom partitioner and use repartitionAndSortWithinPartitions to partition by id, but sort by time. Then traverse the records per partition:

- If it is a Historic record, add it to a map, or update the map if it already has that id: keep track of the latest valueY per id, using one map per partition.
- If it is an Action record, get the valueY value from the map and subtract it from valueX.

A code sketch of this approach follows the trace below.
With a map M per partition:

Partition-1 traversal in order:
M = {1 -> 400}   // a new entry in map M
1 | 100          // M(1) = 400; 500 - 400
M = {1 -> 450}   // update M, because key already exists
1 | 50           // M(1) = 450; 500 - 450

Partition-2 traversal in order:
M = {2 -> 50}    // a new entry in M
M = {2 -> 75}    // update M, because key already exists
2 | 50           // M(2) = 75; 125 - 75
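As promised, a hedged sketch of this approach; the IdPartitioner class, the Either tagging, and the case-class shapes are my assumptions, not code from this answer:

import org.apache.spark.{HashPartitioner, Partitioner}

case class Action(id: String, time: Long, valueX: Int)
case class Historic(id: String, set_at: Long, valueY: Int)

// Partition by id only; the implicit ordering on the (id, time) key then
// sorts records by id and time within each partition. Note: at identical
// timestamps the Historic/Action order is unspecified; a real version
// might add a tie-breaker component to the key.
class IdPartitioner(partitions: Int) extends Partitioner {
  private val byId = new HashPartitioner(partitions)
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case (id: String, _) => byId.getPartition(id)
  }
}

val actionsRDD = sc.parallelize(Seq(
  Action("1", 43500L, 500), Action("1", 45000L, 500), Action("2", 45000L, 125)))
val historicRDD = sc.parallelize(Seq(
  Historic("1", 39600L, 400), Historic("1", 44100L, 450),
  Historic("2", 44400L, 50),  Historic("2", 44700L, 75)))

// Tag both record kinds, key them by (id, time), and sort within partitions.
val tagged =
  historicRDD.map(h => ((h.id, h.set_at), Left(h): Either[Historic, Action])).union(
    actionsRDD.map(a => ((a.id, a.time), Right(a): Either[Historic, Action])))

val results = tagged
  .repartitionAndSortWithinPartitions(new IdPartitioner(4))
  .mapPartitions { iter =>
    // One map per partition: id -> latest valueY seen so far.
    val latest = scala.collection.mutable.Map.empty[String, Int]
    iter.flatMap {
      case (_, Left(h))  => latest(h.id) = h.valueY; None      // Historic: update map
      case (_, Right(a)) => latest.get(a.id).map(y => (a.id, a.valueX - y))
    }
  }

results.collect().foreach(println)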
You could try to partition and sort by time instead, but then you would need to merge the partitions later, and that could add some complexity. Thus, I found this preferable to the many-to-many join that we usually get when using time ranges to join.