How to avoid duplicate columns after join?

2020-01-23 12:07发布

I have two dataframes with the following columns:

df1.columns
//  Array(ts, id, X1, X2)

and

df2.columns
//  Array(ts, id, Y1, Y2)

After I do

val df_combined = df1.join(df2, Seq(ts,id))

I end up with the following columns: Array(ts, id, X1, X2, ts, id, Y1, Y2). I could expect that the common columns would be dropped. Is there something that additional that needs to be done?

9条回答
ゆ 、 Hurt°
2楼-- · 2020-01-23 12:16

try this,

val df_combined = df1.join(df2, df1("ts") === df2("ts") && df1("id") === df2("id")).drop(df2("ts")).drop(df2("id"))
查看更多
够拽才男人
3楼-- · 2020-01-23 12:17

You can simply use this

df1.join(df2, Seq("ts","id"),"TYPE-OF-JOIN")

Here TYPE-OF-JOIN can be

  • left
  • right
  • inner
  • fullouter

For example, I have two dataframes like this:

// df1
word   count1
w1     10   
w2     15  
w3     20

// df2
word   count2
w1     100   
w2     150  
w5     200

If you do fullouter join then the result looks like this

df1.join(df2, Seq("word"),"fullouter").show()

word   count1  count2
w1     10      100
w2     15      150
w3     20      null
w5     null    200
查看更多
何必那么认真
4楼-- · 2020-01-23 12:20

Inner Join is default join in spark, Below is simple syntax for it.

leftDF.join(rightDF,"Common Col Nam")

For Other join you can follow the below syntax

leftDF.join(rightDF,Seq("Common Columns comma seperated","join type")

If columns Name are not common then

leftDF.join(rightDF,leftDF.col("x")===rightDF.col("y),"join type")
查看更多
戒情不戒烟
5楼-- · 2020-01-23 12:23

The simple answer (from the Databricks FAQ on this matter) is to perform the join where the joined columns are expressed as an array of strings (or one string) instead of a predicate.

Below is an example adapted from the Databricks FAQ but with two join columns in order to answer the original poster's question.

Here is the left dataframe:

val llist = Seq(("bob", "b", "2015-01-13", 4), ("alice", "a", "2015-04-23",10))

val left = llist.toDF("firstname","lastname","date","duration")

left.show()

/*
+---------+--------+----------+--------+
|firstname|lastname|      date|duration|
+---------+--------+----------+--------+
|      bob|       b|2015-01-13|       4|
|    alice|       a|2015-04-23|      10|
+---------+--------+----------+--------+
*/

Here is the right dataframe:

val right = Seq(("alice", "a", 100),("bob", "b", 23)).toDF("firstname","lastname","upload")

right.show()

/*
+---------+--------+------+
|firstname|lastname|upload|
+---------+--------+------+
|    alice|       a|   100|
|      bob|       b|    23|
+---------+--------+------+
*/

Here is an incorrect solution, where the join columns are defined as the predicate left("firstname")===right("firstname") && left("lastname")===right("lastname").

The incorrect result is that the firstname and lastname columns are duplicated in the joined data frame:

left.join(right, left("firstname")===right("firstname") &&
                 left("lastname")===right("lastname")).show

/*
+---------+--------+----------+--------+---------+--------+------+
|firstname|lastname|      date|duration|firstname|lastname|upload|
+---------+--------+----------+--------+---------+--------+------+
|      bob|       b|2015-01-13|       4|      bob|       b|    23|
|    alice|       a|2015-04-23|      10|    alice|       a|   100|
+---------+--------+----------+--------+---------+--------+------+
*/

The correct solution is to define the join columns as an array of strings Seq("firstname", "lastname"). The output data frame does not have duplicated columns:

left.join(right, Seq("firstname", "lastname")).show

/*
+---------+--------+----------+--------+------+
|firstname|lastname|      date|duration|upload|
+---------+--------+----------+--------+------+
|      bob|       b|2015-01-13|       4|    23|
|    alice|       a|2015-04-23|      10|   100|
+---------+--------+----------+--------+------+
*/
查看更多
一夜七次
6楼-- · 2020-01-23 12:25

This is an expected behavior. DataFrame.join method is equivalent to SQL join like this

SELECT * FROM a JOIN b ON joinExprs

If you want to ignore duplicate columns just drop them or select columns of interest afterwards. If you want to disambiguate you can use access these using parent DataFrames:

val a: DataFrame = ???
val b: DataFrame = ???
val joinExprs: Column = ???

a.join(b, joinExprs).select(a("id"), b("foo"))
// drop equivalent 
a.alias("a").join(b.alias("b"), joinExprs).drop(b("id")).drop(a("foo"))

or use aliases:

// As for now aliases don't work with drop
a.alias("a").join(b.alias("b"), joinExprs).select($"a.id", $"b.foo")

For equi-joins there exist a special shortcut syntax which takes either a sequence of strings:

val usingColumns: Seq[String] = ???

a.join(b, usingColumns)

or as single string

val usingColumn: String = ???

a.join(b, usingColumn)

which keep only one copy of columns used in a join condition.

查看更多
Luminary・发光体
7楼-- · 2020-01-23 12:25

This is a normal behavior from SQL, what I am doing for this:

  • Drop or Rename source columns
  • Do the join
  • Drop renamed column if any

Here I am replacing "fullname" column:

Some code in Java:

this
    .sqlContext
    .read()
    .parquet(String.format("hdfs:///user/blablacar/data/year=%d/month=%d/day=%d", year, month, day))
    .drop("fullname")
    .registerTempTable("data_original");

this
    .sqlContext
    .read()
    .parquet(String.format("hdfs:///user/blablacar/data_v2/year=%d/month=%d/day=%d", year, month, day))
    .registerTempTable("data_v2");

 this
    .sqlContext
    .sql(etlQuery)
    .repartition(1)
    .write()
    .mode(SaveMode.Overwrite)
    .parquet(outputPath);

Where the query is:

SELECT
    d.*,
   concat_ws('_', product_name, product_module, name) AS fullname
FROM
    {table_source} d
LEFT OUTER JOIN
    {table_updates} u ON u.id = d.id

This is something you can do only with Spark I believe (drop column from list), very very helpful!

查看更多
登录 后发表回答