I am trying to write a PySpark function that searches over a set of combinations and looks up values within a range. The detailed description follows.
I have two data sets.
One data set, say D1, is basically a lookup table, as shown below:
MinValue  MaxValue  Value1  Value2
----------------------------------
1         1000      0.5     0.6
1001      2000      0.8     0.1
2001      4000      0.2     0.5
4001      9000      0.04    0.06
The other data set, say D2, is a table with millions of records, for example:
ID  InterestsRate  Days
-----------------------
1   19.99          29
2   11.99          49
For each ID, I need to calculate the maximum return over the possible credit limits: 500, 1000, 2000, 3000, and 5000.
The return is calculated as, for example,
f(x) = InterestsRate * Days * Value1 * Value2.
Value1 and Value2 are determined by looking up the credit limit in D1. For example, if the credit limit is 3000, looking it up in D1 returns 0.2 and 0.5.
For each record in D2, I want to calculate the return for each credit limit and find the credit limit that gives the maximum return, together with that return.
So far I have completed two functions:
I defined a lookup function as
    def LookUp(value):
        filter_str = "MinValue <= " + str(value) + " and MaxValue >= " + str(value)
        return D1.filter(filter_str)
I also defined a search function as
    def Search(rate, day):
        credit_limit = [500, 1000, 2000, 3000, 5000]
        max_return = 0
        cl = -1
        for i in range(len(credit_limit)):
            v1 = LookUp(credit_limit[i]).select("Value1")
            v2 = LookUp(credit_limit[i]).select("Value2")
            tmp = rate * day * v1 * v2
            if max_return < tmp:
                max_return = tmp
                cl = credit_limit[i]
        return (cl, max_return)
I will call the following transformation on D2:
    res = D2.mapValues(lambda row: Search(row[1], row[2]))
To my surprise, I ran into errors, and after some searching I found that I cannot use a DataFrame (D1) inside a transformation on an RDD (D2).
I also read that a possible solution is to broadcast D1. However, I don't know how to make it work.
Would you please comment on how to implement this function in PySpark?
Thanks!
When you're using Spark, you should think in terms of SQL and table joins instead of looping over lists.
So the first thing I would do is to turn your credit limits list into a table; let's call it D3.
Now you can join this table to D1 and D2 to compute the return for each credit limit, and then pick the maximum return by using a Window function to rank the returns. As you stated in the comments, we will pick the maximum credit limit if there is a tie.
We're doing 2 Cartesian products here, so this may not scale well, but give it a try.