Spark dataframes convert nested JSON to separate c

Posted 2019-03-05 22:43

Question:

I have a stream of JSON records with the following structure that gets converted to a DataFrame:

{
  "a": 3936,
  "b": 123,
  "c": "34",
  "attributes": {
    "d": "146",
    "e": "12",
    "f": "23"
  }
}

Calling show on the DataFrame produces the following output:

sqlContext.read.json(jsonRDD).show

+----+-----------+---+---+
|   a| attributes|  b|  c|
+----+-----------+---+---+
|3936|[146,12,23]|123| 34|
+----+-----------+---+---+

How can I split the attributes column (a nested JSON structure) into attributes.d, attributes.e and attributes.f as separate columns in a new DataFrame, so that the new DataFrame has the columns a, b, c, attributes.d, attributes.e and attributes.f?

Answer 1:

  • If you want columns named from a to f:

    df.select("a", "b", "c", "attributes.d", "attributes.e", "attributes.f")
    
  • If you want columns named with the attributes. prefix:

    df.select($"a", $"b", $"c", $"attributes.d" as "attributes.d", $"attributes.e" as "attributes.e", $"attributes.f" as "attributes.f")
    
  • If the names of your columns are supplied from an external source (e.g. configuration):

    val colNames = Seq("a", "b", "c", "attributes.d", "attributes.e", "attributes.f")
    
    df.select(colNames.head, colNames.tail: _*).toDF(colNames:_*)
    


Answer 2:

Using the attributes.d dot notation, you can reference the nested fields and add them as new top-level columns of your DataFrame. Look at the withColumn() method in the Java API.



Answer 3:

Use Python

  1. Load the data into a DataFrame using the pandas library.
  2. Convert each value of the nested column from 'str' to 'dict'.
  3. Get the value of each feature.
  4. Save the results to a new file.

    import ast
    import pandas as pd
    
    data = pd.read_csv("data.csv")      # load the CSV file from disk
    json_data = data['Desc']            # the Desc column holds dict-like strings
    data = data.drop(columns='Desc')    # drop the original Desc column
    total, defective = [], []           # lists for the extracted values
    
    for item in json_data:
        # parse 'str' into 'dict'; literal_eval avoids executing arbitrary code
        parsed = ast.literal_eval(item)
        total.append(parsed['Total'])           # collect the 'Total' feature
        defective.append(parsed['Defective'])   # collect the 'Defective' feature
    
    # finally, add the extracted values as new columns
    data['Total'] = total
    data['Defective'] = defective
    
    data.to_csv("result.csv")           # save to result.csv and check it
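
For the record shown in the question, the same "parse, then pull the nested values out" idea can be sketched with the standard library alone. This is only an illustration, not part of the original answer: the helper name flatten and the dotted-key naming are assumptions chosen to mirror the attributes.d style of column names, and the sample string is the JSON from the question.

```python
import json


def flatten(record, prefix=""):
    """Return a flat dict whose keys are dotted paths into the nested record.

    Hypothetical helper for illustration; it only descends into dicts and
    leaves lists and scalar values untouched.
    """
    flat = {}
    for key, value in record.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested objects, extending the dotted prefix
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat


# The sample record from the question
raw = '{"a": 3936, "b": 123, "c": "34", "attributes": {"d": "146", "e": "12", "f": "23"}}'
row = flatten(json.loads(raw))
print(list(row))  # the flattened column names, in insertion order
```

Each flattened dict can then be treated as one row, with its keys serving as the column names a, b, c, attributes.d, attributes.e and attributes.f.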