I am trying to write a function in PySpark that does a combination search and looks up values within a range. The following is a detailed description.
I have two data sets.
One data set, say D1, is basically a lookup table, as below:
MinValue  MaxValue  Value1  Value2
----------------------------------
1         1000      0.5     0.6
1001      2000      0.8     0.1
2001      4000      0.2     0.5
4001      9000      0.04    0.06
The other data set, say D2, is a table with millions of records, for example:
ID  InterestsRate  Days
-----------------------
1   19.99          29
2   11.99          49
For each ID, I need to calculate the maximum return over the possible credit limits of 500, 1000, 2000, 3000, and 5000.
The return is calculated as, for example:
f(x) = InterestsRate * Days * Value1 * Value2
Value1 and Value2 are determined by looking up the credit limit in D1. For example, if the credit limit is 3000, the lookup in D1 returns 0.2 and 0.5.
For each record in D2, I want to calculate the return for each credit limit and find the credit limit that gives the maximum return.
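For example, for ID 1 (InterestsRate 19.99, Days 29), a credit limit of 500 falls in the 1-1000 band and gives 19.99 * 29 * 0.5 * 0.6 ≈ 173.91, while a credit limit of 3000 falls in the 2001-4000 band and gives 19.99 * 29 * 0.2 * 0.5 ≈ 57.97; the answer for ID 1 is whichever limit yields the largest return.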
So far I have completed two functions:
I defined a lookup function as
def LookUp(value):
    # Return the D1 row(s) whose [MinValue, MaxValue] range contains value
    filter_str = "MinValue <= " + str(value) + " and MaxValue >= " + str(value)
    return D1.filter(filter_str)
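For example, LookUp(3000) should return the row (2001, 4000, 0.2, 0.5) as a one-row DataFrame.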
I also defined a search function as
def Search(rate, day):
    credit_limits = [500, 1000, 2000, 3000, 5000]
    max_return = 0
    best_cl = -1
    for cl in credit_limits:
        # Look up the band containing this credit limit and take its two values
        row = LookUp(cl).first()
        tmp = rate * day * row["Value1"] * row["Value2"]
        if max_return < tmp:
            max_return = tmp
            best_cl = cl
    return (best_cl, max_return)
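If this worked as intended, Search(19.99, 29) would return (500, 173.913), since the 1-1000 band has the largest Value1 * Value2 product.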
I will call the following transformation on D2:
res = D2.rdd.map(lambda row: Search(row[1], row[2]))
To my surprise, I ran into errors, and from searching I learned that I cannot use a DataFrame (D1) within a transformation on an RDD (D2).
I also read that a possible solution is to broadcast D1. However, I don't know how to make it work.
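Based on what I have read, I imagine the broadcast version would look roughly like the sketch below: collect D1 to the driver (it is small), broadcast it, and do the range lookup in plain Python inside the map. Here sc is my SparkContext, and SearchBroadcast is just a name I made up; I am not sure this is the right way to do it:

d1_rows = D1.collect()            # small lookup table, safe to collect to the driver
d1_bcast = sc.broadcast(d1_rows)  # broadcast it so executors get a local copy

credit_limits = [500, 1000, 2000, 3000, 5000]

def SearchBroadcast(rate, day):
    best_cl = -1
    max_return = 0
    for cl in credit_limits:
        # Plain-Python range lookup against the broadcast copy of D1
        for band in d1_bcast.value:
            if band["MinValue"] <= cl <= band["MaxValue"]:
                tmp = rate * day * band["Value1"] * band["Value2"]
                if max_return < tmp:
                    max_return = tmp
                    best_cl = cl
                break
    return (best_cl, max_return)

res = D2.rdd.map(lambda row: (row["ID"], SearchBroadcast(row["InterestsRate"], row["Days"])))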
Would you please comment on how to implement this function in PySpark?
Thanks!