Join dataframes and perform an operation

Posted 2019-08-06 13:51

Question:

Hello guys, I have a dataframe that is updated every day. Each day I need to add the new qte and the new ca to the old ones and update the date. So I need to update the rows that already exist and add the new ones. Here is an example of what I would like to have at the end:

import org.apache.spark.sql.types.{LongType, DateType, DoubleType}
import spark.implicits._ // for the 'symbol column syntax used below

val histocaisse = spark.read
      .format("csv")
      .option("header", "true") // reading the headers
      .load("C:/Users/MHT/Desktop/histocaisse_dte1.csv")

    val hist = histocaisse
      .withColumn("pos_id", 'pos_id.cast(LongType))
      .withColumn("article_id", 'pos_id.cast(LongType))
      .withColumn("date", 'date.cast(DateType))
      .withColumn("qte", 'qte.cast(DoubleType))
      .withColumn("ca", 'ca.cast(DoubleType))



    val histocaisse2 = spark.read
      .format("csv")
      .option("header", "true") // reading the headers
      .load("C:/Users/MHT/Desktop/histocaisse_dte2.csv")

    val hist2 = histocaisse2.withColumn("pos_id", 'pos_id.cast(LongType))
      .withColumn("article_id", 'pos_id.cast(LongType))
      .withColumn("date", 'date.cast(DateType))
      .withColumn("qte", 'qte.cast(DoubleType))
      .withColumn("ca", 'ca.cast(DoubleType))
    hist.show(false)
    hist2.show(false)

hist (histocaisse_dte1.csv):

+------+----------+----------+----+----+
|pos_id|article_id|date      |qte |ca  |
+------+----------+----------+----+----+
|1     |1         |2000-01-07|2.5 |3.5 |
|2     |2         |2000-01-07|14.7|12.0|
|3     |3         |2000-01-07|3.5 |1.2 |
+------+----------+----------+----+----+

hist2 (histocaisse_dte2.csv):

+------+----------+----------+----+----+
|pos_id|article_id|date      |qte |ca  |
+------+----------+----------+----+----+
|1     |1         |2000-01-08|2.5 |3.5 |
|2     |2         |2000-01-08|14.7|12.0|
|3     |3         |2000-01-08|3.5 |1.2 |
|4     |4         |2000-01-08|3.5 |1.2 |
|5     |5         |2000-01-08|14.5|1.2 |
|6     |6         |2000-01-08|2.0 |1.25|
+------+----------+----------+----+----+

Desired result (matching rows summed, new rows kept as-is):

+------+----------+----------+----+----+
|pos_id|article_id|date      |qte |ca  |
+------+----------+----------+----+----+
|1     |1         |2000-01-08|5.0 |7.0 |
|2     |2         |2000-01-08|29.4|24.0|
|3     |3         |2000-01-08|7.0 |2.4 |
|4     |4         |2000-01-08|3.5 |1.2 |
|5     |5         |2000-01-08|14.5|1.2 |
|6     |6         |2000-01-08|2.0 |1.25|
+------+----------+----------+----+----+

Here is what I did:

    val histoCombinaison2 = hist2.join(hist, Seq("article_id", "pos_id"), "left")
      .groupBy("article_id", "pos_id")
      .agg((hist2("qte") + hist("qte")).as("qte"),
           (hist2("ca") + hist("ca")).as("ca"),
           hist2("date"))

    histoCombinaison2.show()

and I got the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: expression '`qte`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:218)
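The analyzer rejects hist2("qte") + hist("qte") because it is a plain column expression, not an aggregate, so it must either appear in the groupBy or be wrapped in an aggregate function. A minimal sketch of a version the analyzer accepts while keeping the groupBy (note it still leaves the left-join nulls unhandled, which the answers below take care of):

    import org.apache.spark.sql.functions.first

    // Each (article_id, pos_id) group has exactly one row after the join,
    // so wrapping the combined columns in first() is safe here.
    // Caution: hist("qte") is null for rows that exist only in hist2,
    // and null + x is null, so the coalesce/when fixes below are still needed.
    val histoCombinaison2 = hist2.join(hist, Seq("article_id", "pos_id"), "left")
      .groupBy("article_id", "pos_id")
      .agg(first(hist2("qte") + hist("qte")).as("qte"),
           first(hist2("ca") + hist("ca")).as("ca"),
           first(hist2("date")).as("date"))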

Answer 1:

// import functions
import org.apache.spark.sql.functions.{coalesce, lit}
import spark.implicits._ // for the $"col" syntax

// we might not need groupBy, 
// since after join, all the information will be in the same row
// so instead of using aggregate function, we simply combine the related fields as a new column.
// hist1 below is the day-1 dataframe (called hist in the question)
val df = hist2.join(hist1, Seq("article_id", "pos_id"), "left")
  .select($"pos_id", $"article_id",
    coalesce(hist2("date"), hist1("date")).alias("date"),
    (coalesce(hist2("qte"), lit(0)) + coalesce(hist1("qte"), lit(0))).alias("qte"),
    (coalesce(hist2("ca"), lit(0)) + coalesce(hist1("ca"), lit(0))).alias("ca"))
  .orderBy("pos_id", "article_id")

// df.show()
+------+----------+----------+----+----+
|pos_id|article_id|      date| qte|  ca|
+------+----------+----------+----+----+
|     1|         1|2000-01-08| 5.0| 7.0|
|     2|         2|2000-01-08|29.4|24.0|
|     3|         3|2000-01-08| 7.0| 2.4|
|     4|         4|2000-01-08| 3.5| 1.2|
|     5|         5|2000-01-08|14.5| 1.2|
|     6|         6|2000-01-08| 2.0|1.25|
+------+----------+----------+----+----+
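As a design note, the coalesce(..., lit(0)) guards are what make the left join safe: for pos_id 4 to 6, which exist only in hist2, hist1("qte") and hist1("ca") come back null, and null + x would itself be null. A tiny standalone sketch of that behaviour (the old_qte/new_qte names are made up for this illustration):

    import org.apache.spark.sql.functions.{coalesce, lit}
    import spark.implicits._

    // Option[Double] gives nullable double columns.
    val demo = Seq[(Option[Double], Option[Double])](
        (Some(2.5), Some(3.5)), (None, Some(1.2)))
      .toDF("old_qte", "new_qte")

    // Without coalesce the second row would be null (null + 1.2 is null);
    // with coalesce the missing side simply counts as 0.
    demo.select(
      (coalesce($"old_qte", lit(0)) + coalesce($"new_qte", lit(0))).as("qte")
    ).show()
    // +---+
    // |qte|
    // +---+
    // |6.0|
    // |1.2|
    // +---+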

Thanks.



Answer 2:

As I mentioned in the comments, you should define your schema and use it when reading the csv into a dataframe, as

import sqlContext.implicits._

import org.apache.spark.sql.types._
val schema = StructType(Seq(
  StructField("pos_id", LongType, true),
  StructField("article_id", LongType, true),
  StructField("date", DateType, true),
  StructField("qte", LongType, true),
  StructField("ca", DoubleType, true)
))

val hist1 = sqlContext.read
  .format("csv")
  .option("header", "true")
  .schema(schema)
  .load("C:/Users/MHT/Desktop/histocaisse_dte1.csv")

hist1.show

val hist2 = sqlContext.read
  .format("csv")
  .option("header", "true") //reading the headers
  .schema(schema)
  .load("C:/Users/MHT/Desktop/histocaisse_dte2.csv")

hist2.show

Then you should use the when function to implement the logic you need, as

import org.apache.spark.sql.functions.{when, lit}

val df = hist2.join(hist1, Seq("article_id", "pos_id"), "left")
  .select($"pos_id", $"article_id",
    when(hist2("date").isNotNull, hist2("date")).otherwise(when(hist1("date").isNotNull, hist1("date")).otherwise(lit(null))).alias("date"),
    (when(hist2("qte").isNotNull, hist2("qte")).otherwise(lit(0)) + when(hist1("qte").isNotNull, hist1("qte")).otherwise(lit(0))).alias("qte"),
    (when(hist2("ca").isNotNull, hist2("ca")).otherwise(lit(0)) + when(hist1("ca").isNotNull, hist1("ca")).otherwise(lit(0))).alias("ca"))
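With the two sample csv files from the question, this should produce the same combined table as in Answer 1; to double-check, something like:

    // sort for a stable display; should match the table shown in Answer 1
    df.orderBy("pos_id", "article_id").show(false)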

I hope the answer is helpful.