Convert spark Dataframe with schema to dataframe o

Posted 2019-08-23 06:26

Question:

I have a Dataframe like this:

+--+--------+--------+----+-------------+------------------------------+
|id|name    |lastname|age |timestamp    |creditcards                   |
+--+--------+--------+----+-------------+------------------------------+
|1 |michel  |blanc   |35  |1496756626921|[[hr6,3569823], [ee3,1547869]]|
|2 |peter   |barns   |25  |1496756626551|[[ye8,4569872], [qe5,3485762]]|
+--+--------+--------+----+-------------+------------------------------+

where the schema of my df is like below:

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- age: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- creditcards: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- number: string (nullable = true)

I would like to convert each row to a JSON string, given my schema. The resulting DataFrame would have a single string column containing the JSON. The first row should look like this:

{
    "id":"1",
    "name":"michel",
    "lastname":"blanc",
    "age":"35",
    "timestamp":"1496756626921",
    "creditcards":[
        {
            "id":"hr6",
            "number":"3569823"
        },
        {
            "id":"ee3",
            "number":"1547869"  
        }
    ]
}

and the second row of the DataFrame should look like this:

{
    "id":"2",
    "name":"peter",
    "lastname":"barns",
    "age":"25",
    "timestamp":"1496756626551",
    "creditcards":[
        {
            "id":"ye8",
            "number":"4569872"
        },
        {
            "id":"qe5",
            "number":"3485762"  
        }
    ]
}

My goal is not to write the DataFrame to a JSON file. My goal is to convert df1 to a second DataFrame, df2, so that I can push each JSON line of df2 to a Kafka topic. I have this code to create the DataFrame:

    val line1 = """{"id":"1","name":"michel","lastname":"blanc","age":"35","timestamp":"1496756626921","creditcards":[{"id":"hr6","number":"3569823"},{"id":"ee3","number":"1547869"}]}"""
    val line2 = """{"id":"2","name":"peter","lastname":"barns","age":"25","timestamp":"1496756626551","creditcards":[{"id":"ye8","number":"4569872"}, {"id":"qe5","number":"3485762"}]}"""

    val rdd = sc.parallelize(Seq(line1, line2))
    val df = sqlContext.read.json(rdd)
    df.show(false)
    df.printSchema()

Do you have any idea?

Answer 1:

If all you need is a single-column DataFrame/Dataset in which each value is the JSON representation of one row of the original DataFrame, you can simply apply toJSON to your DataFrame, as follows:

df.show
// +---+------------------------------+---+--------+------+-------------+
// |age|creditcards                   |id |lastname|name  |timestamp    |
// +---+------------------------------+---+--------+------+-------------+
// |35 |[[hr6,3569823], [ee3,1547869]]|1  |blanc   |michel|1496756626921|
// |25 |[[ye8,4569872], [qe5,3485762]]|2  |barns   |peter |1496756626551|
// +---+------------------------------+---+--------+------+-------------+

val dsJson = df.toJSON
// dsJson: org.apache.spark.sql.Dataset[String] = [value: string]

dsJson.show
// +--------------------------------------------------------------------------+
// |value                                                                     |
// +--------------------------------------------------------------------------+
// |{"age":"35","creditcards":[{"id":"hr6","number":"3569823"},{"id":"ee3",...|
// |{"age":"25","creditcards":[{"id":"ye8","number":"4569872"},{"id":"qe5",...|
// +--------------------------------------------------------------------------+

[UPDATE]

To add name as an additional column, you can extract it from the JSON column using from_json:

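// Outside spark-shell, import org.apache.spark.sql.functions.from_json and spark.implicits._ first
val result = dsJson.withColumn("name", from_json($"value", df.schema)("name"))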

result.show
// +--------------------+------+
// |               value|  name|
// +--------------------+------+
// |{"age":"35","cred...|michel|
// |{"age":"25","cred...| peter|
// +--------------------+------+
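
Since the stated goal is to push the rows to Kafka, an alternative to parsing the JSON back with from_json is to build both columns in one pass with to_json(struct(...)). The following is only a minimal sketch under a few assumptions: Spark 2.2 or later, the spark-sql-kafka-0-10 package on the classpath for the write step, and hypothetical names (JsonWithKey, toKeyValue, writeToKafka, and the broker and topic arguments) that do not come from the original answer.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct, to_json}

object JsonWithKey {
  // Builds a two-column DataFrame: `key` (the name) and `value`
  // (the whole row serialized as a JSON string).
  def toKeyValue(df: DataFrame): DataFrame =
    df.select(
      col("name").as("key"),
      to_json(struct(df.columns.map(col): _*)).as("value")
    )

  // Hypothetical helper, not called in main: the broker address and topic
  // are placeholders, and the Kafka connector must be on the classpath.
  def writeToKafka(kv: DataFrame, bootstrapServers: String, topic: String): Unit =
    kv.write
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("topic", topic)
      .save()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("json-with-key")
      .getOrCreate()
    import spark.implicits._

    val line1 = """{"id":"1","name":"michel","lastname":"blanc","age":"35","timestamp":"1496756626921","creditcards":[{"id":"hr6","number":"3569823"},{"id":"ee3","number":"1547869"}]}"""
    val df = spark.read.json(Seq(line1).toDS())

    // Shows one row with the name as key and the full JSON as value.
    toKeyValue(df).show(false)
    spark.stop()
  }
}
```

The Kafka sink reads the columns named key and value, which is why the sketch uses those names instead of name and value.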


Answer 2:

For that, you can directly convert your DataFrame to a Dataset of JSON strings using

val jsonDataset: Dataset[String] = df.toJSON

You can convert it into a dataframe using

val jsonDF: DataFrame = jsonDataset.toDF

Here the JSON keys are in alphabetical order (toJSON keeps the DataFrame's column order, and read.json sorts inferred fields by name), so the output of

jsonDF.show(false)

will be

    +--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |value                                                                                                                                                               |
    +--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |{"age":"35","creditcards":[{"id":"hr6","number":"3569823"},{"id":"ee3","number":"1547869"}],"id":"1","lastname":"blanc","name":"michel","timestamp":"1496756626921"}|
    |{"age":"25","creditcards":[{"id":"ye8","number":"4569872"},{"id":"qe5","number":"3485762"}],"id":"2","lastname":"barns","name":"peter","timestamp":"1496756626551"} |
    +--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
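
If the key order from the question (id, name, lastname, age, timestamp, creditcards) matters, one option is to reorder the columns before calling toJSON, since the JSON keys follow the column order. This is a minimal sketch assuming Spark 2.2 or later; OrderedJson and toOrderedJson are hypothetical names used only for illustration.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.col

object OrderedJson {
  // Hypothetical helper: reorders the top-level columns, then serializes
  // each row to a JSON string whose keys follow that order.
  def toOrderedJson(df: DataFrame, order: Seq[String]): Dataset[String] =
    df.select(order.map(col): _*).toJSON

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("ordered-json")
      .getOrCreate()
    import spark.implicits._

    val line1 = """{"id":"1","name":"michel","lastname":"blanc","age":"35","timestamp":"1496756626921","creditcards":[{"id":"hr6","number":"3569823"},{"id":"ee3","number":"1547869"}]}"""
    val df = spark.read.json(Seq(line1).toDS())

    // Column order requested in the question.
    val order = Seq("id", "name", "lastname", "age", "timestamp", "creditcards")
    toOrderedJson(df, order).show(false)
    spark.stop()
  }
}
```

Only the top-level columns are reordered; the fields inside each creditcards element keep the order they have in the schema.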