PySpark: withColumn() with two conditions and thre

2020-02-07 18:16发布

问题:

I am working with Spark and PySpark. I am trying to achieve the result equivalent to the following pseudocode:

df = df.withColumn('new_column', 
    IF fruit1 == fruit2 THEN 1, ELSE 0. IF fruit1 IS NULL OR fruit2 IS NULL 3.)

I am trying to do this in PySpark but I'm not sure about the syntax. Any pointers? I looked into expr() but couldn't get it to work.

Note that df is a pyspark.sql.dataframe.DataFrame.

回答1:

There are a few efficient ways to implement this. Let's start with required imports:

from pyspark.sql.functions import col, expr, when

You can use Hive IF function inside expr:

new_column_1 = expr(
    """IF(fruit1 IS NULL OR fruit2 IS NULL, 3, IF(fruit1 = fruit2, 1, 0))"""
)

or when + otherwise:

new_column_2 = when(
    col("fruit1").isNull() | col("fruit2").isNull(), 3
).when(col("fruit1") == col("fruit2"), 1).otherwise(0)

Finally you could use following trick:

from pyspark.sql.functions import coalesce, lit

new_column_3 = coalesce((col("fruit1") == col("fruit2")).cast("int"), lit(3))

With example data:

df = sc.parallelize([
    ("orange", "apple"), ("kiwi", None), (None, "banana"), 
    ("mango", "mango"), (None, None)
]).toDF(["fruit1", "fruit2"])

you can use this as follows:

(df
    .withColumn("new_column_1", new_column_1)
    .withColumn("new_column_2", new_column_2)
    .withColumn("new_column_3", new_column_3))

and the result is:

+------+------+------------+------------+------------+
|fruit1|fruit2|new_column_1|new_column_2|new_column_3|
+------+------+------------+------------+------------+
|orange| apple|           0|           0|           0|
|  kiwi|  null|           3|           3|           3|
|  null|banana|           3|           3|           3|
| mango| mango|           1|           1|           1|
|  null|  null|           3|           3|           3|
+------+------+------------+------------+------------+


回答2:

You'll want to use a udf as below

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def func(fruit1, fruit2):
    if fruit1 == None or fruit2 == None:
        return 3
    if fruit1 == fruit2:
        return 1
    return 0

func_udf = udf(func, IntegerType())
df = df.withColumn('new_column',func_udf(df['fruit1'], df['fruit2']))


回答3:

the withColumn function in pyspark enables you to make a new variable with conditions, add in the when and otherwise functions and you have a properly working if then else structure.For all of this you would need to import the sparrsql functions, as you will see that the following bit of code will not work without the col() function. In the first bit, we declare a new column -'new column', and then give the condition enclosed in when function (i.e. fruit1==fruit2) then give 1 if the condition is true, if untrue the control goes to the otherwise which then takes care of the second condition (fruit1 or fruit2 is Null) with the isNull() function and if true 3 is returned and if false, the otherwise is checked again giving 0 as the answer

 from pyspark.sql import functions as F        
   df=df.withColumn('new_column', F.when(F.col('fruit1')==F.col('fruit2),1)
                          .otherwise(F.when((F.col('fruit1').isNull()) |(F.col('fruit2').isNull()),3))
                                    .otherwise(0))