Joining rows in Apache Beam

Published 2019-08-21 10:34

Question:

I'm having trouble understanding if the joins in Apache Beam (e.g. http://www.waitingforcode.com/apache-beam/joins-apache-beam/read) can join entire rows.

For example:

I have 2 datasets, in CSV format, where the first rows are column headers.

The first:

a,b,c,d
1,2,3,4
5,6,7,8
1,2,5,4

The second:

c,d,e,f
3,4,9,10

I want to left join on columns c and d so that I end up with:

a,b,c,d,e,f
1,2,3,4,9,10
5,6,7,8,,
1,2,5,4,,

However, all the documentation on Apache Beam seems to say that the PCollection objects need to be of type KV<K, V> when joining, so I have broken down my PCollection objects into collections of KV<String, String> objects (where the key is the column header and the value is the row value). But in that case (where you just have a key with a value) I don't see how the row format can be maintained. How would KV(c,7) "know" that KV(a,5) is from the same row? Is Join meant for this sort of thing at all?

My code so far:

PCollection<KV<String, String>> flightOutput = ...;
PCollection<KV<String, String>> arrivalWeatherDataForJoin = ...;
PCollection<KV<String, KV<String, String>>> output = Join.leftOuterJoin(flightOutput, arrivalWeatherDataForJoin, "");

Answer 1:

Yes, Join is the utility class to help with joins like yours. It is a wrapper around CoGroupByKey; see the corresponding section in the docs. Its implementation is fairly short, and its tests might also have helpful examples.

The problem in your case is likely caused by how you're choosing the keys.

The KeyT in KV<KeyT, V1> in the Join library is the key used to match the records, so it has to contain all the join fields, while the value carries the whole row. So in your case you will probably need to assign keys something like this (pseudocode):

pCollection1:

    Key     Value
   (3,4)  (1,2,3,4)
   (7,8)  (5,6,7,8)
   (5,4)  (1,2,5,4)

pCollection2:

    Key     Value
   (3,4)  (3,4,9,10)
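
The keying step above can be sketched in plain Java, without any Beam dependency, to show the idea: the key is built from the join columns only, and the value stays the complete row. The class name JoinKeySketch, the helper names, and the choice of a comma-joined string as the key are assumptions made for this illustration. In a pipeline the same function would be applied in whatever transform produces the KV<String, String> elements.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical helper class, not part of Beam.
class JoinKeySketch {

    // Builds the join key from the columns at the given zero-based positions.
    static String joinKey(String csvRow, int... keyColumns) {
        String[] fields = csvRow.split(",", -1); // -1 keeps trailing empty fields
        StringBuilder key = new StringBuilder();
        for (int i = 0; i < keyColumns.length; i++) {
            if (i > 0) {
                key.append(',');
            }
            key.append(fields[keyColumns[i]]);
        }
        return key.toString();
    }

    // Pairs every row with its join key; the whole row is kept as the value.
    static List<Map.Entry<String, String>> keyRows(List<String> rows, int... keyColumns) {
        List<Map.Entry<String, String>> keyed = new ArrayList<>();
        for (String row : rows) {
            keyed.add(new SimpleEntry<>(joinKey(row, keyColumns), row));
        }
        return keyed;
    }

    public static void main(String[] args) {
        // First dataset (a,b,c,d): c and d are at positions 2 and 3.
        System.out.println(keyRows(Arrays.asList("1,2,3,4", "5,6,7,8", "1,2,5,4"), 2, 3));
        // Second dataset (c,d,e,f): c and d are at positions 0 and 1.
        System.out.println(keyRows(Arrays.asList("3,4,9,10"), 0, 1));
    }
}
```

Because the value is the entire row, nothing has to "remember" which fields belong together: the row travels through the join as one unit.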

The result of the join will then look something like this (pseudocode):

joinResultPCollection:

   Key              Value
  (3,4)      (1,2,3,4),(3,4,9,10)
  (7,8)      (5,6,7,8),nullValue
  (5,4)      (1,2,5,4),nullValue

So you will probably need to add another transform after the join to actually merge the left and right sides into a combined row.

Because you have a CSV, you could probably use actual strings like "3,4" as keys (and values). Or you could use List<> or your own custom row types.

For example, this is exactly what the Beam SQL join implementation does.
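
A minimal plain-Java sketch of such a merge step, assuming both sides are CSV strings, that the join columns come first in the right-hand row, and that an empty string stands in for "no match" (as in the third argument to Join.leftOuterJoin in the question). The class MergeRowsSketch and its parameters are hypothetical names for this illustration; in a pipeline the same logic would sit in the transform applied to each joined KV.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper class, not part of Beam.
class MergeRowsSketch {

    // Placeholder meaning "no matching right-hand row" (assumed to be "").
    static final String NO_MATCH = "";

    // Appends the non-key columns of the right row to the left row.
    // keyColumnCount: number of leading join columns in the right row.
    // rightOnlyColumnCount: number of columns to pad when there is no match.
    static String merge(String left, String right, int keyColumnCount, int rightOnlyColumnCount) {
        List<String> out = new ArrayList<>(Arrays.asList(left.split(",", -1)));
        if (right.equals(NO_MATCH)) {
            // Left outer join: keep the left row and pad with empty fields.
            out.addAll(Collections.nCopies(rightOnlyColumnCount, ""));
        } else {
            String[] rightFields = right.split(",", -1);
            // Skip the join columns, which the left row already contains.
            out.addAll(Arrays.asList(rightFields).subList(keyColumnCount, rightFields.length));
        }
        return String.join(",", out);
    }

    public static void main(String[] args) {
        System.out.println(merge("1,2,3,4", "3,4,9,10", 2, 2));
        System.out.println(merge("5,6,7,8", NO_MATCH, 2, 2));
    }
}
```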