I am reading data from a bounded source, a CSV file, in a batch pipeline and would like to assign a timestamp to each element based on data stored in a column of the CSV file. How do I do this in an Apache Beam pipeline?
Answer 1:
If your batch source contains an event-based timestamp per element, for example a click event with the tuple {'timestamp', 'userid', 'ClickedSomething'}, you can assign that timestamp to each element within a DoFn in your pipeline.
Java:
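@ProcessElement
public void processElement(ProcessContext c) {
  // Re-emit the element with its own event time as the timestamp.
  // Assumes getTimestamp() returns milliseconds since the epoch.
  c.outputWithTimestamp(
      c.element(),
      new Instant(c.element().getTimestamp()));
}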
Python:
# elem['timestamp'] is expected to be Unix seconds since the epoch.
'AddEventTimestamps' >> beam.Map(
    lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
[Edit: non-lambda Python example from the Beam guide:]
import apache_beam as beam

class AddTimestampDoFn(beam.DoFn):
    def process(self, element):
        # Extract the numeric Unix seconds-since-epoch timestamp to be
        # associated with the current log entry.
        unix_timestamp = extract_timestamp_from_log_entry(element)
        # Wrap and emit the current entry and new timestamp in a
        # TimestampedValue.
        yield beam.window.TimestampedValue(element, unix_timestamp)

timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())
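The guide's example leaves extract_timestamp_from_log_entry undefined. For the CSV case in the question, a minimal sketch of that step might look like the following. It uses only the standard library, and the column layout (COLUMNS), the helper name parse_csv_line, the 'unix_ts' key, and the ISO 8601 timestamp format are all assumptions made for illustration, not something from the question or from Beam.

```python
import csv
from datetime import datetime, timezone

# Hypothetical column layout; adjust to match the real CSV file.
COLUMNS = ["timestamp", "userid", "action"]


def parse_csv_line(line):
    """Parse one CSV line into a dict and add Unix seconds under 'unix_ts'."""
    # csv.reader handles quoted fields that contain commas.
    values = next(csv.reader([line]))
    row = dict(zip(COLUMNS, values))
    # Assumes ISO 8601 text such as 2019-01-01T00:00:10.
    event_time = datetime.fromisoformat(row["timestamp"])
    # Assumption: timestamps without an offset are treated as UTC.
    if event_time.tzinfo is None:
        event_time = event_time.replace(tzinfo=timezone.utc)
    row["unix_ts"] = event_time.timestamp()
    return row
```

Each parsed row could then be wrapped as beam.window.TimestampedValue(row, row['unix_ts']), in the same way as the lambda example above. If the column already holds epoch seconds, a plain float(row['timestamp']) would do instead of the datetime parsing.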
[Edit: as per Anton's comment] More information can be found at
https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements