Dynamically select multiple columns while joining

Posted 2019-06-06 16:22

Question:

I have two Spark DataFrames, df1 and df2. Is there a way to select the output columns dynamically while joining them? The definition below returns all columns from both df1 and df2 for an inner join.

def joinDF(df1: DataFrame, df2: DataFrame, joinExprs: Column, joinType: String): DataFrame = {
  val dfJoinResult = df1.join(df2, joinExprs, joinType)
  dfJoinResult
  //.select()
}

Input data:

val df1 = List(("1","new","current"), ("2","closed","saving"), ("3","blocked","credit")).toDF("id","type","account")
val df2 = List(("1","7"), ("2","5"), ("5","8")).toDF("id","value")

Expected result:

val dfJoinResult = df1
  .join(df2, df1("id") === df2("id"), "inner")
  .select(df1("type"), df1("account"), df2("value")) 

dfJoinResult.schema:

StructType(StructField(type,StringType,true), 
StructField(account,StringType,true), 
StructField(value,StringType,true))

I have looked at options like df.select(cols.head, cols.tail: _*), but that does not allow selecting columns from both DataFrames. Is there a way to pass the select columns dynamically to my def, along with the DataFrame each one should be selected from? I'm using Spark 2.2.0.

Answer 1:

It is possible to pass the select expression as a Seq[Column] to the method:

def joinDF(df1: DataFrame, df2: DataFrame, joinExpr: Column, joinType: String, selectExpr: Seq[Column]): DataFrame = {
  val dfJoinResult = df1.join(df2, joinExpr, joinType)
  dfJoinResult.select(selectExpr:_*)
}

To call the method use:

val joinExpr = df1.col("id") === df2.col("id")
val selectExpr = Seq(df1.col("type"), df1.col("account"), df2.col("value"))

val testDf = joinDF(df1, df2, joinExpr, "inner", selectExpr)

This will give the desired result:

+------+-------+-----+
|  type|account|value|
+------+-------+-----+
|   new|current|    7|
|closed| saving|    5|
+------+-------+-----+
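
If the column names only arrive as plain strings at runtime, the Seq[Column] can be built by resolving each name against the dataframe it belongs to. The following is a minimal sketch in spark-shell style, assuming a local SparkSession; leftCols and rightCols are hypothetical inputs that are not part of the original question.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

// Assumption: a local session; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("dynamic-select").getOrCreate()
import spark.implicits._

val df1 = List(("1", "new", "current"), ("2", "closed", "saving"), ("3", "blocked", "credit")).toDF("id", "type", "account")
val df2 = List(("1", "7"), ("2", "5"), ("5", "8")).toDF("id", "value")

// Same method as in the answer above.
def joinDF(df1: DataFrame, df2: DataFrame, joinExpr: Column, joinType: String, selectExpr: Seq[Column]): DataFrame =
  df1.join(df2, joinExpr, joinType).select(selectExpr: _*)

// Hypothetical runtime input: the names wanted from each side.
val leftCols = Seq("type", "account")
val rightCols = Seq("value")

// Resolve every name against its own dataframe so same-named columns stay unambiguous.
val selectExpr: Seq[Column] =
  leftCols.map(name => df1.col(name)) ++ rightCols.map(name => df2.col(name))

val result = joinDF(df1, df2, df1.col("id") === df2.col("id"), "inner", selectExpr)
result.show()
```

Because each name is bound to a specific dataframe before the join, this keeps working even when both sides contain a column with the same name.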

In the selectExpr above, it is necessary to specify which dataframe the columns are coming from. However, this can be further simplified if the following assumptions are true:

  1. The columns to join on have the same name in both dataframes
  2. The columns to be selected have unique names (the other dataframe does not have a column with the same name)

In this case, the joinExpr: Column can be changed to joinExpr: Seq[String] and selectExpr: Seq[Column] to selectExpr: Seq[String]:

def joinDF(df1: DataFrame, df2: DataFrame, joinExpr: Seq[String], joinType: String, selectExpr: Seq[String]): DataFrame = {
  val dfJoinResult = df1.join(df2, joinExpr, joinType)
  dfJoinResult.select(selectExpr.head, selectExpr.tail:_*)
}

Calling the method now looks cleaner:

val joinExpr = Seq("id")
val selectExpr = Seq("type", "account", "value")

val testDf = joinDF(df1, df2, joinExpr, "inner", selectExpr)

Note: When the join is performed using a Seq[String], the columns of the resulting dataframe differ from those produced by a join expression: each join column appears only once instead of once per dataframe. If other columns with the same name are present in both dataframes, there is no way to select them separately by plain name afterwards.
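
The difference in the resulting columns can be checked directly, and aliasing the dataframes is one way to keep same-named columns selectable after an expression join. This is a sketch in spark-shell style, assuming a local SparkSession; the aliases "a" and "b" and the output names left_id and right_id are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local session; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("join-columns").getOrCreate()
import spark.implicits._

val df1 = List(("1", "new", "current"), ("2", "closed", "saving"), ("3", "blocked", "credit")).toDF("id", "type", "account")
val df2 = List(("1", "7"), ("2", "5"), ("5", "8")).toDF("id", "value")

// Joining on an expression keeps the id column of both dataframes.
val byExpr = df1.join(df2, df1("id") === df2("id"), "inner")
println(byExpr.columns.mkString(", "))  // id, type, account, id, value

// Joining on Seq("id") keeps a single id column.
val byName = df1.join(df2, Seq("id"), "inner")
println(byName.columns.mkString(", "))  // id, type, account, value

// With aliases, same-named columns can be addressed as "alias.column".
val aliased = df1.as("a")
  .join(df2.as("b"), col("a.id") === col("b.id"), "inner")
  .select(col("a.id").as("left_id"), col("b.id").as("right_id"), col("b.value"))
aliased.show()
```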



Answer 2:

A slight modification of the solution above is to select the required columns from each DataFrame before performing the join. This may reduce overhead a little, since fewer columns take part in the join operation.

val dfJoinResult = df1.select("column1","column2").join(df2.select("col1"),joinExpr,joinType)

But remember to also select the columns on which you will be joining: the select runs first, and the join is then performed on whatever columns remain.
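
One way to make sure the join columns survive the early select is to add them to both projections automatically and drop them after the join. This is a sketch in spark-shell style, assuming a local SparkSession and join columns that have the same name on both sides; joinSelected is a hypothetical helper, not part of the answers above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Assumption: a local session; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("preselect-join").getOrCreate()
import spark.implicits._

val df1 = List(("1", "new", "current"), ("2", "closed", "saving"), ("3", "blocked", "credit")).toDF("id", "type", "account")
val df2 = List(("1", "7"), ("2", "5"), ("5", "8")).toDF("id", "value")

// Hypothetical helper: projects each side to the join columns plus the requested
// columns, joins on the shared names, then removes the join columns from the result.
def joinSelected(df1: DataFrame, df2: DataFrame, joinCols: Seq[String], joinType: String,
                 leftCols: Seq[String], rightCols: Seq[String]): DataFrame = {
  val left = df1.select((joinCols ++ leftCols).distinct.map(name => df1.col(name)): _*)
  val right = df2.select((joinCols ++ rightCols).distinct.map(name => df2.col(name)): _*)
  left.join(right, joinCols, joinType).drop(joinCols: _*)
}

val result = joinSelected(df1, df2, Seq("id"), "inner", Seq("type", "account"), Seq("value"))
result.show()
```

Spark's optimizer typically prunes unused columns on its own, so the measurable gain from selecting early may be small; calling result.explain() shows the plan that is actually executed.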