Perform action after Dataflow pipeline has processed all data

Posted 2020-04-09 04:32

Question:

Is it possible to perform an action once a batch Dataflow job has finished processing all data? Specifically, I'd like to move the text file that the pipeline just processed to a different GCS bucket. I'm not sure where to place that in my pipeline to ensure it executes once after the data processing has completed.

Answer 1:

I don't see why you need to do this after the pipeline has executed. You could use side outputs to write the file to multiple buckets, and save yourself the copy after the pipeline finishes.

If that's not going to work for you (for whatever reason), then you can simply run your pipeline in blocking execution mode, i.e. use pipeline.run().waitUntilFinish(), and then write the rest of your code (which does the copy) after that call.

[..]
//do some stuff before the pipeline runs
Pipeline pipeline = ...
pipeline.run().waitUntilFinish();
//do something after the pipeline finishes here
[..]
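
The same "block, then act" shape can be sketched without a Beam dependency. In the sketch below, a `Callable<Boolean>` stands in for `pipeline.run().waitUntilFinish()` and a local file move stands in for the GCS copy. Both are assumptions made to keep the example self-contained, and `PostPipelineMove` and `runThenMove` are hypothetical names. In Beam's Java SDK, `waitUntilFinish()` returns a `PipelineResult.State`, so a real version would probably compare that against `State.DONE` before touching the input file.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.Callable;

// Hypothetical helper: not part of Beam or any Google Cloud library.
final class PostPipelineMove {

    /**
     * Runs a blocking job and moves source into targetDir only if the job
     * reports success. Returns the new location, or null if nothing was moved.
     */
    static Path runThenMove(Callable<Boolean> blockingJob, Path source, Path targetDir)
            throws Exception {
        // Stand-in for pipeline.run().waitUntilFinish(): returns only once the job is over.
        boolean succeeded = blockingJob.call();
        if (!succeeded) {
            // Leave the input where it is so a failed run can be retried.
            return null;
        }
        Files.createDirectories(targetDir);
        Path destination = targetDir.resolve(source.getFileName());
        // Local-filesystem stand-in for the copy-then-delete that a GCS move needs.
        return Files.move(source, destination, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

Checking the outcome before moving matters because a blocking call also returns when the job fails or is cancelled, and in that case the input file is usually better left in place.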


Answer 2:

A little trick I got from reading the source code of Apache Beam's PassThroughThenCleanup.java.

Right after your reader, create a side input that combines the entire collection (in the source code, this is the View.asIterable() PTransform) and connect its output to a DoFn. This DoFn will be called only after the reader has finished reading ALL elements.

P.S. The code literally names this operation cleanupSignalView, which I found really clever.

Note that you can achieve the same effect using Combine.globally() (Java) or beam.CombineGlobally() (Python). For more info, check out section 4.2.4.3 here.



Answer 3:

I think two options can help you here:

1) Use TextIO to write to the bucket or folder you want, specifying the exact GCS path (e.g. gs://sandbox/other-bucket).

2) Use Object Change Notifications in combination with Cloud Functions. You can find a good primer on doing this here and the SDK for GCS in JS here. With this option, you set up a trigger that fires when something lands in a certain bucket, and your own Cloud Function then moves it to another bucket.
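
For option 1, the destination is just a `gs://` string, so deriving it from the input path is plain string handling. Here is a minimal sketch, assuming the file should keep its object name and only change bucket. `GcsPaths` and `withBucket` are hypothetical names, and the bucket names in the usage note are made up.

```java
// Hypothetical helper: plain string handling, no Google Cloud client involved.
final class GcsPaths {

    private static final String SCHEME = "gs://";

    /** Returns gcsUri with its bucket replaced by targetBucket, keeping the object name. */
    static String withBucket(String gcsUri, String targetBucket) {
        if (!gcsUri.startsWith(SCHEME)) {
            throw new IllegalArgumentException("Not a gs:// URI: " + gcsUri);
        }
        // Everything after the scheme is "<bucket>/<object name>".
        String rest = gcsUri.substring(SCHEME.length());
        int slash = rest.indexOf('/');
        if (slash <= 0 || slash == rest.length() - 1) {
            throw new IllegalArgumentException("URI needs a bucket and an object name: " + gcsUri);
        }
        String objectName = rest.substring(slash + 1);
        return SCHEME + targetBucket + "/" + objectName;
    }
}
```

Calling `GcsPaths.withBucket("gs://sandbox/input/data.txt", "other-bucket")` gives `gs://other-bucket/input/data.txt`. That string could then be used as the output prefix passed to `TextIO.write().to(...)`.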