I have a DataFrame whose data I am pasting below:
+---------------+--------------+----------+------------+----------+
|name | DateTime| Seq|sessionCount|row_number|
+---------------+--------------+----------+------------+----------+
| abc| 1521572913344| 17| 5| 1|
| xyz| 1521572916109| 17| 5| 2|
| rafa| 1521572916118| 17| 5| 3|
| {}| 1521572916129| 17| 5| 4|
| experience| 1521572917816| 17| 5| 5|
+---------------+--------------+----------+------------+----------+
The column "name" is of type string. I want a new column "effective_name" that contains the incremental (running) concatenation of the "name" values, as shown below:
+---------------+-------------+----------+------------+----------+----------------------+
|name           |DateTime     |sessionSeq|sessionCount|row_number|effective_name        |
+---------------+-------------+----------+------------+----------+----------------------+
|abc            |1521572913344|17        |5           |1         |abc                   |
|xyz            |1521572916109|17        |5           |2         |abcxyz                |
|rafa           |1521572916118|17        |5           |3         |abcxyzrafa            |
|{}             |1521572916129|17        |5           |4         |abcxyzrafa{}          |
|experience     |1521572917816|17        |5         |5         |abcxyzrafa{}experience|
+---------------+-------------+----------+------------+----------+----------------------+
The new column contains the running concatenation of all previous values of the name column.
Solution:

from pyspark.sql.window import Window
import pyspark.sql.functions as f

w = Window.partitionBy("Seq").orderBy("DateTime")
df.select(
    "*",
    f.concat_ws("", f.collect_list(f.col("name")).over(w)).alias("effective_name")
).show(truncate=False)

Explanation

collect_list() - Over the ordered window, this gathers the "name" values seen so far into a list; on the last row it returns ["abc", "xyz", "rafa", "{}", "experience"].
concat_ws() - This takes the array produced by collect_list() as input and joins its elements with the given separator, so with "" the last row becomes abcxyzrafa{}experience.

Note: collect_set() also works when the values have no duplicates, but it does not guarantee insertion order, so prefer collect_list() for a running concatenation.
You can achieve this by using a pyspark.sql.Window that orders by the DateTime, together with pyspark.sql.functions.concat_ws and pyspark.sql.functions.collect_list. I dropped "Seq", "sessionCount", and "row_number" to make the output display friendlier. If you needed to do this per group, you can add a partitionBy to the Window; say in this case you want to group by sessionSeq. If you prefer to use withColumn, the result is equivalent to the select version.

Explanation
You want to apply a function over multiple rows, which is called an aggregation. With any aggregation, you need to define which rows to aggregate over and in what order. We do this using a Window. In this case, w = Window.partitionBy("Seq").orderBy("DateTime") will partition the data by Seq and sort each partition by DateTime.

We first apply the aggregate function collect_list("name") over the window. This gathers all of the values from the name column and puts them in a list. The order of insertion is defined by the Window's ordering. For example, the intermediate output of this step would be:
Now that the appropriate values are in the list, we can concatenate them together with an empty string as the separator.