How to get notified when a Dataflow job is complete

Published 2019-08-15 04:48

Question:

I want to be notified on GAE when a Dataflow job has completed.

I tried both of the following pipelines:

1.

 | 'write to bigquery' >> beam.io.WriteToBigQuery(...)
 | WriteStringsToPubSub('projects/fakeprj/topics/a_topic')

2.

 | 'write to bigquery' >> beam.io.WriteToBigQuery(...)
 | 'DoPubSub' >> beam.ParDo(DoPubSub())   # do Publish using google.cloud.pubsub

But both of the above produce the following error:

AttributeError: 'PDone' object has no attribute 'windowing'

How can I run a step after WriteToBigQuery?

Note: I launch the Dataflow job from a template via the REST API, so I cannot use pipeline_result.wait_until_finish().
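
One out-of-band workaround would be to poll the job's state from GAE through the Dataflow REST API. Below is a minimal sketch using google-api-python-client against the v1b3 API; the project, region, and job ID are placeholder values (the job ID is returned in the response of the templates.launch call):

import time
from googleapiclient.discovery import build

# Placeholders (not from the question): fill in your own values.
PROJECT = 'fakeprj'
REGION = 'us-central1'
JOB_ID = '<job-id-from-templates.launch-response>'

dataflow = build('dataflow', 'v1b3')

def wait_for_dataflow_job(poll_secs=30):
    """Poll the Dataflow v1b3 API until the job reaches a terminal state."""
    terminal = ('JOB_STATE_DONE', 'JOB_STATE_FAILED', 'JOB_STATE_CANCELLED')
    while True:
        job = dataflow.projects().locations().jobs().get(
            projectId=PROJECT, location=REGION, jobId=JOB_ID).execute()
        if job['currentState'] in terminal:
            return job['currentState']
        time.sleep(poll_secs)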

Edit.

The full stack trace:

File "<myPC_DIRPATH>/webapi-dataflow/pubsubtemplate.py", line 327, in <module>
   vital_data_export()
 File "<myPC_DIRPATH>/webapi-dataflow/pubsubtemplate.py", line 323, in vital_data_export
   result = p.run()
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 382, in run
   return self.runner.run_pipeline(self)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 285, in run_pipeline
   return_context=True)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 580, in to_runner_api
   root_transform_id = context.transforms.get_id(self._root_transform())
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
   self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 810, in to_runner_api
   for part in self.parts],
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
   self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 814, in to_runner_api
   for tag, out in self.named_outputs().items()},
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 814, in <dictcomp>
   for tag, out in self.named_outputs().items()},
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
   self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pvalue.py", line 144, in to_runner_api
   self.windowing))
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pvalue.py", line 128, in windowing
   self.producer.inputs)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\transforms\ptransform.py", line 443, in get_windowing
   return inputs[0].windowing
AttributeError: 'PDone' object has no attribute 'windowing'

Answer 1:

You cannot.

PDone is the last stage of your pipeline; it is not a PCollection, so no further transform (including a wait) can be applied to it — hence the AttributeError above.

PInput and PDone are Apache Beam classes that mark a pipeline's source and sink, respectively. If you want to execute something after the BigQuery write, it is not possible within a single pipeline; you would have to run two separate Dataflow jobs in series.

If you want to run them in series, check out Apache Airflow.
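
With Airflow, the Dataflow task blocks until the job finishes, and a downstream task performs the notification. A minimal sketch assuming Airflow 1.x's contrib Dataflow operator; the DAG name, template path, and project values are placeholders:

from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.operators.python_operator import PythonOperator

def notify_done():
    # Publish to Pub/Sub here, e.g. with google.cloud.pubsub.
    pass

with DAG('dataflow_then_notify',
         start_date=datetime(2019, 8, 1),
         schedule_interval=None) as dag:

    # Runs the template and blocks until the Dataflow job finishes.
    run_template = DataflowTemplateOperator(
        task_id='run_dataflow_template',
        template='gs://<your-bucket>/templates/<your-template>',  # placeholder
        dataflow_default_options={'project': 'fakeprj', 'region': 'us-central1'},
    )

    notify = PythonOperator(task_id='notify_done', python_callable=notify_done)

    run_template >> notify  # notify runs only after the job completes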



Answer 2:

In Java, this is what I did to publish a "done" event to Pub/Sub at the end of a Dataflow pipeline whose output is written to BigQuery. Hopefully there's an equivalent in Python.

PCollection<TableRow> rows = data.apply("ConvertToTableRow", ParDo.of(new ConvertToRow()));
// Normally this would be the end of the pipeline..
WriteResult writeResult = rows.apply("WriteToBQ", BigQueryIO.writeTableRows().to(...));
// Transformations after this will be done AFTER all rows have been written to BQ
rows.apply(Wait.on(writeResult.getFailedInserts()))
    // Transforms each row inserted to an Integer of value 1
    .apply("OnePerInsertedRow", ParDo.of(new DoFn<TableRow, Integer>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(Integer.valueOf(1));
        }
    }))
    // https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java#L51
    // Combines a PCollection of Integers (all 1's) by summing them. 
    // Outputs a PCollection of one integer element with the sum
    .apply("SumInsertedCounts", Sum.integersGlobally())
    .apply("CountsMessage", ParDo.of(new DoFn<Integer, PubsubMessage>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String messagePayload = "pipeline_completed";
            Map<String, String> attributes = new HashMap<>();
            attributes.put("rows_written", c.element().toString());
            PubsubMessage message = new PubsubMessage(messagePayload.getBytes(), attributes);
            c.output(message);
        }
    }))
    .apply("PublishCompletionMessage", PubsubIO.writeMessages().to(/* output topic */));