Pandas dataframe to Spark dataframe, handling NaN

Published 2019-02-19 11:49

Question:

I want to convert a dataframe from pandas to Spark and I am using the spark_context.createDataFrame() method to create the dataframe. I'm also specifying the schema in the createDataFrame() method.

What I want to know is how to handle special cases. For example, when NaN in pandas is converted to a Spark dataframe, it ends up as the string "NaN". I am looking for a way to get actual nulls instead of "NaN".

Answer 1:

TL;DR Your best option for now is to skip Pandas completely.

The source of the problem is that Pandas is less expressive than Spark SQL. Spark provides both NULL (in a SQL sense, as a missing value) and NaN (numeric Not a Number).
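To see the distinction on the Spark side, here is a minimal sketch (the one-column demo DataFrame and the name v are just illustrative):

from pyspark.sql.functions import isnan, isnull

# NULL and NaN are separate values in Spark SQL, flagged by different predicates.
demo = spark.createDataFrame([(float("nan"),), (None,)], "v double")
demo.select("v", isnan("v").alias("is_nan"), isnull("v").alias("is_null")).show()
+----+------+-------+
|   v|is_nan|is_null|
+----+------+-------+
| NaN|  true|  false|
|null| false|   true|
+----+------+-------+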

Pandas, on the other hand, doesn't have a native value that can be used to represent missing data. As a result it uses placeholders like NaN / NaT or Inf, which are indistinguishable to Spark from actual NaNs and Infs, and the conversion rules depend on the column type. The only exception is object columns (typically strings), which can contain None values. You can learn more about handling missing values in Pandas from the documentation.
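For reference, a quick sketch of those placeholders on the Pandas side (the dtypes in the comments are what Pandas produces for these inputs):

import pandas as pd

# The placeholder used for a missing value depends on the column dtype:
pd.Series([1, None])                          # float64        -> NaN
pd.Series([pd.Timestamp("20120101"), None])   # datetime64[ns] -> NaT
pd.Series(["foo", None])                      # object         -> None is kept as-is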

For example, NaN in pandas when converted to Spark dataframe ends up being string "NaN".

This is actually not correct; it depends on the type of the input column. If the column shows NaN, it is most likely a not-a-number value, not a plain string:

from pyspark.sql.functions import isnan, isnull
import pandas as pd

pdf = pd.DataFrame({
    "x": [1, None], "y": [None, "foo"], 
    "z": [pd.Timestamp("20120101"), pd.Timestamp("NaT")]
})
sdf = spark.createDataFrame(pdf)

sdf.show()
+---+----+-------------------+
|  x|   y|                  z|
+---+----+-------------------+
|1.0|null|2012-01-01 00:00:00|
|NaN| foo|               null|
+---+----+-------------------+
sdf.select([
    f(c) for c in sdf.columns for f in [isnan, isnull] 
    if (f, c) != (isnan, "z")  # isnan cannot be applied to timestamp 
]).show()
+--------+-----------+--------+-----------+-----------+
|isnan(x)|(x IS NULL)|isnan(y)|(y IS NULL)|(z IS NULL)|
+--------+-----------+--------+-----------+-----------+
|   false|      false|   false|       true|      false|
|    true|      false|   false|      false|       true|
+--------+-----------+--------+-----------+-----------+
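You can confirm this by inspecting the inferred schema; the NaN lives in a double column, not a string one:

sdf.printSchema()
root
 |-- x: double (nullable = true)
 |-- y: string (nullable = true)
 |-- z: timestamp (nullable = true)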

In practice, parallelized local collections (including Pandas objects) have negligible importance beyond simple testing and toy examples, so you can always convert the data manually (skipping possible Arrow optimizations):

import numpy as np

# Replace float NaN with None row by row before handing the data to Spark,
# so Spark sees proper SQL NULLs instead of NaN.
spark.createDataFrame([
    tuple(
        None if isinstance(x, (float, int)) and np.isnan(x) else x
        for x in record.tolist())
    for record in pdf.to_records(index=False)
], pdf.columns.tolist()).show()
+----+----+-------------------+
|   x|   y|                  z|
+----+----+-------------------+
| 1.0|null|1325376000000000000|
|null| foo|               null|
+----+----+-------------------+
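Note that to_records also turns the timestamp column into raw integer nanoseconds, as seen above, so it would need extra handling. Another common workaround (not part of the original answer) is to cast the Pandas frame to object dtype first, so that None is kept instead of being coerced back to NaN / NaT; a sketch, assuming the non-Arrow conversion path and an explicit DDL schema:

# Reuses pdf from above. Cast to object so None survives, then blank out all
# missing markers (NaN / NaT) with None before handing the frame to Spark.
pdf_clean = pdf.astype(object).where(pdf.notnull(), None)
spark.createDataFrame(pdf_clean, schema="x double, y string, z timestamp").show()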

If the missing / not-a-number ambiguity is not an issue, then just load the data as usual and replace the NaNs in Spark:

from pyspark.sql.functions import col, isnan, when

# Null-out NaNs only in floating point columns; other columns pass through as-is.
sdf.select([
    when(~isnan(c), col(c)).alias(c) if t in ("double", "float") else c
    for c, t in sdf.dtypes
]).show()
+----+----+-------------------+
|   x|   y|                  z|
+----+----+-------------------+
| 1.0|null|2012-01-01 00:00:00|
|null| foo|               null|
+----+----+-------------------+