I have two RDDs, say:
rdd1 =
id | created | destroyed | price
1 | 1 | 2 | 10
2 | 1 | 5 | 11
3 | 2 | 3 | 11
4 | 3 | 4 | 12
5 | 3 | 5 | 11
rdd2 =
[1,2,3,4,5] # let's call these values timestamps (ts)
rdd2 is basically generated using range(initial_value, end_value, interval). The parameters can vary, and the size of rdd2 can be the same as or different from rdd1. The idea is to fetch records from rdd1 for each value in rdd2 using a filtering criterion (a record from rdd1 can appear under several timestamps, as you can see in the output below).
Filtering criterion: rdd1.created <= ts < rdd1.destroyed
Expected output:
ts | prices
1 | 10,11 # i.e. for ids 1,2 of rdd1
2 | 11,11 # ids 2,3
3 | 11,12,11 # ids 2,4,5
4 | 11,11 # ids 2,5
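For concreteness, here is a minimal sketch of this setup (the tuple layout (id, created, destroyed, price) and the names sc, initial_value, end_value, and interval are my assumptions from the tables above):
rdd1 = sc.parallelize([
    (1, 1, 2, 10),  # (id, created, destroyed, price)
    (2, 1, 5, 11),
    (3, 2, 3, 11),
    (4, 3, 4, 12),
    (5, 3, 5, 11),
])
rdd2 = sc.parallelize(range(1, 6, 1))  # range(initial_value, end_value, interval)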
Now I want to filter rdd1 using a condition based on the values of rdd2 (as described above), and return a result that pairs each value of rdd2 with the matching filtered records from rdd1.
So I do:
rdd2.map(lambda ts: somefilterfunction(ts, rdd1))

def somefilterfunction(ts, rdd1):
    # this references rdd1 inside a transformation on rdd2 -- a nested RDD reference
    filtered_rdd1 = rdd1.filter(lambda row: row[1] <= ts and row[2] > ts)
    prices = filtered_rdd1.map(lambda row: row[3])
    res = prices.collect()
    return (ts, list(res))
And I get:
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
I tried using groupBy, but it doesn't fit here: elements of rdd1 can appear under several timestamps, whereas grouping, as I understand it, would put each element of rdd1 into exactly one slot.
The only way I see now is to use a normal for loop on the driver, do the filtering for each timestamp, and join everything at the end.
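That is, something like this driver-side loop (a sketch, assuming the tuple layout above; note each iteration launches a separate Spark job, which is what makes me doubt this approach):
results = []
for ts in rdd2.collect():  # pull the timestamps to the driver
    # ts=ts freezes the current timestamp as the lambda's default argument
    prices = (rdd1
              .filter(lambda row, ts=ts: row[1] <= ts < row[2])
              .map(lambda row: row[3])
              .collect())
    if prices:  # skip timestamps with no matches, as in the expected output
        results.append((ts, prices))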
Any suggestions?