How to load CSVs with timestamps in custom format?

2020-02-11 03:03发布

问题:

I have a timestamp field in a csv file that I load to a dataframe using spark csv library. The same piece of code works on my local machine with Spark 2.0 version but throws an error on Azure Hortonworks HDP 3.5 and 3.6.

I have checked and Azure HDInsight 3.5 is also using the same Spark version so I don't think it's a problem with Spark version.

import org.apache.spark.sql.types._
val sourceFile = "C:\\2017\\datetest"
val sourceSchemaStruct = new StructType()
  .add("EventDate",DataTypes.TimestampType)
  .add("Name",DataTypes.StringType)
val df = spark.read
  .format("com.databricks.spark.csv")
  .option("header","true")
  .option("delimiter","|")
  .option("mode","FAILFAST")
  .option("inferSchema","false")
  .option("dateFormat","yyyy/MM/dd HH:mm:ss.SSS")
  .schema(sourceSchemaStruct)
  .load(sourceFile)

The whole exception is as follows:

Caused by: java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
  at java.sql.Timestamp.valueOf(Timestamp.java:237)
  at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:179)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply$mcJ$sp(UnivocityParser.scala:142)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply(UnivocityParser.scala:142)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply(UnivocityParser.scala:142)
  at scala.util.Try.getOrElse(Try.scala:79)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13.apply(UnivocityParser.scala:139)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13.apply(UnivocityParser.scala:135)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$nullSafeDatum(UnivocityParser.scala:179)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9.apply(UnivocityParser.scala:135)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9.apply(UnivocityParser.scala:134)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert(UnivocityParser.scala:215)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.parse(UnivocityParser.scala:187)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:304)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:304)
  at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:61)
  ... 27 more

The csv file has only one row as follows:

"EventDate"|"Name"
"2016/12/19 00:43:27.583"|"adam"

回答1:

TL;DR Use timestampFormat option (not dateFormat).


I've managed to reproduce it in the latest Spark version 2.3.0-SNAPSHOT (built from the master).

// OS shell
$ cat so-43259485.csv
"EventDate"|"Name"
"2016/12/19 00:43:27.583"|"adam"

// spark-shell
scala> spark.version
res1: String = 2.3.0-SNAPSHOT

case class Event(EventDate: java.sql.Timestamp, Name: String)
import org.apache.spark.sql.Encoders
val schema = Encoders.product[Event].schema

scala> spark
  .read
  .format("csv")
  .option("header", true)
  .option("mode","FAILFAST")
  .option("delimiter","|")
  .schema(schema)
  .load("so-43259485.csv")
  .show(false)
17/04/08 11:03:42 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 7)
java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
    at java.sql.Timestamp.valueOf(Timestamp.java:237)
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:167)
    at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply$mcJ$sp(UnivocityParser.scala:146)
    at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply(UnivocityParser.scala:146)
    at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply(UnivocityParser.scala:146)
    at scala.util.Try.getOrElse(Try.scala:79)

The corresponding line in the Spark sources is the "root cause" of the issue:

Timestamp.valueOf(s)

Having read the javadoc of Timestamp.valueOf, you can learn that the argument should be:

timestamp in format yyyy-[m]m-[d]d hh:mm:ss[.f...]. The fractional seconds may be omitted. The leading zero for mm and dd may also be omitted.

Note "The fractional seconds may be omitted" so let's cut it off by first loading the EventDate as a String and only after removing the unneeded fractional seconds convert it to Timestamp.

val eventsAsString = spark.read.format("csv")
  .option("header", true)
  .option("mode","FAILFAST")
  .option("delimiter","|")
  .load("so-43259485.csv")

It turns out that for fields of TimestampType type Spark uses timestampFormat option first if defined and only if not uses the code the uses Timestamp.valueOf.

It turns out the fix is just to use timestampFormat option (not dateFormat!).

val df = spark.read
  .format("com.databricks.spark.csv")
  .option("header","true")
  .option("delimiter","|")
  .option("mode","FAILFAST")
  .option("inferSchema","false")
  .option("timestampFormat","yyyy/MM/dd HH:mm:ss.SSS")
  .schema(sourceSchemaStruct)
  .load(sourceFile)
scala> df.show(false)
+-----------------------+----+
|EventDate              |Name|
+-----------------------+----+
|2016-12-19 00:43:27.583|adam|
+-----------------------+----+

Spark 2.1.0

Use schema inference in CSV using inferSchema option with your custom timestampFormat.

It's important to trigger schema inference using inferSchema for timestampFormat to take effect.

val events = spark.read
  .format("csv")
  .option("header", true)
  .option("mode","FAILFAST")
  .option("delimiter","|")
  .option("inferSchema", true)
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
  .load("so-43259485.csv")

scala> events.show(false)
+-------------------+----+
|EventDate          |Name|
+-------------------+----+
|2016-12-19 00:43:27|adam|
+-------------------+----+

scala> events.printSchema
root
 |-- EventDate: timestamp (nullable = true)
 |-- Name: string (nullable = true)

"Incorrect" initial version left for learning purposes

val events = eventsAsString
  .withColumn("date", split($"EventDate", " ")(0))
  .withColumn("date", translate($"date", "/", "-"))
  .withColumn("time", split($"EventDate", " ")(1))
  .withColumn("time", split($"time", "[.]")(0))    // <-- remove millis part
  .withColumn("EventDate", concat($"date", lit(" "), $"time")) // <-- make EventDate right
  .select($"EventDate" cast "timestamp", $"Name")

scala> events.printSchema
root
 |-- EventDate: timestamp (nullable = true)
 |-- Name: string (nullable = true)
    events.show(false)

scala> events.show
+-------------------+----+
|          EventDate|Name|
+-------------------+----+
|2016-12-19 00:43:27|adam|
+-------------------+----+

Spark 2.2.0

As of Spark 2.2 you can use to_timestamp function to do the string to timestamp conversion.

eventsAsString.select($"EventDate", to_timestamp($"EventDate", "yyyy/MM/dd HH:mm:ss.SSS")).show(false)

scala> eventsAsString.select($"EventDate", to_timestamp($"EventDate", "yyyy/MM/dd HH:mm:ss.SSS")).show(false)
+-----------------------+----------------------------------------------------+
|EventDate              |to_timestamp(`EventDate`, 'yyyy/MM/dd HH:mm:ss.SSS')|
+-----------------------+----------------------------------------------------+
|2016/12/19 00:43:27.583|2016-12-19 00:43:27                                 |
+-----------------------+----------------------------------------------------+


回答2:

I searched for this issue, and discovered the offical Github issue page https://github.com/databricks/spark-csv/pull/280 which has fixed a related bug for parsing data with custom date format. I reviewed some source codes, and according to the code to find out your issue reason which is set inferSchema with the default value false as below.

inferSchema: automatically infers column types. It requires one extra pass over the data and is false by default

Please change inferSchema with true for your date format yyyy/MM/dd HH:mm:ss.SSS using SimpleDateFormat.