Efficient column processing in PySpark

Posted 2019-08-08 09:11

Question:

I have a dataframe with a very large number of columns (>30000).

I'm filling it with 1s and 0s based on the first column, like this:

from pyspark.sql.functions import array_contains, when

for column in list_of_column_names:
    df = df.withColumn(column, when(array_contains(df['list_column'], column), 1).otherwise(0))

However, this process takes a lot of time. Is there a way to do this more efficiently? Something tells me that column processing can be parallelized.

Edit:

Sample input data

+----------------+-----+-----+-----+
|  list_column   | Foo | Bar | Baz |
+----------------+-----+-----+-----+
| ['Foo', 'Bak'] |     |     |     |
| ['Bar', 'Baz'] |     |     |     |
| ['Foo']        |     |     |     |
+----------------+-----+-----+-----+
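For reference, the sample input can be constructed like this (a minimal sketch; the SparkSession setup is assumed):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

list_of_column_names = ['Foo', 'Bar', 'Baz']

# Only the array column is needed up front; the flag columns
# are created by the withColumn / select calls below.
df = spark.createDataFrame(
    [(['Foo', 'Bak'],), (['Bar', 'Baz'],), (['Foo'],)],
    ['list_column'],
)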

Answer 1:

You might approach it like this:

import pyspark.sql.functions as F

# Build one 0/1 flag expression per target column.
exprs = [
    F.when(F.array_contains(F.col('list_column'), column), 1).otherwise(0).alias(column)
    for column in list_of_column_names
]

df = df.select(['list_column'] + exprs)

Building the whole projection in a single select keeps the plan to one projection node, instead of the thousands of nested projections the withColumn loop creates, which keeps plan analysis cheap even with 30000+ columns.


Answer 2:

There is nothing specifically wrong with your code, other than very wide data:

for column in list_of_column_names:
    df = df.withColumn(...)

only generates the execution plan.

Actual data processing will be concurrent and parallelized once the result is evaluated.
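A quick way to see this laziness for yourself (a minimal sketch, with df being the result of the loop above):

df.explain()         # prints the (very large) plan; no data has been processed

result = df.count()  # the first action is what actually runs the parallel job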

It is, however, an expensive process: it requires O(NMK) operations for N rows, M columns, and K values per list, so for example 10^6 rows with 30000 columns and 10-element lists already mean on the order of 3×10^11 membership checks.

Additionally, execution plans on very wide data are very expensive to compute (though the cost is constant in terms of the number of records). If this becomes the limiting factor, you might be better off with RDDs (see the sketch after this list):

  • Sort the column array using the sort_array function.
  • Convert the data to an RDD.
  • Search for each column name using binary search.
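
A minimal sketch of that recipe (assuming df and list_of_column_names from the question; flag_row is a hypothetical helper and bisect provides the binary search):

import bisect

from pyspark.sql import functions as F

# Sort each array once so membership can be checked by binary search.
sorted_df = df.select(F.sort_array('list_column').alias('list_column'))

def flag_row(row):
    values = row['list_column']
    flags = []
    for name in list_of_column_names:
        # O(log K) lookup per name instead of an O(K) linear scan.
        i = bisect.bisect_left(values, name)
        flags.append(1 if i < len(values) and values[i] == name else 0)
    return (values, *flags)

result = sorted_df.rdd.map(flag_row)
# Staying in RDD form avoids rebuilding a 30000-column DataFrame,
# whose wide schema is what makes the plans expensive in the first place.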


Answer 3:

withColumn is already distributed, so it would be hard to get a much faster approach than the one you already have. You can try defining a udf function as follows:

from pyspark.sql import functions as f
from pyspark.sql import types as t

def containsUdf(listColumn):
    # One 0/1 flag per target column for this row.
    return {column: 1 if column in listColumn else 0
            for column in list_of_column_names}

# IntegerType matches the integer flags returned above.
callContainsUdf = f.udf(containsUdf, t.StructType(
    [t.StructField(x, t.IntegerType(), True) for x in list_of_column_names]))

df.withColumn('struct', callContainsUdf(df['list_column']))\
    .select(f.col('list_column'), f.col('struct.*'))\
    .show(truncate=False)

which should give you

+-----------+---+---+---+
|list_column|Foo|Bar|Baz|
+-----------+---+---+---+
|[Foo, Bak] |1  |0  |0  |
|[Bar, Baz] |0  |1  |1  |
|[Foo]      |1  |0  |0  |
+-----------+---+---+---+

Note: list_of_column_names = ["Foo", "Bar", "Baz"]