How to replace values in Spark DataFrames after calculations

Published 2019-08-19 08:00

Question:

I have a schema in spark as

root
|-- atom: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- dailydata: array (nullable = true)
|    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |-- datatimezone: string (nullable = true)
|    |    |    |    |-- intervaltime: long (nullable = true)
|    |    |    |    |-- intervalvalue: long (nullable = true)
|    |    |    |    |-- utcacquisitiontime: string (nullable = true)
|    |    |-- usage: string (nullable = true)
|-- titlename: string (nullable = true)

I have extracted utcacquisitiontime and datatimezone from the above schema as follows:

val result = q.selectExpr("explode(dailydata) as r").select("r.utcacquisitiontime", "r.datatimezone")

+--------------------+------------+
|  utcacquisitiontime|datatimezone|
+--------------------+------------+
|2017-03-27T22:00:00Z|      +02:00|
|2017-03-27T22:15:00Z|      +02:00|
|2017-03-27T22:30:00Z|      +02:00|
|2017-03-27T22:45:00Z|      +02:00|
|2017-03-27T23:00:00Z|      +02:00|
|2017-03-27T23:15:00Z|      +02:00|
|2017-03-27T23:30:00Z|      +02:00|
|2017-03-27T23:45:00Z|      +02:00|
|2017-03-28T00:00:00Z|      +02:00|
|2017-03-28T00:15:00Z|      +02:00|
|2017-03-28T00:30:00Z|      +02:00|
|2017-03-28T00:45:00Z|      +02:00|
|2017-03-28T01:00:00Z|      +02:00|
|2017-03-28T01:15:00Z|      +02:00|
|2017-03-28T01:30:00Z|      +02:00|
|2017-03-28T01:45:00Z|      +02:00|
|2017-03-28T02:00:00Z|      +02:00|
|2017-03-28T02:15:00Z|      +02:00|
|2017-03-28T02:30:00Z|      +02:00|
|2017-03-28T02:45:00Z|      +02:00|
+--------------------+------------+

I need to calculate the local time from these two columns and replace them with it. How can I calculate the local time and perform the replacement?

Answer 1:

You can rely on a udf (user-defined function) in Spark. Also, org.apache.spark.sql.functions._ already provides plenty of predefined functions that might help you (see the built-in alternative sketched at the end of this answer). But here is how you can make this work, starting from:

+-------------------+------------+
| utcacquisitiontime|datatimezone|
+-------------------+------------+
|2017-03-27T22:00:00|      +02:00|
+-------------------+------------+

Note that I have removed the unnecessary "Z" from the time column. Using the Joda-Time dependency, define a udf function like this:

import org.apache.spark.sql.functions.{col, udf}
import org.joda.time.DateTimeZone
import org.joda.time.format.DateTimeFormat

val toTimestamp = udf((time: String, zone: String) => {
  val timezone = DateTimeZone.forID(zone)                            // e.g. "+02:00"
  val formatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss") // "MM" = month; "mm" (minutes) would mis-parse the date
  new java.sql.Timestamp(formatter.withZone(timezone).parseDateTime(time).getMillis)
})

Apply it to a column with withColumn:

df.withColumn("timestamp", toTimestamp(col("utcacquisitiontime"), col("datatimezone")))

Show the results (note that in the schema the column timestamp is of type Timestamp, so you can perform date operations on it):

+-------------------+------------+--------------------+
| utcacquisitiontime|datatimezone|           timestamp|
+-------------------+------------+--------------------+
|2017-03-27T22:00:00|      +02:00|2017-03-27 22:00:...|
+-------------------+------------+--------------------+

root
 |-- utcacquisitiontime: string (nullable = true)
 |-- datatimezone: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
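
As an aside, if you would rather avoid a udf altogether, Spark ships a built-in from_utc_timestamp function that performs exactly this conversion. A minimal sketch, assuming Spark 2.4+ (where the timezone argument may itself be a column); the names utc_ts and localized are mine:

import org.apache.spark.sql.functions.{col, from_utc_timestamp, to_timestamp}

// Parse the ISO-8601 string ("X" matches the trailing "Z"), then shift
// the UTC instant into each row's own offset.
val localized = result
  .withColumn("utc_ts", to_timestamp(col("utcacquisitiontime"), "yyyy-MM-dd'T'HH:mm:ssX"))
  .withColumn("localtime", from_utc_timestamp(col("utc_ts"), col("datatimezone")))
  .drop("utc_ts")

This keeps the whole computation inside Spark's built-in expressions, which is generally faster than going through a udf.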


Answer 2:

You may use the Joda-Time API to convert the time in your df column to local time by doing something like:

import org.joda.time.DateTime

def convertToLocal(str: String): String = new DateTime(str).toLocalDateTime().toString

Next, import the sql implicits with:

import ss.implicits._

where ss is the instance of your SparkSession. To convert each element of the utcacquisitiontime column to a local DateTime, do something like this:

val df = result.map(r => (convertToLocal(r.getString(0)), r.getString(1)))

df.show()

Let me know if this helps. Cheers.
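
Note that new DateTime(str).toLocalDateTime() renders the instant in the JVM's default time zone rather than in the per-row datatimezone. If the offset column should be honoured, a variant of the helper could look like the sketch below (the two-argument convertToLocal is my own adaptation, not part of the answer above):

import org.joda.time.{DateTime, DateTimeZone}

// Re-express the parsed instant in the row's own offset (e.g. "+02:00")
// before dropping the zone information.
def convertToLocal(time: String, zone: String): String =
  new DateTime(time).withZone(DateTimeZone.forID(zone)).toLocalDateTime().toString

val localized = result.map(r => (convertToLocal(r.getString(0), r.getString(1)), r.getString(1)))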