Long-lived state with Google Dataflow

Posted 2019-01-24 16:26

Just trying to get my head around the programming model here. Scenario is I'm using Pub/Sub + Dataflow to instrument analytics for a web forum. I have a stream of data coming from Pub/Sub that looks like:

ID | TS | EventType
1  | 1  | Create
1  | 2  | Comment
2  | 2  | Create
1  | 4  | Comment

And I want to end up with a stream coming from Dataflow that looks like:

ID | TS | num_comments
1  | 1  | 0
1  | 2  | 1
2  | 2  | 0
1  | 4  | 2
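
For concreteness, the rollup above can be written as a minimal plain-Java sketch. It uses no Dataflow APIs. The names `RunningCommentCount`, `Event`, and `rollup` are hypothetical, and the in-memory map only stands in for whatever long-lived state store the streaming job would actually use.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the desired output; not Dataflow code.
final class RunningCommentCount {

  /** One input event: topic ID, timestamp, and event type ("Create" or "Comment"). */
  static final class Event {
    final int id;
    final long ts;
    final String type;

    Event(int id, long ts, String type) {
      this.id = id;
      this.ts = ts;
      this.type = type;
    }
  }

  /** Emits one "ID | TS | num_comments" row per event, in arrival order. */
  static List<String> rollup(List<Event> events) {
    // Assumption: this map plays the role of the per-topic long-lived state.
    Map<Integer, Long> commentsPerTopic = new HashMap<>();
    List<String> rows = new ArrayList<>();
    for (Event e : events) {
      // A "Create" event registers the topic with zero comments;
      // a "Comment" event adds one to the topic's running count.
      long increment = "Comment".equals(e.type) ? 1L : 0L;
      long count = commentsPerTopic.merge(e.id, increment, Long::sum);
      rows.add(e.id + " | " + e.ts + " | " + count);
    }
    return rows;
  }

  public static void main(String[] args) {
    List<Event> events = new ArrayList<>();
    events.add(new Event(1, 1, "Create"));
    events.add(new Event(1, 2, "Comment"));
    events.add(new Event(2, 2, "Create"));
    events.add(new Event(1, 4, "Comment"));
    rollup(events).forEach(System.out::println);
  }
}
```

Running `main` on the four sample events prints the four rows of the output table above.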

I want the job that does this rollup to run as a stream process, with new counts being populated as new events come in. My question is: where is the idiomatic place for the job to store the state for the current topic id and comment counts, assuming that topics can live for years? Current ideas are:

  • Write a 'current' entry for each topic id to BigTable, and in a DoFn look up the current comment count for the incoming topic id. Even as I write this I'm not a fan.
  • Use side inputs somehow? It seems like maybe this is the answer, but if so I'm not totally understanding.
  • Set up a streaming job with a global window, with a trigger that goes off every time it gets a record, and rely on Dataflow to keep the entire pane history somewhere. (unbounded storage requirement?)

EDIT: Just to clarify, I wouldn't have any trouble implementing any of these three strategies, or a million other ways of doing it. I'm more interested in the best way of doing it with Dataflow: what will be most resilient to failure, to having to re-process history for a backfill, and so on.

EDIT2: There is currently a bug in the Dataflow service where job updates fail if you add inputs to a Flatten transform. This means that if you change a job in a way that adds an input to a Flatten, you'll need to discard and rebuild any state accrued in the job.

2 answers
贪生不怕死
#2 · 2019-01-24 16:58

You should be able to use triggers and a combine to accomplish this.

PCollection<ID> comments = /* IDs from the source */;
PCollection<KV<ID, Long>> commentCounts = comments
    // Produce speculative results by triggering as data comes in.
    // Note that this won't trigger after *every* element, but it will
    // trigger relatively quickly (as the system divides incoming data
    // into work units). You could also throttle this with something
    // like:
    //   AfterProcessingTime.pastFirstElementInPane()
    //     .plusDelayOf(Duration.standardMinutes(5))
    // which will produce output every 5 minutes
    .apply(Window.<ID>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.ZERO))
    // Count the occurrences of each ID
    .apply(Count.perElement());

// Produce an output String -- in your use case you'd want to produce
// a row and write it to the appropriate sink.
commentCounts.apply(ParDo.of(new DoFn<KV<ID, Long>, String>() {
  // With the Apache Beam SDK, annotate this method with @ProcessElement.
  public void processElement(ProcessContext c) {
    KV<ID, Long> element = c.element();
    // The pane info describes the pane of the window being processed,
    // including a strictly increasing index of the panes produced so
    // far for the key.
    PaneInfo pane = c.pane();
    // A DoFn emits results through the context rather than returning them.
    c.output(element.getKey() + " | " + pane.getIndex() + " | " + element.getValue());
  }
}));
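
To illustrate what accumulatingFiredPanes() changes, here is a small plain-Java model. It is not SDK code: `PaneModes` and `paneCounts` are hypothetical names, and the model assumes a single key in a single window. In accumulating mode each firing reports the total seen so far; in discarding mode each firing reports only the elements that arrived since the previous firing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of pane contents for one key in one window; not SDK code.
final class PaneModes {

  /**
   * newElementsPerFiring.get(i) is the number of elements that arrived
   * between firing i-1 and firing i. Returns the count each firing emits.
   */
  static List<Long> paneCounts(List<Integer> newElementsPerFiring, boolean accumulating) {
    List<Long> emitted = new ArrayList<>();
    long current = 0L;
    for (int arrived : newElementsPerFiring) {
      // Accumulating: keep everything seen so far.
      // Discarding: start over after every firing.
      current = accumulating ? current + arrived : arrived;
      emitted.add(current);
    }
    return emitted;
  }

  public static void main(String[] args) {
    List<Integer> arrivals = Arrays.asList(1, 1, 2);
    System.out.println(paneCounts(arrivals, true));   // prints [1, 2, 4]
    System.out.println(paneCounts(arrivals, false));  // prints [1, 1, 2]
  }
}
```

For a running comment count per topic, the accumulating behaviour is the one that matches the desired output, since each emitted value is already the total.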

Depending on your data, you could also read whole comments from the source, extract the ID, and then use Count.perKey() to get the counts for each ID. If you want a more complicated combination, you could look at defining a custom CombineFn and using Combine.perKey.
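
As a rough sketch of what a custom CombineFn involves, the plain-Java model below mirrors the four-step accumulator lifecycle (create, add input, merge, extract output). The `SimpleCombineFn` interface is a hypothetical stand-in declared here so the example runs without the SDK; it is not the SDK class, and `CommentCountFn` and `combineBundles` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CombineSketch {

  // Hypothetical stand-in interface, NOT the SDK's CombineFn class.
  interface SimpleCombineFn<InputT, AccumT, OutputT> {
    AccumT createAccumulator();
    AccumT addInput(AccumT accumulator, InputT input);
    AccumT mergeAccumulators(Iterable<AccumT> accumulators);
    OutputT extractOutput(AccumT accumulator);
  }

  /** Counts only events whose type is "Comment"; the accumulator is a running total. */
  static final class CommentCountFn implements SimpleCombineFn<String, Long, Long> {
    @Override public Long createAccumulator() { return 0L; }

    @Override public Long addInput(Long accumulator, String eventType) {
      return "Comment".equals(eventType) ? accumulator + 1 : accumulator;
    }

    @Override public Long mergeAccumulators(Iterable<Long> accumulators) {
      long total = 0L;
      for (Long partial : accumulators) {
        total += partial;
      }
      return total;
    }

    @Override public Long extractOutput(Long accumulator) { return accumulator; }
  }

  /** Folds each bundle into its own accumulator, then merges the partial results. */
  static <I, A, O> O combineBundles(SimpleCombineFn<I, A, O> fn, List<List<I>> bundles) {
    List<A> partials = new ArrayList<>();
    for (List<I> bundle : bundles) {
      A acc = fn.createAccumulator();
      for (I input : bundle) {
        acc = fn.addInput(acc, input);
      }
      partials.add(acc);
    }
    return fn.extractOutput(fn.mergeAccumulators(partials));
  }

  public static void main(String[] args) {
    List<List<String>> bundles = Arrays.asList(
        Arrays.asList("Create", "Comment"),
        Arrays.asList("Comment"));
    System.out.println(combineBundles(new CommentCountFn(), bundles)); // prints 2
  }
}
```

Because partial accumulators can be merged in any grouping, the same logic works whether the events for a key arrive in one batch or are spread across many work units.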

【Aperson】
#3 · 2019-01-24 16:59

Since BigQuery does not support overwriting rows, one way to go about this is to write the events to BigQuery, and query the data using COUNT:

SELECT ID, COUNT(num_comments) from Table GROUP BY ID;

You can also do per-window aggregations of num_comments within Dataflow before writing the entries to BigQuery. The same GROUP BY query still applies, but with pre-aggregated rows you would SUM the partial counts rather than COUNT the rows.
