I have a DataFrame like the following:
+---+-------------+-----+
| id|AccountNumber|scale|
+---+-------------+-----+
| 1| 1500847| 6|
| 2| 1501199| 7|
| 3| 1119024| 3|
+---+-------------+-----+
I have to populate a second DataFrame, which would initially be empty, as follows:
id AccountNumber scale
1 1500847 6
2 1501199 6
3 1119024 3
Output explanation
The first row in the first DataFrame has a scale of 6. Check for that value minus 1 (i.e., a scale of 5) in the result. There is none, so simply add the row (1, 1500847, 6) to the output.
The second row in the first DataFrame has a scale of 7. The original table already has a row with scale 7 - 1 = 6, so add this row but with that scale: (2, 1501199, 6).
The third row works like the first one.
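To make the rule concrete, here is the same logic as a small plain-Scala sketch over local collections (my own illustration, not part of the Spark solutions below):

val rows = Seq((1, 1500847, 6), (2, 1501199, 7), (3, 1119024, 3))
val scales = rows.map(_._3).toSet
val result = rows.map { case (id, acct, scale) =>
  // If scale - 1 already appears in the scale column, replace scale with scale - 1
  if (scales.contains(scale - 1)) (id, acct, scale - 1) else (id, acct, scale)
}
// result: List((1,1500847,6), (2,1501199,6), (3,1119024,3))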
Using a broadcast list
You can collect all the values of the scale column as an array and broadcast it for use in a udf function. Then use that udf function in when logic with withColumn:
import org.apache.spark.sql.functions._

// Collect the whole scale column into one array and broadcast it to the executors
val collectedList = sc.broadcast(df.select(collect_list("scale")).collect()(0)(0)
  .asInstanceOf[collection.mutable.WrappedArray[Int]])
// Returns true if the given scale is present in the broadcast list
def newScale = udf((scale: Int) => collectedList.value.contains(scale))
// If scale - 1 exists in the list, use it; otherwise keep the original scale
df.withColumn("scale", when(newScale(col("scale") - 1), col("scale") - 1).otherwise(col("scale")))
  .show(false)
You should get the desired output:
+---+-------------+-----+
|id |AccountNumber|scale|
+---+-------------+-----+
|1 |1500847 |6 |
|2 |1501199 |6 |
|3 |1119024 |3 |
+---+-------------+-----+
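As a side note, if you would rather avoid the udf here, Column.isin can express the same check directly, using the scales collected to the driver as a plain local Seq (a sketch, assuming the scale list comfortably fits in driver memory):

import org.apache.spark.sql.functions._

// Collect the scale column to the driver as a local Seq[Int]
val scales = df.select("scale").collect().map(_.getInt(0)).toSeq
// isin takes varargs, so splat the local list; no udf or broadcast needed
df.withColumn("scale", when((col("scale") - 1).isin(scales: _*), col("scale") - 1).otherwise(col("scale")))
  .show(false)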
Using a Window function
The solution I am going to suggest requires you to collect all the data on one executor, using a Window function to form another column, scaleCheck, which is populated with all the values present in the scale column:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Window over the entire DataFrame, so every row sees the complete list of scales
def windowSpec = Window.orderBy("id").rowsBetween(Long.MinValue, Long.MaxValue)
val tempdf = df.withColumn("scaleCheck", collect_list("scale").over(windowSpec))
This would give you the following DataFrame:
+---+-------------+-----+----------+
|id |AccountNumber|scale|scaleCheck|
+---+-------------+-----+----------+
|1 |1500847 |6 |[6, 7, 3] |
|2 |1501199 |7 |[6, 7, 3] |
|3 |1119024 |3 |[6, 7, 3] |
+---+-------------+-----+----------+
Then you would have to write a udf function to check whether the scale in the row is already present in the collected list. Then, using the when function and calling that udf function, you can generate the scale value:
import org.apache.spark.sql.functions._

// Returns true if the given scale is present in the row's collected list
def newScale = udf((scale: Int, scaleCheck: collection.mutable.WrappedArray[Int]) => scaleCheck.contains(scale))
// If scale - 1 exists in the list, use it; otherwise keep the original scale
tempdf.withColumn("scale", when(newScale(col("scale") - 1, col("scaleCheck")), col("scale") - 1).otherwise(col("scale")))
  .drop("scaleCheck")
  .show(false)
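As a side note, this udf can also be replaced with the built-in array_contains function (a sketch, assuming a Spark version whose array_contains accepts a column expression as its second argument):

import org.apache.spark.sql.functions._

// Same when/otherwise logic, but using array_contains instead of a udf
tempdf.withColumn("scale", when(array_contains(col("scaleCheck"), col("scale") - 1), col("scale") - 1).otherwise(col("scale")))
  .drop("scaleCheck")
  .show(false)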
This gives you the final required DataFrame, shown above. I hope the answer is helpful.