Is it possible to perform an action once a batch Dataflow job has finished processing all data? Specifically, I'd like to move the text file that the pipeline just processed to a different GCS bucket. I'm not sure where to place that in my pipeline to ensure it executes once after the data processing has completed.
I think two options can help you here:
1) Use TextIO to write to the bucket or folder you want, specifying the exact GCS path (e.g. gs://sandbox/other-bucket); see the sketch after this list.
2) Use Object Change Notifications in combination with Cloud Functions. You can find a good primer on doing this here, and the SDK for GCS in JS here. With this option you basically set up a trigger that fires when something lands in a certain bucket, and move the object to another bucket using your self-written Cloud Function.
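For option 1, here is a minimal sketch in Java, assuming a simple read/process/write pipeline; the input path, the target path and the processing step are placeholders for your own pipeline:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WriteToOtherBucket {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    p.apply("Read", TextIO.read().from("gs://sandbox/input/*.txt"))
        // ... your processing transforms go here ...
        .apply("Write", TextIO.write()
            .to("gs://sandbox/other-bucket/output")  // exact GCS path of the target bucket/folder
            .withSuffix(".txt"));

    p.run();
  }
}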
A little trick I got from reading the source code of Apache Beam's PassThroughThenCleanup.java: right after your reader, create a side input that 'combines' the entire collection (in the source code, it is the View.asIterable() PTransform) and connect its output to a DoFn. This DoFn will be called only after the reader has finished reading ALL elements.
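A minimal sketch of that idea, assuming the records come from a TextIO read and that p is your Pipeline object; the step names, the trigger element and the bucket paths are illustrative, not part of the Beam API:

import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

PCollection<String> lines =
    p.apply("ReadFile", TextIO.read().from("gs://sandbox/input/file.txt"));

// Materialize the whole collection as a side input; the view only becomes
// available once the reader has produced every element.
final PCollectionView<Iterable<String>> cleanupSignalView =
    lines.apply("CleanupSignal", View.asIterable());

// A single dummy element drives the cleanup DoFn; touching the side input
// forces it to wait until reading has finished.
p.apply("CleanupTrigger", Create.of("cleanup"))
    .apply("MoveProcessedFile", ParDo.of(new DoFn<String, Void>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.sideInput(cleanupSignalView);  // not ready until all elements are read
            // move the processed file to the other GCS bucket here
          }
        }).withSideInputs(cleanupSignalView));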
P.S. The source code literally names the operation cleanupSignalView, which I found really clever.
Note that you can achieve the same effect using Combine.globally() (Java) or beam.CombineGlobally() (Python). For more info, check out section 4.2.4.3 here.
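In Java that variant could look roughly like this (a sketch only, reusing the names from the snippet above; Count.combineFn() is just one convenient global combiner):

// Extra imports: org.apache.beam.sdk.transforms.Combine, org.apache.beam.sdk.transforms.Count.
// Any global combine works as the "done" signal; its singleton view only becomes
// available after every element has been read and combined.
final PCollectionView<Long> doneSignal =
    lines.apply("CountAll",
        Combine.globally(Count.<String>combineFn()).asSingletonView());

p.apply("CleanupTrigger2", Create.of("cleanup"))
    .apply("AfterAllReads", ParDo.of(new DoFn<String, Void>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            long total = c.sideInput(doneSignal);  // only available once reading is complete
            // perform the post-read action here
          }
        }).withSideInputs(doneSignal));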
I don't see why you need to do this post pipeline execution. You could use side outputs to write the file to multiple buckets, and save yourself the copy after the pipeline finishes.
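A sketch of the side-output idea, assuming the multi-output ParDo API with TupleTags; the tag names, bucket paths and the trivial "processing" step are placeholders:

// Extra imports: org.apache.beam.sdk.values.PCollectionTuple,
// org.apache.beam.sdk.values.TupleTag, org.apache.beam.sdk.values.TupleTagList.
final TupleTag<String> processedTag = new TupleTag<String>() {};
final TupleTag<String> archiveTag = new TupleTag<String>() {};

PCollectionTuple outputs =
    p.apply("Read", TextIO.read().from("gs://sandbox/input/*.txt"))
        .apply("Process", ParDo.of(new DoFn<String, String>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                c.output(processedTag, c.element().toUpperCase());  // stand-in for real processing
                c.output(archiveTag, c.element());                  // side output: untouched copy
              }
            }).withOutputTags(processedTag, TupleTagList.of(archiveTag)));

outputs.get(processedTag)
    .apply("WriteProcessed", TextIO.write().to("gs://sandbox/output/result"));
outputs.get(archiveTag)
    .apply("WriteArchiveCopy", TextIO.write().to("gs://other-bucket/archive/input-copy"));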
If that's not going to work for you (for whatever reason), then you can simply run your pipeline in blocking execution mode, i.e. use pipeline.run().waitUntilFinish(), and then just write the rest of your code (which does the copy) after that.
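For example (a sketch assuming the google-cloud-storage Java client is on the classpath; bucket and object names are placeholders):

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.apache.beam.sdk.PipelineResult;

PipelineResult result = p.run();
result.waitUntilFinish();  // blocks until the batch job has finished

// "Move" the processed file: copy it to the other bucket, then delete the original.
Storage storage = StorageOptions.getDefaultInstance().getService();
Blob source = storage.get(BlobId.of("sandbox", "input/file.txt"));
source.copyTo(BlobId.of("other-bucket", "processed/file.txt")).getResult();
source.delete();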