Spark DataFrame: distinguishing columns with duplicated names

Posted 2019-01-05 01:37

As far as I know, a Spark DataFrame can have multiple columns with the same name, as shown in the DataFrame snapshot below:

[
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=147031, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=149231, f=SparseVector(5, {0: 0.0, 1: 0.0032, 2: 0.2451, 3: 0.0, 4: 0.0042}))
]

The result above was created by joining a DataFrame to itself; you can see there are 4 columns, with two a columns and two f columns.

The problem is that when I try to do further calculations with the a column, I can't find a way to select it. I have tried df[0] and df.select('a'); both returned the error message below:

AnalysisException: Reference 'a' is ambiguous, could be: a#1333L, a#1335L.

Is there any way in the Spark API to distinguish between columns with duplicated names? Or is there some way to change the column names?

7 answers
Animai°情兽
#2 · 2019-01-05 01:48

I would recommend renaming the columns before the join (Scala syntax):

df1.select('a as "df1_a", 'f as "df1_f")
   .join(df2.select('a as "df2_a", 'f as "df2_f"), 'df1_a === 'df2_a)

The resulting DataFrame will have schema

(df1_a, df1_f, df2_a, df2_f)
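The snippet above uses Scala's symbol syntax. For PySpark, a rough equivalent might look like the sketch below. The helper names prefixed_exprs and select_prefixed are made up for illustration; the only Spark call used is DataFrame.selectExpr, which takes SQL-style "x AS y" expressions.

```python
def prefixed_exprs(columns, prefix):
    """Build SQL expressions such as '`a` AS `df1_a`' for every column name."""
    return ["`{0}` AS `{1}_{0}`".format(name, prefix) for name in columns]


def select_prefixed(df, prefix):
    """Rename every column to '<prefix>_<name>'; df is assumed to be a PySpark DataFrame."""
    return df.selectExpr(*prefixed_exprs(df.columns, prefix))


# Usage (assuming df1 and df2 both have the columns a and f):
# left = select_prefixed(df1, "df1")
# right = select_prefixed(df2, "df2")
# joined = left.join(right, left["df1_a"] == right["df2_a"])
```

Because every column gets a unique name before the join, any later select on the joined DataFrame can refer to columns by plain name.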
成全新的幸福
#3 · 2019-01-05 01:48

After digging into the Spark API, I found that I can first use alias to create an alias of the original DataFrame, then use withColumnRenamed to manually rename every column on the alias, and finally do the join without causing column name duplication.

More details can be found in the Spark DataFrame API docs below:

pyspark.sql.DataFrame.alias

pyspark.sql.DataFrame.withColumnRenamed

However, I think this is only a cumbersome workaround, and I wonder whether there is a better way to solve my problem.
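For reference, the alias plus withColumnRenamed approach described in this answer could be sketched roughly as follows. The helper names rename_map and rename_all and the "other" suffix are hypothetical; only DataFrame.alias and DataFrame.withColumnRenamed come from the API docs listed above.

```python
from functools import reduce


def rename_map(columns, suffix):
    """Map every column name to '<name>_<suffix>' (pure Python, no Spark needed)."""
    return {name: "{}_{}".format(name, suffix) for name in columns}


def rename_all(df, suffix):
    """Call withColumnRenamed once per column; df is assumed to be a PySpark DataFrame."""
    return reduce(
        lambda acc, pair: acc.withColumnRenamed(pair[0], pair[1]),
        rename_map(df.columns, suffix).items(),
        df,
    )


# Usage for the self-join in the question (df is the original DataFrame):
# from pyspark.sql.functions import col
# other = rename_all(df.alias("other"), "other")
# joined = df.join(other, col("a") == col("a_other"))
# joined.select("a", "a_other")
```

The loop over columns is what makes this feel cumbersome by hand; wrapping it in a helper keeps the join itself to a single line.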

别忘想泡老子
#4 · 2019-01-05 01:55

This is how we can join two DataFrames on columns that have the same names in PySpark.

df = df1.join(df2, ['col1','col2','col3'])

If you call printSchema() after this, you can see that the duplicated join-key columns have been removed.

Anthone
#5 · 2019-01-05 01:57

There is a simpler way than writing aliases for all of the columns you are joining on:

df1.join(df2,['a'])

This works if the key that you are joining on has the same name in both tables.

See https://docs.databricks.com/spark/latest/faq/join-two-dataframes-duplicated-column.html
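One caveat worth checking: joining on a list of names only merges the key columns. With the DataFrames from the question, f exists on both sides, so df1.join(df2, ['a']) should still leave two f columns. A minimal sketch of one way to handle that, assuming a made-up "_right" suffix and hypothetical helper names colliding_columns and join_on_keys:

```python
def colliding_columns(left_cols, right_cols, keys):
    """Names present on both sides that are not join keys.

    These are the columns that remain duplicated after left.join(right, keys).
    """
    right = set(right_cols)
    return [name for name in left_cols if name in right and name not in keys]


def join_on_keys(left, right, keys, suffix="_right"):
    """Rename colliding non-key columns on the right side, then join on the keys.

    left and right are assumed to be PySpark DataFrames.
    """
    for name in colliding_columns(left.columns, right.columns, keys):
        right = right.withColumnRenamed(name, name + suffix)
    return left.join(right, keys)


# Usage (assuming df1 and df2 both have the columns a and f):
# joined = join_on_keys(df1, df2, ["a"])
# joined.select("f", "f_right")
```

Only the right-hand side is renamed here, so the left DataFrame's column names stay unchanged for downstream code.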

Summer. ? 凉城
#6 · 2019-01-05 02:03

Let's start with some data:

from pyspark.mllib.linalg import SparseVector
from pyspark.sql import Row

df1 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=125231, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
])

df2 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
])

There are a few ways you can approach this problem. First of all, you can unambiguously reference the joined table's columns through the parent DataFrames:

df1.join(df2, df1['a'] == df2['a']).select(df1['f']).show(2)

##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

You can also use table aliases:

from pyspark.sql.functions import col

df1_a = df1.alias("df1_a")
df2_a = df2.alias("df2_a")

df1_a.join(df2_a, col('df1_a.a') == col('df2_a.a')).select('df1_a.f').show(2)

##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

Finally, you can programmatically rename columns:

# Suffix every column with the name of its source DataFrame
df1_r = df1.select(*(col(x).alias(x + '_df1') for x in df1.columns))
df2_r = df2.select(*(col(x).alias(x + '_df2') for x in df2.columns))

df1_r.join(df2_r, col('a_df1') == col('a_df2')).select(col('f_df1')).show(2)

## +--------------------+
## |               f_df1|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
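If the join has already been performed and the result still carries duplicated names, one further option (not from the answers here, just a sketch) is to rename all columns positionally with DataFrame.toDF, which takes the complete list of new names. The helper names dedupe_names and rename_positionally and the numeric-suffix scheme are assumptions for illustration.

```python
from collections import Counter


def dedupe_names(columns):
    """Keep the first occurrence of each name; later ones get '_2', '_3', ...

    Note: this simple scheme does not check whether a generated name such as
    'a_2' already exists in the input.
    """
    seen = Counter()
    result = []
    for name in columns:
        seen[name] += 1
        if seen[name] == 1:
            result.append(name)
        else:
            result.append("{}_{}".format(name, seen[name]))
    return result


def rename_positionally(df):
    """Give every column a unique name; df is assumed to be a PySpark DataFrame."""
    return df.toDF(*dedupe_names(df.columns))


# Usage on an already-joined DataFrame with the columns a, f, a, f:
# fixed = rename_positionally(joined)
# fixed.select("a", "a_2")
```

Since toDF assigns names by position, it never has to resolve the ambiguous name a, which is what triggers the AnalysisException in the question.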
啃猪蹄的小仙女
#7 · 2019-01-05 02:04

Suppose the DataFrames you want to join are df1 and df2, and you are joining them on column 'a'. Then you have two methods:

Method 1

df1.join(df2,'a','left_outer')

This is the recommended method: joining on the column name keeps a single a column in the result.

Method 2

df1.join(df2,df1.a == df2.a,'left_outer').drop(df2.a)
