How to do range lookup and search in PySpark

Posted 2019-07-27 01:56

Question:

I am trying to write a PySpark function that combines a search with a lookup of values within a range. The detailed description follows.

I have two data sets. One data set, say D1, is basically a lookup table, as in below:

MinValue  MaxValue Value1 Value2
---------------------------------
1          1000      0.5     0.6
1001       2000      0.8     0.1
2001       4000      0.2     0.5
4001       9000      0.04    0.06

The other data set, say D2, is a table with millions of records, for example:

ID      InterestsRate       Days       
----------------------------------
1       19.99               29
2       11.99               49

For each ID, I need to calculate the maximum return based on different credit limits, with possible values of 500, 1000, 2000, 3000, and 5000.

The return is calculated as, for example,

f(x) = InterestsRate * Days * Value1 * Value2.

Value1 and Value2 are determined by looking up the credit limit in D1. For example, if the credit limit is 3000, looking it up in D1 returns 0.2 and 0.5.
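
To make that concrete: for ID 1 (InterestsRate 19.99, 29 Days) with a credit limit of 3000, the return is 19.99 * 29 * 0.2 * 0.5 = 57.971.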

For each record in D2, I want to calculate the return at each credit limit and find the credit limit that gives the maximum return.

So far I have completed two functions:

I defined a lookup function as

def LookUp(value):
    # return the D1 rows whose [MinValue, MaxValue] range contains value
    filter_str = "MinValue <= " + str(value) + " and MaxValue >= " + str(value)
    return D1.filter(filter_str)

I also defined a search function as

def Search(rate, day):
    credit_limit = [500, 1000, 2000, 3000, 5000]
    max_return = 0
    cl = -1
    for limit in credit_limit:
        v1 = LookUp(limit).select("Value1")
        v2 = LookUp(limit).select("Value2")
        tmp = rate * day * v1 * v2
        if max_return < tmp:
            max_return = tmp
            cl = limit

    return (cl, max_return)

I will call the following transformation on D2:

res = D2.rdd.map(lambda row: Search(row[1], row[2]))

To my surprise, I ran into errors, and after some searching I learned that I cannot use a DataFrame (D1) inside a transformation on an RDD (D2).

I also found that a possible solution is to broadcast D1. However, I don't know how to make it work.
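
My guess is that the broadcast approach would look roughly like the sketch below (collecting the small table D1 to the driver and broadcasting it), but I am not sure this is the right way to do it in PySpark:

# Rough sketch of the broadcast idea (I am not sure this is correct):
# collect the small lookup table D1 to the driver, broadcast it,
# and do the range lookup in plain Python inside the map function.
D1_rows = D1.collect()
bc_D1 = spark.sparkContext.broadcast(D1_rows)

def SearchBroadcast(rate, day):
    credit_limit = [500, 1000, 2000, 3000, 5000]
    best_cl, best_return = -1, 0.0
    for limit in credit_limit:
        for row in bc_D1.value:  # look up the range in the broadcast copy of D1
            if row["MinValue"] <= limit <= row["MaxValue"]:
                ret = rate * day * row["Value1"] * row["Value2"]
                if ret > best_return:
                    best_return, best_cl = ret, limit
    return (best_cl, best_return)

res = D2.rdd.map(lambda row: (row["ID"], SearchBroadcast(row["InterestsRate"], row["Days"])))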

Would you please comment on how to implement this function in PySpark?

Thanks!

Answer 1:

When you're using Spark, you should think in terms of SQL and table joins instead of looping over lists.

So the first thing I would do is turn your list of credit limits into a table; let's call it D3:

credit_limit = [500, 1000, 2000, 3000, 5000]
D3 = spark.createDataFrame([[x] for x in credit_limit], ["CreditLimit"])
D3.show()
#+-----------+
#|CreditLimit|
#+-----------+
#|        500|
#|       1000|
#|       2000|
#|       3000|
#|       5000|
#+-----------+

Now you can join this table to D1 and D2 to compute the return for each credit limit, and then pick the maximum return by using a Window function to rank the returns. As you stated in the comments, we pick the larger credit limit if there is a tie.

import pyspark.sql.functions as f
from pyspark.sql import Window

w = Window.partitionBy("ID").orderBy(f.desc("Return"), f.desc("CreditLimit"))
D2.alias("D2").crossJoin(D3.alias("D3"))\
    .crossJoin(D1.alias("D1"))\
    .where("D3.CreditLimit BETWEEN D1.MinValue AND D1.MaxValue")\
    .withColumn("Return", f.expr("D2.InterestsRate*D2.Days*D1.Value1*D1.Value2"))\
    .withColumn("Rank", f.rank().over(w))\
    .where("Rank = 1")\
    .drop("Rank")\
    .show()
#+---+-------------+----+-----------+--------+--------+------+------+------------------+
#| ID|InterestsRate|Days|CreditLimit|MinValue|MaxValue|Value1|Value2|            Return|
#+---+-------------+----+-----------+--------+--------+------+------+------------------+
#|  1|        19.99|  29|       1000|       1|    1000|   0.5|   0.6|173.91299999999998|
#|  2|        11.99|  49|       1000|       1|    1000|   0.5|   0.6|           176.253|
#+---+-------------+----+-----------+--------+--------+------+------+------------------+

We're doing two Cartesian products here, so this may not scale well, but give it a try.
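
If the Cartesian products do become a bottleneck, one thing to try (just a sketch, not tested on your data) is to explicitly mark the two small tables for broadcast, so Spark can turn the cross joins into broadcast nested loop joins instead of shuffling D2:

import pyspark.sql.functions as f
from pyspark.sql import Window

# Same logic as above, but hinting Spark to broadcast the two small tables
# (D1 and D3 each have only a handful of rows).
w = Window.partitionBy("ID").orderBy(f.desc("Return"), f.desc("CreditLimit"))

res = D2.alias("D2")\
    .crossJoin(f.broadcast(D3.alias("D3")))\
    .crossJoin(f.broadcast(D1.alias("D1")))\
    .where("D3.CreditLimit BETWEEN D1.MinValue AND D1.MaxValue")\
    .withColumn("Return", f.expr("D2.InterestsRate*D2.Days*D1.Value1*D1.Value2"))\
    .withColumn("Rank", f.rank().over(w))\
    .where("Rank = 1")\
    .drop("Rank")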