Loop through dataframe and update the lookup table

Posted 2019-08-01 08:17

Question:

I have a DataFrame like the following.

+---+-------------+-----+
| id|AccountNumber|scale|
+---+-------------+-----+
|  1|      1500847|    6|
|  2|      1501199|    7|
|  3|      1119024|    3|
+---+-------------+-----+

I have to populate a second DataFrame, which would initially be empty, as follows.

+---+-------------+-----+
| id|AccountNumber|scale|
+---+-------------+-----+
|  1|      1500847|    6|
|  2|      1501199|    6|
|  3|      1119024|    3|
+---+-------------+-----+

Output explanation

The first row in the first DataFrame has a scale of 6. Check for that value minus 1 (i.e. a scale of 5) among the existing scales. There is none, so simply add the row (1, 1500847, 6) to the output.

The second row has a scale of 7. The original table already has a row with scale 7 - 1 = 6, so add this row but with that scale instead: (2, 1501199, 6).

The third row works like the first one.
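In other words, a row keeps its scale unless scale - 1 already appears somewhere in the scale column, in which case scale - 1 is used instead. As a minimal sketch of that rule in plain Scala (outside Spark, using a hypothetical Row case class just for illustration):

// Minimal plain-Scala sketch of the lookup rule (hypothetical Row case class, not Spark)
case class Row(id: Int, accountNumber: Long, scale: Int)

val input = Seq(Row(1, 1500847L, 6), Row(2, 1501199L, 7), Row(3, 1119024L, 3))
val existingScales = input.map(_.scale).toSet  // all scales present in the column

val output = input.map { r =>
  // use scale - 1 if it already appears among the scales, otherwise keep scale
  if (existingScales.contains(r.scale - 1)) r.copy(scale = r.scale - 1) else r
}
// output: Row(1,1500847,6), Row(2,1501199,6), Row(3,1119024,3)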

Answer 1:

Using a broadcast list

You can collect all the values in the scale column as an array and broadcast it for use in a udf function. Then use the udf function in when logic with withColumn as

import org.apache.spark.sql.functions._

// collect every value in the scale column and broadcast it to the executors
val collectedList = sc.broadcast(df.select(collect_list("scale")).collect()(0)(0).asInstanceOf[collection.mutable.WrappedArray[Int]])

// udf that checks whether a given scale is present in the broadcast list
def newScale = udf((scale: Int) => collectedList.value.contains(scale))

// if scale - 1 already exists in the list, use it; otherwise keep the original scale
df.withColumn("scale", when(newScale(col("scale") - 1), col("scale") - 1).otherwise(col("scale")))
  .show(false)

You should get the desired output:

+---+-------------+-----+
|id |AccountNumber|scale|
+---+-------------+-----+
|1  |1500847      |6    |
|2  |1501199      |6    |
|3  |1119024      |3    |
+---+-------------+-----+
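As a side note, the udf can be avoided here: the same membership check can be written with Column.isin over the collected scales. A sketch, reusing the collectedList broadcast from above:

// Alternative sketch without a udf: isin checks membership in the broadcast list directly
df.withColumn("scale",
    when((col("scale") - 1).isin(collectedList.value: _*), col("scale") - 1)
      .otherwise(col("scale")))
  .show(false)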

Using Window function

The solution I am going to suggest requires collecting all the data into one executor using a Window function, forming an extra column scaleCheck that is populated with all the scales present in the scale column:

import org.apache.spark.sql.expressions.Window

// window spanning the whole DataFrame, so collect_list gathers every scale value into one array
def windowSpec = Window.orderBy("id").rowsBetween(Long.MinValue, Long.MaxValue)
val tempdf = df.withColumn("scaleCheck", collect_list("scale").over(windowSpec))

This would give you the DataFrame

+---+-------------+-----+----------+
|id |AccountNumber|scale|scaleCheck|
+---+-------------+-----+----------+
|1  |1500847      |6    |[6, 7, 3] |
|2  |1501199      |7    |[6, 7, 3] |
|3  |1119024      |3    |[6, 7, 3] |
+---+-------------+-----+----------+

Then you would have to write a udf function to check whether a given scale is already present in the collected list. Using the when function and calling the udf, you can generate the new scale value:

import org.apache.spark.sql.functions._

// udf that checks whether a given scale is present in the collected scaleCheck array
def newScale = udf((scale: Int, scaleCheck: collection.mutable.WrappedArray[Int]) => scaleCheck.contains(scale))

// if scale - 1 already exists in the array, use it; otherwise keep the original scale
tempdf.withColumn("scale", when(newScale(col("scale") - 1, col("scaleCheck")), col("scale") - 1).otherwise(col("scale")))
  .drop("scaleCheck")
  .show(false)

This gives the final required DataFrame shown above.
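As with the first approach, the udf is optional: the built-in array_contains function (from org.apache.spark.sql.functions) can test membership in the scaleCheck array column directly. A sketch against the same tempdf, assuming a Spark version whose array_contains accepts a column as the value to look up:

// Alternative sketch using array_contains instead of a udf
tempdf.withColumn("scale",
    when(array_contains(col("scaleCheck"), col("scale") - 1), col("scale") - 1)
      .otherwise(col("scale")))
  .drop("scaleCheck")
  .show(false)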

I hope this answer is helpful.