How do I compare each column in a table using Data

2020-05-04 11:22发布

问题:

There are two tables; one is ID Table 1 and the other is Attribute Table 2.

Table 1

Table 2

If the IDs the same row in Table 1 has same attribrte, then we get number 1, else we get 0. Finally, we get the result Table 3.

Table 3

For example, id1 and id2 have different color and size, so the id1 and id2 row(2nd row in Table 3) has "id1 id2 0 0";

id1 and id3 have same color and different size, so the id1 and id3 row(3nd row in Table 3) has "id1 id3 1 0";

Same attribute---1 Different attribute---0

How can I get the result Table 3 using Scala dataframe?

回答1:

This should do the trick

import spark.implicits._

val t1 = List(
  ("id1","id2"),
  ("id1","id3"),
  ("id2","id3")
).toDF("id_x", "id_y")

val t2 = List(
  ("id1","blue","m"),
  ("id2","red","s"),
  ("id3","blue","s")
).toDF("id", "color", "size")

t1
  .join(t2.as("x"), $"id_x" === $"x.id", "inner")
  .join(t2.as("y"), $"id_y" === $"y.id", "inner")
  .select(
    'id_x,
    'id_y,
    when($"x.color" === $"y.color",1).otherwise(0).alias("color").cast(IntegerType),
    when($"x.size" === $"y.size",1).otherwise(0).alias("size").cast(IntegerType)
  )
  .show()

Resulting in:

+----+----+-----+----+
|id_x|id_y|color|size|
+----+----+-----+----+
| id1| id2|    0|   0|
| id1| id3|    1|   0|
| id2| id3|    0|   1|
+----+----+-----+----+


回答2:

Here is how you can do it using UDF which helps you to understand, how ever the repetition of code and be minimized to increase the performance

import spark.implicits._

val df1 = spark.sparkContext.parallelize(Seq(
    ("id1", "id2"),
    ("id1","id3"),
    ("id2","id3")
  )).toDF("idA", "idB")

val df2 = spark.sparkContext.parallelize(Seq(
  ("id1", "blue", "m"),
  ("id2", "red", "s"),
  ("id3", "blue", "s")
)).toDF("id", "color", "size")

val firstJoin = df1.join(df2, df1("idA") === df2("id"), "inner")
  .withColumnRenamed("color", "colorA")
  .withColumnRenamed("size", "sizeA")
  .withColumnRenamed("id", "idx")

val secondJoin = firstJoin.join(df2, firstJoin("idB") === df2("id"), "inner")

val check = udf((v1: String, v2:String ) => {
  if (v1.equalsIgnoreCase(v2)) 1 else 0
})

val result = secondJoin
  .withColumn("color", check(col("colorA"), col("color")))
  .withColumn("size", check(col("sizeA"), col("size")))

val finalResult = result.select("idA", "idB", "color", "size")

Hope this helps!