从CSV火花2时间型引起与取异常(5)(Spark 2 time type from CSV cau

2019-09-30 10:48发布

我很新的火星,Scala和我试图加载类似于一个CSV到:

A,09:33:57.570
B,09:43:02.577
...

唯一的时间I型scala.sql.types看到的是TimestampType,所以我加载的CSV:

val schema = StructType(Array( StructField("A", StringType, true), StructField("time", TimestampType, true)))

val table = spark.read.option("header","false").option("inferSchema","false").schema(schema).csv("../table.csv")

这似乎做工精细,直到我做table.show()table.take(5)等,在这种情况下,我得到下面的异常:

scala> table.show()
16/10/07 16:32:25 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalArgumentException
        at java.sql.Date.valueOf(Date.java:143)
        at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:137)
        at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:287)
        at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:115)
        at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:84)
        at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$1.apply(CSVFileFormat.scala:125)
        at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$1.apply(CSVFileFormat.scala:124)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)

是否有具有存储火花时间内数据的首选方法是什么? 我也曾尝试把它当作从每个值java.time字符串和映射LocalTime.parse(),但未能说是有该类型没有编码器。

Answer 1:

有没有可以直接适应的时间数据,因此可能是你能做的最好是使用SQL类型LongType通过解析unix_timestamp 。 读取数据

 StructField("time", StringType, true)))

它应产生具有类似的数据帧:

val df = Seq(("A", "09:33:57.570"), ("B", "09:43:02.577")).toDF("A", "time")

定义一个简单的日期格式:

val format = "HH:mm:ss.SSS"

利用它进行解析:

df.withColumn("seconds", unix_timestamp($"time", format))

不幸的是,这是一种有损转型。

+---+------------+-------+
|  A|        time|seconds|
+---+------------+-------+
|  A|09:33:57.570|  30837|
|  B|09:43:02.577|  31382|
+---+------------+-------+

所以如果你想保留毫秒可以使用java.time.LocalTime为你做的和存储的结果toNanoOfDay

val nanoOfDay = udf((s: String) => 
  java.time.LocalTime.parse(s).toNanoOfDay)

df.withColumn("nanseconds", nanoOfDay($"time"))


Answer 2:

你可能也想看看JodaTime日期/时间操作。 您可以包括这在你的pom.xml(Maven的)

    <dependency>
        <groupId>joda-time</groupId>
        <artifactId>joda-time</artifactId>
        <version>2.9</version>
    </dependency>

    <dependency>
        <groupId>org.joda</groupId>
        <artifactId>joda-convert</artifactId>
        <version>1.8.1</version>
    </dependency>


文章来源: Spark 2 time type from CSV causing exception with take(5)