尝试应用lambda来创建新的列时“‘数据帧’对象有没有属性‘应用’”(“'DataFram

2019-11-04 20:01发布

我的目标是在一个数据帧大熊猫添加一个新列,但我面临的一个奇怪的错误。

新列预计将来自现有列的变换,可以做在字典/ HashMap中做一个查找。

# Loading data
df = sqlContext.read.format(...).load(train_df_path)

# Instanciating the map
some_map = {
    'a': 0, 
    'b': 1,
    'c': 1,
}

# Creating a new column using the map
df['new_column'] = df.apply(lambda row: some_map(row.some_column_name), axis=1)

这导致了以下错误:

AttributeErrorTraceback (most recent call last)
<ipython-input-12-aeee412b10bf> in <module>()
     25 df= train_df
     26 
---> 27 df['new_column'] = df.apply(lambda row: some_map(row.some_column_name), axis=1)

/usr/lib/spark/python/pyspark/sql/dataframe.py in __getattr__(self, name)
    962         if name not in self.columns:
    963             raise AttributeError(
--> 964                 "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
    965         jc = self._jdf.apply(name)
    966         return Column(jc)

AttributeError: 'DataFrame' object has no attribute 'apply'

其它潜在有用的信息:*我使用的星火和Python 2。

Answer 1:

您正在使用的语法是一个pandas数据帧。 为了实现这一目标的spark数据框,你应该使用withColumn()方法。 这个伟大的工程适用范围广的良好定义的数据帧的功能 ,但它是用户定义的映射函数稍微复杂一点。

一般情况

为了定义一个udf ,你需要指定输出的数据类型。 举例来说,如果你想申请一个函数my_func该返回一个string ,你可以创建一个udf ,如下所示:

import pyspark.sql.functions as f
my_udf = f.udf(my_func, StringType())

然后你可以使用my_udf创建一个新的列,如:

df = df.withColumn('new_column', my_udf(f.col("some_column_name")))

另一种选择是使用select

df = df.select("*", my_udf(f.col("some_column_name")).alias("new_column"))

具体问题

使用udf

在特定情况下,要使用字典翻译您的数据框的值。

这里有一个方法来定义一个udf用于此目的:

some_map_udf = f.udf(lambda x: some_map.get(x, None), IntegerType())

请注意,我用dict.get()因为你希望你的udf是稳健的坏输入。

df = df.withColumn('new_column', some_map_udf(f.col("some_column_name")))

使用数据框功能

有时使用udf是不可避免的,但尽可能使用数据帧的功能通常是优选的。

这里是一个选项,以做同样的事情,而不使用udf

关键是要在项目迭代中some_map创建的列表pyspark.sql.functions.when()函数。

some_map_func = [f.when(f.col("some_column_name") == k, v) for k, v in some_map.items()]
print(some_map_func)
#[Column<CASE WHEN (some_column_name = a) THEN 0 END>,
# Column<CASE WHEN (some_column_name = c) THEN 1 END>,
# Column<CASE WHEN (some_column_name = b) THEN 1 END>]

现在你可以使用pyspark.sql.functions.coalesce()一个选择内:

df = df.select("*", f.coalesce(*some_map_func).alias("some_column_name"))

这工作,因为when()返回null在默认情况下,如果条件不满足,和coalesce()会选择它遇到的第一个非空值。 由于地图的钥匙都是独一无二的,顶多一列将非空。



Answer 2:

你有一个火花数据帧,而不是熊猫数据帧。 要将新列添加到数据帧的火花:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
df = df.withColumn('new_column', F.udf(some_map.get, IntegerType())(some_column_name))
df.show()


文章来源: “'DataFrame' object has no attribute 'apply'” when trying to apply lambda to create new column