Adding a new column to a Dataframe by using the va

Posted 2019-02-15 09:47

Question:

I am new to Spark SQL and DataFrames. I have a DataFrame to which I need to add a new column based on the values of other columns. I have a nested IF formula from Excel that I need to implement to populate the new column. Converted into programmatic terms, it looks something like this:

if(k =='yes')
{
  if(!(i==''))
  {
    if(diff(max_date, target_date) < 0)
    {
      if(j == '')
      {
        "pending" //the value of the column
      }
      else {
        "approved" //the value of the column
      }
    }
    else{
      "expired" //the value of the column
    }
  }
  else{
    "" //the value should be empty
  }
}
else{
  "" //the value should be empty
} 

i, j, and k are three other columns in the DataFrame. I know we can use withColumn and when to add new columns based on other columns, but I am not sure how to express the above logic with that approach.

What would be an easy and efficient way to implement the above logic for adding the new column? Any help would be appreciated.

Thank you.

Answer 1:

First, let's simplify that if statement:

if(k == "yes" && i.nonEmpty)
  if(maxDate - targetDate < 0)
    if (j.isEmpty) "pending" 
    else "approved"
  else "expired"
else ""
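
As a quick sanity check, the flattened form can be compared against the question's nested form over every combination of inputs. The sketch below is plain Scala with no Spark dependency; the object name StateLogic, the method names nested and flat, and the single diff parameter (standing in for maxDate - targetDate) are made up for illustration.

```scala
object StateLogic {
  // Nested form, transcribed from the question's pseudocode.
  // diff stands in for diff(max_date, target_date).
  def nested(i: String, j: String, k: String, diff: Long): String =
    if (k == "yes") {
      if (!(i == "")) {
        if (diff < 0) {
          if (j == "") "pending" else "approved"
        } else "expired"
      } else ""
    } else ""

  // Flattened form from the simplification above.
  def flat(i: String, j: String, k: String, diff: Long): String =
    if (k == "yes" && i.nonEmpty)
      if (diff < 0)
        if (j.isEmpty) "pending" else "approved"
      else "expired"
    else ""

  // Every combination of empty/non-empty i and j, yes/no k, and
  // negative/zero/positive diff.
  val cases: Seq[(String, String, String, Long)] =
    for {
      i <- Seq("", "a")
      j <- Seq("", "b")
      k <- Seq("yes", "no")
      d <- Seq(-1L, 0L, 1L)
    } yield (i, j, k, d)

  def main(args: Array[String]): Unit = {
    // Both forms must agree on every case.
    assert(cases.forall { case (i, j, k, d) => nested(i, j, k, d) == flat(i, j, k, d) })
    println(flat("a", "", "yes", -1L)) // prints "pending"
  }
}
```

Merging the two outer conditions is safe because both of their else branches produce the same empty string.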

Now there are two main ways to accomplish this:

  1. Using a custom UDF
  2. Using Spark built-in functions: coalesce, when, otherwise

Custom UDF

Because of the complexity of your conditions, option 2 will be rather tricky to write, so a custom UDF should suit your needs.

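import org.apache.spark.sql.functions.{lit, udf}
import spark.implicits._ // for the $"col" syntax; assumes a SparkSession named spark

// Assumes i and j are never null; a null there would throw a NullPointerException.
def getState(i: String, j: String, k: String, maxDate: Long, targetDate: Long): String =
  if (k == "yes" && i.nonEmpty)
    if (maxDate - targetDate < 0)
      if (j.isEmpty) "pending"
      else "approved"
    else "expired"
  else ""

val stateUdf = udf(getState _)
// lit(0) and lit(0) are placeholders for maxDate and targetDate.
df.withColumn("state", stateUdf($"i", $"j", $"k", lit(0), lit(0)))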

Replace the two lit(0) placeholders with your own date expressions, and this should work for you.
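
As one possible way to fill in the date arguments, the sketch below assumes that max_date and target_date are string columns in yyyy-MM-dd format (the question does not say how the dates are stored) and converts them to epoch seconds with unix_timestamp, so the UDF receives two Long values. The object name UdfWithDatesSketch, the helper addState, and the sample rows are hypothetical. col("i") is used in the helper as an equivalent of $"i" that does not need the implicits import.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf, unix_timestamp}

object UdfWithDatesSketch {
  // Same decision logic as the UDF above.
  def getState(i: String, j: String, k: String, maxDate: Long, targetDate: Long): String =
    if (k == "yes" && i.nonEmpty)
      if (maxDate - targetDate < 0)
        if (j.isEmpty) "pending" else "approved"
      else "expired"
    else ""

  // Assumption: max_date and target_date are "yyyy-MM-dd" strings.
  // unix_timestamp turns each into epoch seconds (a Long column).
  def addState(df: DataFrame): DataFrame = {
    val stateUdf = udf(getState _)
    df.withColumn("state", stateUdf(
      col("i"), col("j"), col("k"),
      unix_timestamp(col("max_date"), "yyyy-MM-dd"),
      unix_timestamp(col("target_date"), "yyyy-MM-dd")))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("udf-dates-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows: (i, j, k, max_date, target_date).
    val df = Seq(
      ("a", "",  "yes", "2019-01-01", "2019-02-01"), // max before target, j empty
      ("a", "x", "yes", "2019-01-01", "2019-02-01"), // max before target, j set
      ("a", "x", "yes", "2019-03-01", "2019-02-01")  // max after target
    ).toDF("i", "j", "k", "max_date", "target_date")

    addState(df).show()
    spark.stop()
  }
}
```

If the columns are already DateType or TimestampType, a different conversion would be needed; the string format here is only an illustration.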

Using Spark built-in functions

If you notice performance issues (Spark's optimizer cannot see inside a UDF), you can switch to the built-in functions coalesce, when, and otherwise, which would look something like this:

// max_date and target_date are assumed to be numeric values already in scope.
val active = $"k" === "yes" && $"i" =!= ""
val notDue = lit(max_date) - lit(target_date) < 0
val finalDf = df.withColumn("state", coalesce( // first non-null branch wins
  when(active && notDue && $"j" =!= "", "approved"),
  when(active && notDue && $"j" === "", "pending"),
  when(active && !notDue, "expired"),
  lit("")))
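
The same decision tree can also be sketched as a single nested when/otherwise expression that mirrors the if/else structure directly, without coalesce. This is only an illustration under assumptions: max_date and target_date are taken to be yyyy-MM-dd string columns, datediff stands in for the question's diff, and the names NestedWhenSketch and addState are made up. col("i") is used as an equivalent of $"i".

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, datediff, to_date, when}

object NestedWhenSketch {
  def addState(df: DataFrame): DataFrame = {
    // Assumption: both date columns are "yyyy-MM-dd" strings.
    // datediff(end, start) is the number of days from start to end,
    // so this is negative when max_date falls before target_date.
    val dayDiff = datediff(to_date(col("max_date")), to_date(col("target_date")))

    // Each when(...).otherwise(...) pair corresponds to one if/else level.
    val state =
      when(col("k") === "yes" && col("i") =!= "",
        when(dayDiff < 0,
          when(col("j") === "", "pending").otherwise("approved")
        ).otherwise("expired")
      ).otherwise("")

    df.withColumn("state", state)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("nested-when-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows: (i, j, k, max_date, target_date).
    val df = Seq(
      ("a", "",  "yes", "2019-01-01", "2019-02-01"), // pending
      ("a", "x", "yes", "2019-01-01", "2019-02-01"), // approved
      ("a", "x", "yes", "2019-03-01", "2019-02-01"), // expired
      ("",  "x", "yes", "2019-01-01", "2019-02-01"), // empty: i is blank
      ("a", "x", "no",  "2019-01-01", "2019-02-01")  // empty: k is not yes
    ).toDF("i", "j", "k", "max_date", "target_date")

    addState(df).show()
    spark.stop()
  }
}
```

Because when accepts another Column as its value, the inner when chains nest without any intermediate DataFrames.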

I've used custom UDFs in the past with large input sources without issues, and custom UDFs can lead to much more readable code, especially in this case.