How to cumulatively concatenate the strings of one column of a DataFrame

Posted 2020-03-30 06:33

I have a DataFrame with the following data:

+---------------+--------------+----------+------------+----------+
|name           |      DateTime|       Seq|sessionCount|row_number|
+---------------+--------------+----------+------------+----------+
|            abc| 1521572913344|        17|           5|         1|
|            xyz| 1521572916109|        17|           5|         2|
|           rafa| 1521572916118|        17|           5|         3|
|             {}| 1521572916129|        17|           5|         4|
|     experience| 1521572917816|        17|           5|         5|
+---------------+--------------+----------+------------+----------+

The column 'name' is of type string. I want a new column "effective_name" that will contain the running values of "name", as shown below:

+-----------+-------------+---+------------+----------+----------------------+
|name       |     DateTime|Seq|sessionCount|row_number|effective_name        |
+-----------+-------------+---+------------+----------+----------------------+
|abc        |1521572913344| 17|           5|         1|abc                   |
|xyz        |1521572916109| 17|           5|         2|abcxyz                |
|rafa       |1521572916118| 17|           5|         3|abcxyzrafa            |
|{}         |1521572916129| 17|           5|         4|abcxyzrafa{}          |
|experience |1521572917816| 17|           5|         5|abcxyzrafa{}experience|
+-----------+-------------+---+------------+----------+----------------------+

The new column contains the cumulative concatenation of all the previous values of the "name" column.
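For reference, here is a minimal sketch that reconstructs this DataFrame (the values are taken from the table above; it assumes a local SparkSession):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [
    ("abc",        1521572913344, 17, 5, 1),
    ("xyz",        1521572916109, 17, 5, 2),
    ("rafa",       1521572916118, 17, 5, 3),
    ("{}",         1521572916129, 17, 5, 4),
    ("experience", 1521572917816, 17, 5, 5),
]
df = spark.createDataFrame(
    data, ["name", "DateTime", "Seq", "sessionCount", "row_number"]
)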

2 Answers
chillily · 2020-03-30 06:55

Solution:

import pyspark.sql.functions as f
from pyspark.sql import Window

# running concatenation per Seq group, ordered by DateTime
w = Window.partitionBy("Seq").orderBy("DateTime")

df.select(
    "*",
    f.concat_ws("", f.collect_set(f.col("name")).over(w)).alias("cumulative_name"),
).show()

Explanation

collect_set() - returns an array of the collected values, e.g. ["abc", "xyz", "rafa", "{}", "experience"] for the last row of the window.

concat_ws() - takes the array produced by collect_set() and joins its elements with the given separator; with "" the last row becomes abcxyzrafa{}experience.

Note: collect_set() removes duplicates and does not guarantee element order, so use it only if the names are unique; otherwise use collect_list() (the two are contrasted in the sketch below).
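A quick sketch contrasting the two collectors on a toy DataFrame with a duplicate name (this reuses the imports above and assumes the same SparkSession is in scope):

dup = spark.createDataFrame(
    [("abc", 1), ("abc", 2), ("xyz", 3)], ["name", "DateTime"]
)
w2 = Window.orderBy("DateTime")

dup.select(
    "name",
    f.collect_list("name").over(w2).alias("as_list"),  # keeps duplicates, in window order
    f.collect_set("name").over(w2).alias("as_set"),    # de-duplicated; order not guaranteed
).show(truncate=False)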

孤傲高冷的网名 · 2020-03-30 07:09

You can achieve this by using a pyspark.sql.Window, which orders by the DateTime column, together with pyspark.sql.functions.concat_ws and pyspark.sql.functions.collect_list:

import pyspark.sql.functions as f
from pyspark.sql import Window

w = Window.orderBy("DateTime")  # define Window for ordering
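# note: without partitionBy, Spark moves all rows to a single partition for this window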

df.drop("Seq", "sessionCount", "row_number").select(
    "*",
    f.concat_ws(
        "",
        f.collect_list(f.col("name")).over(w)
    ).alias("effective_name")
).show(truncate=False)
#+---------------+--------------+-------------------------+
#|name           |      DateTime|effective_name           |
#+---------------+--------------+-------------------------+
#|abc            |1521572913344 |abc                      |
#|xyz            |1521572916109 |abcxyz                   |
#|rafa           |1521572916118 |abcxyzrafa               |
#|{}             |1521572916129 |abcxyzrafa{}             |
#|experience     |1521572917816 |abcxyzrafa{}experience   |
#+---------------+--------------+-------------------------+

I dropped "Seq", "sessionCount", "row_number" to make the output display friendlier.

If you needed to do this per group, you can add a partitionBy to the Window. Say in this case you want to group by Seq, you can do the following:

w = Window.partitionBy("Seq").orderBy("DateTime")

df.drop("sessionCount", "row_number").select(
    "*",
    f.concat_ws(
        "",
        f.collect_list(f.col("name")).over(w)
    ).alias("effective_name")
).show(truncate=False)
#+---------------+--------------+---+----------------------+
#|name           |      DateTime|Seq|effective_name        |
#+---------------+--------------+---+----------------------+
#|abc            |1521572913344 | 17|abc                   |
#|xyz            |1521572916109 | 17|abcxyz                |
#|rafa           |1521572916118 | 17|abcxyzrafa            |
#|{}             |1521572916129 | 17|abcxyzrafa{}          |
#|experience     |1521572917816 | 17|abcxyzrafa{}experience|
#+---------------+--------------+---+----------------------+

If you prefer to use withColumn, the above is equivalent to:

df.drop("sessionCount", "row_number").withColumn(
    "effective_name",
    f.concat_ws(
        "",
        f.collect_list(f.col("name")).over(w)
    )
).show(truncate=False)

Explanation

You want to apply a function over multiple rows, which is called an aggregation. With any aggregation, you need to define which rows to aggregate over and the order. We do this using a Window. In this case, w = Window.partitionBy("Seq").orderBy("DateTime") will partition the data by the Seq and sort by the DateTime.

We first apply the aggregate function collect_list("name") over the window. This gathers all of the values from the name column and puts them in a list. The order of insertion is defined by the Window's order.

For example, the intermediate output of this step would be:

df.select(
    f.collect_list("name").over(w).alias("collected")
).show(truncate=False)
#+--------------------------------+
#|collected                       |
#+--------------------------------+
#|[abc]                           |
#|[abc, xyz]                      |
#|[abc, xyz, rafa]                |
#|[abc, xyz, rafa, {}]            |
#|[abc, xyz, rafa, {}, experience]|
#+--------------------------------+
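The list grows row by row because an ordered Window defaults to a running frame (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW). If you prefer to make that frame explicit, here is a sketch (rowsBetween counts physical rows, so it can differ from the default RANGE frame when two rows share the same DateTime):

w_explicit = (
    Window.partitionBy("Seq")
    .orderBy("DateTime")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df.select(
    f.collect_list("name").over(w_explicit).alias("collected")
).show(truncate=False)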

Now that the appropriate values are in the list, we can concatenate them together with an empty string as the separator.

df.select(
    f.concat_ws(
        "",
        f.collect_list("name").over(w)
    ).alias("concatenated")
).show(truncate=False)
#+-----------------------+
#|concatenated           |
#+-----------------------+
#|abc                    |
#|abcxyz                 |
#|abcxyzrafa             |
#|abcxyzrafa{}           |
#|abcxyzrafa{}experience |
#+-----------------------+