Apache Beam PubSubIO with GroupByKey

Posted 2019-05-15 04:57

I'm using Apache Beam 2.1.0 to consume simple (key,value) data from Google Pub/Sub and group it by key, so that I can process the data in batches.

With the default trigger, my code after "GroupByKey" never fires (I waited 30 minutes). If I define a custom trigger, the code is executed, but I would like to understand why the default trigger never fires. I tried setting my own timestamps with "withTimestampLabel", with the same result. I also tried changing the window duration (1 second, 10 seconds, 30 seconds, etc.), again with the same result.

For this test I published data from the command line:

gcloud beta pubsub topics publish test A,1
gcloud beta pubsub topics publish test A,2
gcloud beta pubsub topics publish test B,1
gcloud beta pubsub topics publish test B,2

According to the documentation, either one of the two is required, but not necessarily both:

If you are using unbounded PCollections, you must use either non-global windowing OR an aggregation trigger in order to perform a GroupByKey or CoGroupByKey
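
As I understand it, the default trigger emits a window's contents once the watermark passes the end of that window. Below is a minimal plain-Java sketch of that model, to show what I expect to happen. It is not Beam code and not how Beam is implemented; the class FixedWindowSketch and its methods are hypothetical names made up for this illustration, and the timestamps are arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of fixed windows with a watermark-driven trigger.
// Hypothetical illustration only; none of these names exist in Beam.
class FixedWindowSketch {
    static final long WINDOW_MILLIS = 60_000L; // one-minute windows, as in my pipeline

    // Buffered values, indexed by window start and then by key.
    private final TreeMap<Long, Map<String, List<Integer>>> buffers = new TreeMap<>();

    // Start of the fixed window that contains the given event timestamp.
    static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, WINDOW_MILLIS);
    }

    // Buffer one element in its window; nothing is emitted here.
    void add(String key, int value, long timestampMillis) {
        buffers.computeIfAbsent(windowStart(timestampMillis), w -> new TreeMap<>())
               .computeIfAbsent(key, k -> new ArrayList<>())
               .add(value);
    }

    // Emit and drop every window whose end is at or before the watermark.
    List<String> advanceWatermark(long watermarkMillis) {
        List<String> fired = new ArrayList<>();
        while (!buffers.isEmpty() && buffers.firstKey() + WINDOW_MILLIS <= watermarkMillis) {
            Map.Entry<Long, Map<String, List<Integer>>> window = buffers.pollFirstEntry();
            window.getValue().forEach((key, values) -> fired.add(key + "=" + values));
        }
        return fired;
    }
}
```

In this model, grouped output appears only when advanceWatermark is called with a value at or beyond the end of the window. If the watermark never got that far, nothing would ever be emitted, which would match what I see after "GroupByKey", but I have not confirmed that this is what happens in my pipeline.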

My problem looks similar to these questions:

  1. Consuming unbounded data in windows with default trigger
  2. Scio: groupByKey doesn't work when using Pub/Sub as collection source

My code:

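import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

// Prints each key and the number of values grouped under it.
static class Compute extends DoFn<KV<String, Iterable<Integer>>, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        // Never reached with the default trigger
        System.out.println("KEY:" + c.element().getKey());
        // getExactSizeIfKnown() returns -1 when the size is not known up front
        System.out.println("NB:" + c.element().getValue().spliterator().getExactSizeIfKnown());
    }
}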

public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    p.apply(PubsubIO.readStrings().fromSubscription("projects/" + args[0] + "/subscriptions/test"))
     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
     .apply(
        MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
            .via((String row) -> {
                String[] parts = row.split(",");
                System.out.println(Arrays.toString(parts)); // Code fires
                return KV.of(parts[0], Integer.parseInt(parts[1]));
            })
     )
     .apply(GroupByKey.create())
     .apply(ParDo.of(new Compute()));

    p.run();
}
