In short, what I'm looking to do with Apache Beam on Google Dataflow is something like LAG in Azure Stream Analytics.
Using a window of X minutes where I'm receiving data:
+------+  +------+  +------+  +------+  +------+  +------+
|  1   |  |  2   |  |  3   |  |  4   |  |  5   |  |  6   |
| id=x |  | id=x |  | id=x |  | id=x |  | id=x |  | id=x |
+------+, +------+, +------+, +------+, +------+, +------+, ...
I need to compare data(n) with data(n-1). Continuing the example above, it would be something like this:
if data(6) inside and data(5) outside then ...
if data(5) inside and data(4) outside then ...
if data(4) inside and data(3) outside then ...
if data(3) inside and data(2) outside then ...
if data(2) inside and data(1) outside then ...
Is there any "practical" way to do this?
With Beam, as explained in the docs, state is maintained per key and window. Therefore, you can't access values from previous windows.
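To make the per-key-and-window point concrete, here is a minimal plain-Python sketch (not Beam API) of how fixed windows bucket elements. The `window_start` helper, the `state` dictionary and the event timestamps are hypothetical names and values chosen for illustration; the 10-second window size matches the test further down.

```python
from collections import defaultdict

WINDOW_SECONDS = 10  # assumed window size, in seconds


def window_start(timestamp, size=WINDOW_SECONDS):
    """Start of the fixed window that contains `timestamp` (seconds)."""
    return timestamp - (timestamp % size)


# Hypothetical (key, value, event-time seconds) records, 12 seconds apart.
events = [("test", "test,120", 3), ("test", "test,40", 15)]

# Values are bucketed by (key, window), so each bucket only holds the
# elements that fell into that particular window.
state = defaultdict(list)
for key, value, ts in events:
    state[(key, window_start(ts))].append(value)

for (key, start), values in sorted(state.items()):
    print(key, start, values)
```

The two messages share a key but land in different buckets, which is why the second one cannot see the first without some extra pipeline work.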
To do what you want, you might need a more complex pipeline design. My idea, developed here as an example, would be to duplicate your messages in a ParDo:
- Emit them unmodified to the main output
- At the same time, send them to a side output with a one-window lag
To do the second bullet point, we can add the duration of a window (WINDOW_SECONDS) to the element timestamp:
import apache_beam as beam

class DuplicateWithLagDoFn(beam.DoFn):
    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        # Main output gets the unmodified element
        yield element
        # The same element is emitted to the side output with a 1-window lag added to its timestamp
        yield beam.pvalue.TaggedOutput(
            'lag_output', beam.window.TimestampedValue(element, timestamp + WINDOW_SECONDS))
We call the function specifying the correct tags:
beam.ParDo(DuplicateWithLagDoFn()).with_outputs('lag_output', main='main_output')
and then apply the same windowing scheme to both, co-group by key, etc.
windowed_main = results.main_output | 'Window main output' >> beam.WindowInto(window.FixedWindows(WINDOW_SECONDS))
windowed_lag = results.lag_output | 'Window lag output' >> beam.WindowInto(window.FixedWindows(WINDOW_SECONDS))
merged = (windowed_main, windowed_lag) | 'Join Pcollections' >> beam.CoGroupByKey()
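The effect of the duplicate, shift and co-group steps can be simulated in plain Python. This is only a sketch of the idea under simplifying assumptions, not the Beam implementation: `window_start`, `duplicate_with_lag`, `co_group` and the event timestamps are hypothetical, and watermarks and triggers are ignored.

```python
from collections import defaultdict

WINDOW_SECONDS = 10  # assumed window size, in seconds


def window_start(timestamp, size=WINDOW_SECONDS):
    """Start of the fixed window that contains `timestamp` (seconds)."""
    return timestamp - (timestamp % size)


def duplicate_with_lag(events):
    """Yield each event twice: unmodified, and shifted forward by one window."""
    for key, value, ts in events:
        yield "main", key, value, ts
        yield "lag", key, value, ts + WINDOW_SECONDS


def co_group(tagged):
    """Group by (key, window) into ([main values], [lagged values])."""
    groups = defaultdict(lambda: ([], []))
    for tag, key, value, ts in tagged:
        main_values, lag_values = groups[(key, window_start(ts))]
        (main_values if tag == "main" else lag_values).append(value)
    return groups


# Hypothetical event times, 12 seconds apart.
events = [("test", "test,120", 3), ("test", "test,40", 15)]
merged = co_group(duplicate_with_lag(events))

for (key, start), (current, previous) in sorted(merged.items()):
    print(key, (current, previous))
```

Each window ends up with its own values in the first list and the previous window's values in the second, which is the shape the comparison step consumes.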
Finally, we can have both values (old and new) inside the same ParDo:
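class CompareDoFn(beam.DoFn):
    def process(self, element):
        # element is (key, ([values from this window], [lagged values from the previous window]))
        logging.info("Combined with previous vale: {}".format(element))
        try:
            old_value = int(element[1][1][0].split(',')[1])
        except (IndexError, ValueError):
            # No lagged value in this window, or the payload is not "name,number"
            old_value = 0
        try:
            new_value = int(element[1][0][0].split(',')[1])
        except (IndexError, ValueError):
            new_value = 0
        logging.info("New value: {}, Old value: {}, Difference: {}".format(new_value, old_value, new_value - old_value))
        # yield rather than return: Beam iterates over whatever process returns,
        # so a returned tuple would be emitted as two separate elements
        yield (element[0], new_value - old_value)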
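The parsing and comparison logic can also be pulled out into plain functions, which makes it easy to check without running a pipeline. This is a sketch only: `parse_value` and `difference` are hypothetical helper names, and the "name,number" payload format is assumed from the test messages.

```python
def parse_value(values, default=0):
    """Return the integer after the comma in the first message, or `default`."""
    try:
        return int(values[0].split(",")[1])
    except (IndexError, ValueError):
        # Empty list (no value for this window) or a malformed payload.
        return default


def difference(element):
    """Map (key, ([current], [previous])) to (key, current - previous)."""
    key, (current, previous) = element
    return key, parse_value(current) - parse_value(previous)


# Co-grouped elements in the shape produced by CoGroupByKey.
grouped = [
    ("test", (["test,120"], [])),
    ("test", (["test,40"], ["test,120"])),
    ("test", ([], ["test,40"])),
]
results = [difference(element) for element in grouped]
print(results)
```

A DoFn could then simply yield `difference(element)`, and a different comparison (for example an inside/outside check) would only need a change to `difference`.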
To test this I run the pipeline with the direct runner and, in a separate shell, I publish two messages more than 10 seconds apart (in my case WINDOW_SECONDS was 10s):
gcloud pubsub topics publish lag --message="test,120"
sleep 12
gcloud pubsub topics publish lag --message="test,40"
And the job output shows the expected difference:
INFO:root:New message: (u'test', u'test,120')
INFO:root:Combined with previous vale: (u'test', ([u'test,120'], []))
INFO:root:New value: 120, Old value: 0, Difference: 120
INFO:root:New message: (u'test', u'test,40')
INFO:root:Combined with previous vale: (u'test', ([u'test,40'], [u'test,120']))
INFO:root:New value: 40, Old value: 120, Difference: -80
INFO:root:Combined with previous vale: (u'test', ([], [u'test,40']))
INFO:root:New value: 0, Old value: 40, Difference: -40
Full code for my example here. Keep performance in mind, since this approach duplicates every element, but it makes sense if you need values to be available during two windows.