Expand array-of-structs into columns in PySpark

Posted 2020-07-29 23:33

Question:

I have a Spark dataframe, originating from Google Analytics, that looks like the following:

id     customDimensions (Array<Struct>)
100    [ {"index": 1, "value": "Earth"}, {"index": 2, "value": "Europe"}]
101    [ {"index": 1, "value": "Mars" }]

I also have a "custom dimensions metadata" dataframe that looks like this:

index   name
1       planet
2       continent

I'd like to use the indexes in the metadata df to expand my custom dimensions into columns. The result should look like the following:

id     planet     continent
100    Earth      Europe
101    Mars       null
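
For reference, here is a minimal sketch of how these two input dataframes could be constructed for testing. The exact schema isn't stated above, so I'm assuming customDimensions is array<struct<index:int, value:string>>, as the example suggests:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Assumed schema: customDimensions is an array of {index, value} structs
df = spark.createDataFrame(
    [(100, [(1, 'Earth'), (2, 'Europe')]),
     (101, [(1, 'Mars')])],
    schema='id int, customDimensions array<struct<index:int, value:string>>')

metadata = spark.createDataFrame(
    [(1, 'planet'), (2, 'continent')],
    schema='index int, name string')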

I have tried the following approach, and it works fine; however, it is extremely non-performant. I'd like to know if there's a better approach.

from pyspark.sql import functions as F

# Select the two relevant columns
cd = df.select('id', 'customDimensions')

# Explode customDimensions so that each row now has a {index, value}
cd = cd.withColumn('customDimensions', F.explode(cd.customDimensions))

# Put the index and value into their own columns
cd = cd.select('id', 'customDimensions.index', 'customDimensions.value')

# Join with metadata to obtain the name from the index
metadata = metadata.select('index', 'name')
cd = (cd
         .join(metadata, cd.index == metadata.index, 'left')
         .drop(metadata.index))

# Pivot cd so that each row has the id, and we have columns for each custom dimension
piv = cd.groupBy('id').pivot('name').agg(F.first(F.col('value')))

# Join back to restore the other columns
return df.join(piv, df.id == piv.id).drop(piv.id)

Assumptions:

  • There are up to 250 custom dimension indexes, and the names are only known through the metadata dataframe
  • The original dataframe has several other columns that I would like to maintain (hence the join at the end of my solution)

Answer 1:

Joins are a very costly operation because they result in data shuffling. If you can, you should avoid them or look to optimize them.

There are two joins in your code. The last join, which gets the other columns back, can be avoided altogether. The join with the metadata dataframe can be optimized: since the metadata df has only 250 rows and is very small, you can use the broadcast() hint in that join. This avoids shuffling the larger dataframe.

I have made some suggested code changes, but they are not tested since I don't have your data.

from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast

# Columns to carry through the pivot (everything except the array we explode)
df_columns = [c for c in df.columns if c != 'customDimensions']

# Explode customDimensions so that each row now has a {index, value}
cd = df.withColumn('customDimensions', F.explode(df.customDimensions))

# Put the index and value into their own columns
cd = cd.select(*df_columns, 'customDimensions.index', 'customDimensions.value')

# Join with metadata to obtain the name from the index
metadata = metadata.select('index', 'name')
cd = cd.join(broadcast(metadata), "index", 'left')

# Pivot cd so that each row keeps the original columns, with one column per custom dimension
piv = cd.groupBy(df_columns).pivot('name').agg(F.first(F.col('value')))


return piv
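
If you want to confirm that the broadcast hint actually takes effect, you can inspect the physical plan; the metadata join should show up as a BroadcastHashJoin rather than a SortMergeJoin (which would imply a shuffle):

piv.explain()
# Look for a BroadcastHashJoin node in the printed physical plan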