Problems with adding a new column to a dataframe -

2020-03-31 08:33发布

问题:

I am new to spark/scala. I am trying to read some data from a hive table to a spark dataframe and then add a column based on some condition. Here is my code:

val DF = hiveContext.sql("select * from (select * from test_table where partition_date='2017-11-22') a JOIN (select max(id) as bid from test_table where partition_date='2017-11-22' group by at_id) b ON a.id=b.bid")

def dateDiff(partition_date: org.apache.spark.sql.Column, item_due_date: org.apache.spark.sql.Column): Long ={
      ChronoUnit.DAYS.between(LocalDate.parse(partition_date.toString()), LocalDate.parse(item_due_date.toString))
    }

val finalDF = DF.withColumn("status", 
                   when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) < 0) && !(col("item_decision").equalTo(null) || col("item_decision").equalTo("NULL") || col("item_decision").equalTo("null")), "approved")
                  .when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) < 0) && (col("item_decision").equalTo(null) || col("item_decision").equalTo("NULL") || col("item_decision").equalTo("null")), "pending")
                  .when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) >= 0), "expired")
                  .otherwise("null"))

dateDiff is a function that calculates the difference between partition_date and item_due_date, which are columns in DF. I am trying to add a new column to DF by using when and otherwise which uses the dateDiff to get the difference between dates.

Now, when I run the above code, I get the following error: org.threeten.bp.format.DateTimeParseException: Text 'partition_date' could not be parsed at index 0

I believe the value of the column partition_date is not being converted to a String to be parsed as a date. Is this what's happening? If yes, how do I cast the column value to a String ?

Below is the schema of the columns I am using from the DF :

 |-- item_due_date: string (nullable = true)
 |-- past_due: integer (nullable = true)
 |-- item_decision: string (nullable = true)
 |-- partition_date: string (nullable = true)

A data sample of the columns I am using from the DF :

+--------+-------------+-------------+--------------+
|past_due|item_due_date|item_decision|partition_date|
+--------+-------------+-------------+--------------+
|       1|   0001-01-14|         null|    2017-11-22|
|       1|   0001-01-14|     Mitigate|    2017-11-22|
|       1|   0001-01-14|     Mitigate|    2017-11-22|
|       1|   0001-01-14|     Mitigate|    2017-11-22|
|       0|   2018-03-18|         null|    2017-11-22|
|       1|   2016-11-30|         null|    2017-11-22|
+--------+-------------+-------------+--------------+

I also tried using a custom UDF:

  def status(past_due: Int, item_decision: String, maxPartitionDate: String, item_due_date: String): String = {
      if (past_due == 1 && item_due_date != "NULL") {
        if (ChronoUnit.DAYS.between(LocalDate.parse(maxPartitionDate.trim), LocalDate.parse(item_due_date.trim)) < 0) {
          if (item_decision != "NULL") "pending"
          else "approved"
        } else "expired"
      } else "NULL"
    }

val statusUDF = sqlContext.udf.register("statusUDF", status _)

val DF2 = DF.withColumn("status", statusUDF(DF("past_due"),DF("item_decision"),DF("partition_date"),DF("item_due_date")))
DF2.show()

And it throws the following error at the DF2.show statement, everytime:

Container exited with a non-zero exit code 50

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1644)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1603)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1592)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
        at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
        at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
        at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
        at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
        at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
        at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
        at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
        at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
        at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374)
        at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
        at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
        at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
        at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
        at driver$.main(driver.scala:109)
        at driver.main(driver.scala)

Any help would be appreciated. Thank you!

回答1:

You can simply use datediff inbuilt function to check for the days difference between two columns. you don't need to write your function or udf function. And when function is also modified than yours

import org.apache.spark.sql.functions._
val finalDF = DF.withColumn("status",
  when(col("past_due").equalTo(1) && col("item_due_date").isNotNull && !(lower(col("item_due_date")).equalTo("null")) && (datediff(col("partition_date"),col("item_due_date")) < 0) && col("item_decision").isNotNull && !(lower(col("item_decision")).equalTo("null")), "approved")
    .otherwise(when(col("past_due").equalTo(1) && col("item_due_date").isNotNull && !(lower(col("item_due_date")).equalTo("null")) && (datediff(col("partition_date"),col("item_due_date")) < 0) && (col("item_decision").isNull || lower(col("item_decision")).equalTo("null")), "pending")
      .otherwise(when(col("past_due").equalTo(1) && col("item_due_date").isNotNull && !(lower(col("item_due_date")).equalTo("null")) && (datediff(col("partition_date"),col("item_due_date")) >= 0), "expired")
    .otherwise("null"))))

This logic will convert the dataframe

+--------+-------------+-------------+--------------+
|past_due|item_due_date|item_decision|partition_date|
+--------+-------------+-------------+--------------+
|1       |2017-12-14   |null         |2017-11-22    |
|1       |2017-12-14   |Mitigate     |2017-11-22    |
|1       |0001-01-14   |Mitigate     |2017-11-22    |
|1       |0001-01-14   |Mitigate     |2017-11-22    |
|0       |2018-03-18   |null         |2017-11-22    |
|1       |2016-11-30   |null         |2017-11-22    |
+--------+-------------+-------------+--------------+

with addition of status column as

+--------+-------------+-------------+--------------+--------+
|past_due|item_due_date|item_decision|partition_date|status  |
+--------+-------------+-------------+--------------+--------+
|1       |2017-12-14   |null         |2017-11-22    |pending |
|1       |2017-12-14   |Mitigate     |2017-11-22    |approved|
|1       |0001-01-14   |Mitigate     |2017-11-22    |expired |
|1       |0001-01-14   |Mitigate     |2017-11-22    |expired |
|0       |2018-03-18   |null         |2017-11-22    |null    |
|1       |2016-11-30   |null         |2017-11-22    |expired |
+--------+-------------+-------------+--------------+--------+

I hope the answer is helpful