PySpark converting a column of type 'map'


Question:

Input

I have a column Parameters of type map of the form:

>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> d = [{'Parameters': {'foo': '1', 'bar': '2', 'baz': 'aaa'}}]
>>> df = sqlContext.createDataFrame(d)
>>> df.collect()
[Row(Parameters={'foo': '1', 'bar': '2', 'baz': 'aaa'})]

Output

I want to reshape it in pyspark so that all the keys (foo, bar, etc.) are columns, namely:

[Row(foo='1', bar='2', baz='aaa')]

Using withColumn works:

(df
 .withColumn('foo', df.Parameters['foo'])
 .withColumn('bar', df.Parameters['bar'])
 .withColumn('baz', df.Parameters['baz'])
 .drop('Parameters')
).collect()

But I need a solution that doesn't explicitly mention the column names, as I have dozens of them.

Schema

>>> df.printSchema()

root
 |-- Parameters: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

Answer 1:

Since the keys of a MapType column are not part of the schema, you'll have to collect them first, for example like this:

from pyspark.sql.functions import explode

# explode on a map column yields one row per entry, with columns `key` and `value`
keys = (df
    .select(explode("Parameters"))
    .select("key")
    .distinct()
    .rdd.flatMap(lambda x: x)
    .collect())
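
With the sample DataFrame from the question this should give back the three map keys (the order isn't guaranteed), for example:

>>> keys
['foo', 'bar', 'baz']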

Once you have the keys, all that is left is a simple select:

from pyspark.sql.functions import col

# one column per key: Parameters['k'] aliased as k
exprs = [col("Parameters").getItem(k).alias(k) for k in keys]
df.select(*exprs)
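
Putting it together with the sample data from the question, the final select should give the requested shape (column order follows the collected keys, so it may differ):

>>> df.select(*exprs).collect()
[Row(foo='1', bar='2', baz='aaa')]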