In a batch pipeline how do I assign timestamps to

Posted 2019-08-01 19:56

I am reading data from a bounded source, a CSV file, in a batch pipeline and would like to assign a timestamp to each element based on a column in that file. How do I do this in an Apache Beam pipeline?

1 answer
We Are One
#2 · 2019-08-01 20:33

If your bounded source contains an event timestamp per element, for example a click event stored as the tuple {'timestamp', 'userid', 'ClickedSomething'}, you can assign that timestamp to the element inside a DoFn in your pipeline.

Java:

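// Method of a DoFn subclass. Beam only invokes methods annotated with @ProcessElement.
// Assumes getTimestamp() returns epoch milliseconds, which org.joda.time.Instant expects.
@ProcessElement
public void processElement(ProcessContext c) {
    c.outputWithTimestamp(
        c.element(),
        new Instant(c.element().getTimestamp()));
}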

Python:

# elem['timestamp'] is expected to be Unix seconds since the epoch (or a Beam Timestamp).
'AddEventTimestamps' >> beam.Map(
    lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))

[Edit: non-lambda Python example from the Beam guide:]

class AddTimestampDoFn(beam.DoFn):

  def process(self, element):
    # Extract the numeric Unix seconds-since-epoch timestamp to be
    # associated with the current log entry.
    unix_timestamp = extract_timestamp_from_log_entry(element)
    # Wrap and emit the current entry and new timestamp in a
    # TimestampedValue.
    yield beam.window.TimestampedValue(element, unix_timestamp)

timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())
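
For the CSV case in the question, the same idea could be sketched roughly as below. This is only an illustration under some assumptions: the timestamp sits in the first column as an ISO 8601 string, values without a UTC offset are treated as UTC, and the file has a single header row. The names parse_row, run, ts_column and path are hypothetical and not part of Beam.

```python
import csv
from datetime import datetime, timezone


def parse_row(line, ts_column=0):
    """Split one CSV line and return (fields, unix_seconds).

    Assumption: the timestamp column holds an ISO 8601 string such as
    '2019-08-01T19:56:00'. Values without an offset are treated as UTC.
    """
    fields = next(csv.reader([line]))
    dt = datetime.fromisoformat(fields[ts_column])
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return fields, dt.timestamp()


def run(path):
    # Imported here so parse_row can be used and tested without Beam installed.
    import apache_beam as beam

    def add_timestamp(line):
        fields, unix_seconds = parse_row(line)
        # Attach the event time taken from the CSV column to the element.
        return beam.window.TimestampedValue(fields, unix_seconds)

    with beam.Pipeline() as p:
        _ = (
            p
            # Assumption: the file starts with one header row.
            | 'ReadCsv' >> beam.io.ReadFromText(path, skip_header_lines=1)
            | 'AddEventTimestamps' >> beam.Map(add_timestamp)
            | 'Print' >> beam.Map(print)
        )
```

Calling run('clicks.csv') (a hypothetical file name) would read the file, attach each row's own event time, and print the parsed rows. Any windowing applied after the 'AddEventTimestamps' step would then group rows by the time in the file rather than by the time the batch job ran.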

[Edit: as per Anton's comment] More information can be found at:

https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements
