How to rename fields in a DataFrame corresponding to nested JSON

Posted 2019-03-14 04:57

Question:

I am trying to process JSON events received from a mobile app (clicks, etc.) using Spark 1.5.2. There are multiple app versions, and the structure of the events varies across versions.

Say version 1 has the following structure:

{
    "timestamp": "",
    "ev": {
        "app": {
            "appName": "XYZ",
            "appVersion": "1.2.0"
        },
        "device": {
            "deviceId": "ABC",
            ...
        },
        ...
    }
}

And another version has the following structure:

{
    "timestamp": "",
    "ev": {
        "_a": {
            "name": "XYZ",
            "version": "1.3.0"
        },
        "_d": {
            "androidId": "ABC",
            ...
        },
        ...
    }
}

I want to create a single DataFrame covering both structures and run queries against it.

I am creating a separate DataFrame for each structure using the filter function. Now I need to rename the columns so that I can perform a union of the two DataFrames.

I am using:

df.withColumnRenamed("ev.app", "ev._a").withColumnRenamed("ev.device", "ev._d");

But this does not work. How do I achieve this?

Answer 1:

If you only need to rename nested columns and not change the schema structure, replacing the DataFrame's schema (that is, re-creating the DataFrame with a new schema) works fine.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DataType, StructField, StructType}

object functions {

  // Renames the field if its full dotted path equals oldColName; otherwise
  // recurses into it when it is a parent on the path to oldColName.
  private def processField(structField: StructField, fullColName: String, oldColName: String, newColName: String): StructField = {
    if (fullColName == oldColName) {
      // copy keeps the data type, nullability and metadata
      structField.copy(name = newColName)
    } else if (oldColName.startsWith(fullColName + ".")) {
      structField.copy(dataType = processType(structField.dataType, fullColName, oldColName, newColName))
    } else {
      structField
    }
  }

  // Rebuilds struct types field by field; any other type is returned unchanged.
  private def processType(dataType: DataType, fullColName: String, oldColName: String, newColName: String): DataType = {
    dataType match {
      case structType: StructType =>
        new StructType(structType.fields.map(
          f => processField(f, if (fullColName == null) f.name else s"${fullColName}.${f.name}", oldColName, newColName)))
      case other => other
    }
  }

  implicit class ExtDataFrame(df: DataFrame) {
    // oldColName is the full dotted path; newColName is the bare new field name.
    def renameNestedColumn(oldColName: String, newColName: String): DataFrame = {
      df.sqlContext.createDataFrame(df.rdd, processType(df.schema, null, oldColName, newColName).asInstanceOf[StructType])
    }
  }
}

Usage:

scala> import functions._
import functions._

scala> df.printSchema
root
 |-- geo_info: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- country_code: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- region: string (nullable = true)

scala> df.renameNestedColumn("geo_info.country_code", "country").printSchema
root
 |-- geo_info: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- region: string (nullable = true)

This implementation is recursive, so it should handle cases like this as well:

df.renameNestedColumn("a.b.c.d.e.f", "bla")
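
For the schemas in the question, several fields have to be renamed at once (ev.app, ev.app.appName, ev.device and so on). With a one-at-a-time helper, every call made after a parent has been renamed must address the child through the parent's new name. The following is a minimal sketch of a variant that applies all renames in a single pass, keyed by the original dotted paths. The names SchemaRenameSketch and renameAll are hypothetical, and the v1 schema is built by hand from the fields visible in the question; the elided fields are left out.

```scala
import org.apache.spark.sql.types._

object SchemaRenameSketch {
  // Hypothetical helper: renames every field whose ORIGINAL dotted path is a
  // key in `renames`. Only struct types are descended into; other types
  // (including arrays of structs) are returned unchanged.
  def renameAll(dt: DataType, renames: Map[String, String], prefix: String = ""): DataType = dt match {
    case st: StructType =>
      StructType(st.fields.map { f =>
        val path = if (prefix.isEmpty) f.name else s"$prefix.${f.name}"
        f.copy(
          name = renames.getOrElse(path, f.name),
          dataType = renameAll(f.dataType, renames, path))
      })
    case other => other
  }

  def main(args: Array[String]): Unit = {
    // Version 1 schema, reduced to the fields shown in the question.
    val v1 = StructType(Seq(
      StructField("timestamp", StringType),
      StructField("ev", StructType(Seq(
        StructField("app", StructType(Seq(
          StructField("appName", StringType),
          StructField("appVersion", StringType)))),
        StructField("device", StructType(Seq(
          StructField("deviceId", StringType)))))))))

    // Original path -> new bare field name.
    val renames = Map(
      "ev.app" -> "_a",
      "ev.app.appName" -> "name",
      "ev.app.appVersion" -> "version",
      "ev.device" -> "_d",
      "ev.device.deviceId" -> "androidId")

    val v2 = renameAll(v1, renames).asInstanceOf[StructType]
    println(v2.treeString)
  }
}
```

The resulting schema can then be applied the same way as above, with df.sqlContext.createDataFrame(df.rdd, v2). This only works because the renames change names and leave the field order and types untouched.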


Answer 2:

Given two messages M1 and M2 like

case class Ev1(app1: String)
case class M1(ts: String, ev1: Ev1)

case class Ev2(app2: String)
case class M2(ts: String, ev2: Ev2)

and two DataFrames, df1 (containing M1) and df2 (containing M2), both registered as temp tables, you can use SQL:

val merged = sqlContext.sql(
  """
    |select
    |    df1.ts as ts,
    |    named_struct('app', df1.ev1.app1) as ev
    |  from
    |    df1
    |
    |union all
    |
    |select
    |    df2.ts as ts,
    |    named_struct('app', df2.ev2.app2) as ev
    |  from
    |    df2
  """.stripMargin)

  • Use as to give the columns the same names
  • Use named_struct to build compatible nested structs on the fly
  • Use union all to put it all together

Not shown in the example, but functions like collect_list might be useful as well.
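
The same merge can probably be written with the DataFrame API instead of a SQL string. Below is a minimal sketch, assuming df1 and df2 have the M1 and M2 shapes above; the object name MergeSketch and the method merge are hypothetical. It uses struct from org.apache.spark.sql.functions with aliased columns in place of named_struct, and unionAll as found in Spark 1.x (on Spark 2.0 and later, union is the equivalent).

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, struct}

object MergeSketch {
  // df1 is expected to have columns ts and ev1.app1,
  // df2 is expected to have columns ts and ev2.app2.
  def merge(df1: DataFrame, df2: DataFrame): DataFrame = {
    // Rebuild the nested struct with a common field name ("app")
    // and a common column name ("ev") on both sides.
    val left = df1.select(col("ts"), struct(col("ev1.app1").as("app")).as("ev"))
    val right = df2.select(col("ts"), struct(col("ev2.app2").as("app")).as("ev"))
    // unionAll matches columns by position, so both selects list them in the same order.
    left.unionAll(right)
  }
}
```

Because the union is positional, the order of the columns in the two select calls matters more than their names; the aliases are there so that the merged schema is readable.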