Nested dynamic schema not working while parsing JS

Posted 2019-08-24 03:41

Question:

I am trying to extract certain parameters from a nested JSON (having dynamic schema) and generate a spark dataframe using pyspark.

My code works perfectly for level 1 (key:value) but fails to produce independent columns for each (key:value) pair that is part of the nested JSON.

JSON schema sample

Note - This is not the exact schema. It's just to give an idea of the nested nature of the schema.

{
  "tweet": {
    "text": "RT @author original message",
    "user": {
      "screen_name": "Retweeter"
    },
    "retweeted_status": {
      "text": "original message",
      "user": {
        "screen_name": "OriginalTweeter"
      },
      "place": {
      },
      "entities": {
      },
      "extended_entities": {
      }
    },
    "entities": {
    },
    "extended_entities": {
    }
  }
}

PySpark Code

from pyspark.sql.types import StructType, StructField, StringType

# Explicit schema: two top-level strings plus one nested struct
schema = StructType([
    StructField("text", StringType(), True),
    StructField("created_at", StringType(), True),
    StructField("retweeted_status", StructType([
        StructField("text", StringType(), True),
        StructField("created_at", StringType(), True)]))
])

# `spark` is the SparkSession provided by the pyspark shell or notebook
df = spark.read.schema(schema).json("/user/sagarp/NaMo/data/NaMo2019-02-12_00H.json")
df.show()

Current output - (with real JSON data)

All (key:value) pairs under the nested retweeted_status JSON are squashed into one single list, e.g. [text, created_at, entities]

+--------------------+--------------------+--------------------+
|                text|          created_at|    retweeted_status|
+--------------------+--------------------+--------------------+
|RT @Hoosier602: @...|Mon Feb 11 19:04:...|[@CLeroyjnr @Gabr...|
|RT @EgSophie: Oh ...|Mon Feb 11 19:04:...|[Oh cool so do yo...|
|RT @JacobAWohl: @...|Mon Feb 11 19:04:...|[@realDonaldTrump...|

Expected output

I want independent columns for each key. Also, note that there is already a parent-level key with the same name, text. How should such name collisions be handled?

Ideally, I would want columns like "text", "entities", "retweet_status_text", "retweet_status_entities", etc

Answer 1:

Your schema is not mapped properly. Please see these posts if you want to construct the schema manually (which is recommended if the data doesn't change):

PySpark: How to Update Nested Columns?

https://docs.databricks.com/_static/notebooks/complex-nested-structured.html

Also, if your JSON is multi-line (like your example) then you can:

  1. read the JSON with the multi-line option so that Spark infers the schema
  2. then save the nested schema
  3. then read the data back in with the correct schema mapping, to avoid triggering a Spark job for schema inference

! cat nested.json

[
    {"string":"string1","int":1,"array":[1,2,3],"dict": {"key": "value1"}},
    {"string":"string2","int":2,"array":[2,4,6],"dict": {"key": "value2"}},
    {
        "string": "string3",
        "int": 3,
        "array": [
            3,
            6,
            9
        ],
        "dict": {
            "key": "value3",
            "extra_key": "extra_value3"
        }
    }
]

getSchema = spark.read.option("multiline", "true").json("nested.json")

extractSchema = getSchema.schema
print(extractSchema)
StructType(List(StructField(array,ArrayType(LongType,true),true),StructField(dict,StructType(List(StructField(extra_key,StringType,true),StructField(key,StringType,true))),true),StructField(int,LongType,true),StructField(string,StringType,true)))

loadJson = spark.read.option("multiline", "true").schema(extractSchema).json("nested.json")

loadJson.printSchema()
root
 |-- array: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- dict: struct (nullable = true)
 |    |-- extra_key: string (nullable = true)
 |    |-- key: string (nullable = true)
 |-- int: long (nullable = true)
 |-- string: string (nullable = true)

loadJson.show(truncate=False)
+---------+----------------------+---+-------+
|array    |dict                  |int|string |
+---------+----------------------+---+-------+
|[1, 2, 3]|[, value1]            |1  |string1|
|[2, 4, 6]|[, value2]            |2  |string2|
|[3, 6, 9]|[extra_value3, value3]|3  |string3|
+---------+----------------------+---+-------+

Once the data is loaded with the correct mapping, you can start transforming it into a normalized schema: use "dot" notation to pull nested fields out into their own columns, and "explode" to flatten arrays into one row per element.

loadJson\
.selectExpr("dict.key as key", "dict.extra_key as extra_key").show()

+------+------------+
|   key|   extra_key|
+------+------------+
|value1|        null|
|value2|        null|
|value3|extra_value3|
+------+------------+
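For the schema in the question, the same "dot" notation can be generated rather than typed by hand. The sketch below is one possible approach, not the only one: the helper `flatten_exprs` is hypothetical, and it walks the dict form of a schema (the layout that `StructType.jsonValue()` returns) to emit one `selectExpr` string per leaf field. Each alias is prefixed with its parent names, so the nested `text` becomes `retweeted_status_text` and does not collide with the top-level `text`.

```python
def flatten_exprs(schema_json, prefix=()):
    """Build 'a.b as a_b' expressions for every leaf field of a struct schema."""
    exprs = []
    for field in schema_json["fields"]:
        path = prefix + (field["name"],)
        field_type = field["type"]
        if isinstance(field_type, dict) and field_type.get("type") == "struct":
            # Nested struct: recurse, carrying the parent names along.
            exprs.extend(flatten_exprs(field_type, path))
        else:
            # Leaf (or array/map, which is left untouched): alias with the full path.
            exprs.append("{} as {}".format(".".join(path), "_".join(path)))
    return exprs


def string_field(name):
    # Small helper for the hand-written example schema below.
    return {"name": name, "type": "string", "nullable": True, "metadata": {}}


# Hand-written dict mirroring the schema from the question;
# with a live DataFrame this would be df.schema.jsonValue().
schema_json = {
    "type": "struct",
    "fields": [
        string_field("text"),
        string_field("created_at"),
        {
            "name": "retweeted_status",
            "type": {
                "type": "struct",
                "fields": [string_field("text"), string_field("created_at")],
            },
            "nullable": True,
            "metadata": {},
        },
    ],
}

exprs = flatten_exprs(schema_json)
for e in exprs:
    print(e)

# With a live DataFrame: df.selectExpr(*exprs).show()
```

This sketch assumes field names contain no dots or spaces (those would need backtick quoting) and that no top-level field already uses a prefixed name such as `retweeted_status_text`. Arrays are passed through as-is; they can be expanded afterwards with `pyspark.sql.functions.explode`.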