Spark Structured Streaming automatically converts t

Posted 2019-01-06 23:52

Question:

I have my timestamp in UTC and ISO8601, but using Structured Streaming, it gets automatically converted into the local time. Is there a way to stop this conversion? I would like to have it in UTC.

I'm reading JSON data from Kafka and then parsing it using the from_json Spark function.

Input:

{"Timestamp":"2015-01-01T00:00:06.222Z"}

Flow:

SparkSession
  .builder()
  .master("local[*]")
  .appName("my-app")
  .getOrCreate()
  .readStream()
  .format("kafka")
  ... //some magic
  .writeStream()
  .format("console")
  .start()
  .awaitTermination();

Schema:

StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("Timestamp", DataTypes.TimestampType, true),});

Output:

+--------------------+
|           Timestamp|
+--------------------+
|2015-01-01 01:00:...|
|2015-01-01 01:00:...|
+--------------------+

As you can see, the hour has incremented by itself.

PS: I tried experimenting with the from_utc_timestamp Spark function, but had no luck.

Answer 1:

What worked for me was:

spark.conf.set("spark.sql.session.timeZone", "UTC")

This tells Spark SQL to use UTC as the default time zone for timestamps. I used it in Spark SQL, for example:

select *, cast('2017-01-01 10:10:10' as timestamp) from someTable

I know it does not work in 2.0.1, but it works in Spark 2.2. I also used it in SQLTransformer and it worked.

I am not sure about streaming though.
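To illustrate what a session time zone setting changes, here is a minimal sketch in plain java.time (it is not Spark code). It assumes the stored value is a single instant and that only the text rendering depends on the zone. The class name SessionZoneDemo, the helper render, and the zone Europe/Paris (standing in for a local zone that is UTC+1 in January) are hypothetical choices for this illustration.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Plain java.time illustration, not Spark code.
final class SessionZoneDemo {
    private static final DateTimeFormatter DISPLAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Render an ISO 8601 instant as wall-clock text in the given zone.
    static String render(String isoInstant, String zoneId) {
        Instant instant = Instant.parse(isoInstant);
        return DISPLAY.format(instant.atZone(ZoneId.of(zoneId)));
    }

    public static void main(String[] args) {
        String input = "2015-01-01T00:00:06.222Z"; // value from the question
        // Same instant, two renderings:
        System.out.println(render(input, "UTC"));          // 2015-01-01 00:00:06.222
        System.out.println(render(input, "Europe/Paris")); // 2015-01-01 01:00:06.222
    }
}
```

The second line matches the shifted hour in the question's output, which suggests the data itself is intact and only the display zone differs.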



Answer 2:

Note:

This answer is useful primarily in Spark < 2.2. For newer Spark versions, see the answer by astro-asz.

TL;DR Unfortunately this is how Spark handles timestamps right now and there is really no built-in alternative, other than operating on epoch time directly, without using date / time utilities.

You can find an insightful discussion on the Spark developers list: SQL TIMESTAMP semantics vs. SPARK-18350
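As a rough sketch of what operating on epoch time directly could look like, the snippet below converts the ISO 8601 string from the question into epoch milliseconds and back using only java.time. The class EpochDemo and its two helpers are hypothetical names for this illustration. How the resulting long is carried through a Spark schema is left open here.

```java
import java.time.Instant;

// Plain java.time illustration, not Spark code.
final class EpochDemo {
    // Parse an ISO 8601 UTC timestamp into milliseconds since the Unix epoch.
    static long toEpochMillis(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    // Format epoch milliseconds back into an ISO 8601 UTC string.
    static String toIsoUtc(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        long millis = toEpochMillis("2015-01-01T00:00:06.222Z");
        System.out.println(millis);           // 1420070406222
        System.out.println(toIsoUtc(millis)); // 2015-01-01T00:00:06.222Z
    }
}
```

A long value carries no time zone, so nothing downstream can shift it. The cost is that date and time functions are no longer available on that column.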

The cleanest workaround I've found so far is to set -Duser.timezone to UTC for both the driver and executors. For example, on the command line:

bin/spark-shell --conf "spark.driver.extraJavaOptions=-Duser.timezone=UTC" \
                --conf "spark.executor.extraJavaOptions=-Duser.timezone=UTC"

or by adjusting configuration files (spark-defaults.conf):

spark.driver.extraJavaOptions      -Duser.timezone=UTC
spark.executor.extraJavaOptions    -Duser.timezone=UTC
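To see why the JVM default time zone matters, here is a small sketch in plain Java (not Spark code) showing that java.sql.Timestamp.toString() renders the same instant differently depending on that default. It assumes, as this answer implies, that older Spark versions format timestamps through the JVM default zone. The class JvmDefaultZoneDemo, the helper renderWithDefaultZone, and the zone Europe/Paris are hypothetical choices for illustration. TimeZone.setDefault is used here only to imitate, inside one process, what -Duser.timezone does at JVM startup.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Plain Java illustration, not Spark code.
final class JvmDefaultZoneDemo {
    // Render epoch milliseconds via java.sql.Timestamp under a temporary JVM default zone.
    static String renderWithDefaultZone(long epochMillis, String zoneId) {
        TimeZone previous = TimeZone.getDefault();
        try {
            TimeZone.setDefault(TimeZone.getTimeZone(zoneId));
            return new Timestamp(epochMillis).toString();
        } finally {
            TimeZone.setDefault(previous); // restore the original default
        }
    }

    public static void main(String[] args) {
        long millis = 1420070406222L; // 2015-01-01T00:00:06.222Z
        System.out.println(renderWithDefaultZone(millis, "UTC"));          // 2015-01-01 00:00:06.222
        System.out.println(renderWithDefaultZone(millis, "Europe/Paris")); // 2015-01-01 01:00:06.222
    }
}
```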