Can TextIO write to prefixes derived from the wind

Posted 2019-07-17 04:15

Question:

I am processing a windowed stream of PubSub messages and I would like to archive them to GCS. I'd like the archived files to have a prefix that's derived from the window timestamp (something like gs://bucket/messages/2015/01/messages-2015-01-01.json). Is this possible with TextIO.Write, or do I need to implement my own FileBasedSink?

Answer 1:

This can be done with the recently added support for windowed writes in TextIO. See the TextIO documentation, in particular withWindowedWrites and to(FilenamePolicy). The same feature is also available in AvroIO.
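Whatever FilenamePolicy is supplied ultimately has to turn a window into a path. The following is a minimal sketch of that mapping in plain Java, with no Beam dependency. The class name WindowedPaths, the method pathFor, and the choice of UTC are assumptions made for illustration; wiring this into an actual FilenamePolicy is left out because that interface has changed between Beam releases.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Beam: derives an archive path from a window start.
final class WindowedPaths {
    // Directory part, e.g. "2015/01". UTC is an assumption; use the zone your archive expects.
    private static final DateTimeFormatter DIR =
        DateTimeFormatter.ofPattern("yyyy/MM").withZone(ZoneOffset.UTC);
    // File part, e.g. "2015-01-01".
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    // Builds "<baseDir>/<yyyy>/<MM>/messages-<yyyy-MM-dd>.json" for the given window start.
    static String pathFor(String baseDir, Instant windowStart) {
        return baseDir + "/" + DIR.format(windowStart)
            + "/messages-" + DAY.format(windowStart) + ".json";
    }
}
```

For example, WindowedPaths.pathFor("gs://bucket/messages", Instant.parse("2015-01-01T00:00:00Z")) returns gs://bucket/messages/2015/01/messages-2015-01-01.json. A real policy would typically also need to work shard and pane information into the name so that files written by different workers do not collide.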



Answer 2:

Are you simply looking for the function TextIO.Write.Bound<String>.withSuffix() or TextIO.Write.Bound<String>.to()? It seems these would allow you to provide a suffix or prefix for the output filename.



Answer 3:

Right now, TextIO.Write does not support operation in streaming mode – writing to GCS is tricky, e.g., because you can't write to a file concurrently from multiple workers and you can't append to files once they close. We have plans to add streaming support to TextIO.

You'll get the best support for this today using BigQuery rather than GCS: we already support BigQuery writes in streaming mode, you can choose which table to write to based on the window name, and BigQuery accepts writes from many different workers at once.



Answer 4:

TextIO.Write ought to work. There is no need for a custom FileBasedSink.

In your case, you want to write your PubSub messages to an output text file, not locally but remotely on GCS. You ought to be able to use: PCollection .apply.TextIO.Write().to(

Since you are processing a stream of PubSub messages, your PCollection is unbounded, and the PubSub data source already provides a timestamp for each element in it.

If you wish to assign a timestamp, your ParDo transform needs to use a DoFn that outputs elements using ProcessContext.outputWithTimestamp().
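The element timestamp matters because windowing uses it to decide which window each element belongs to. For fixed-size windows with no offset, the window start is the timestamp rounded down to a multiple of the window size. A small plain-Java sketch of that arithmetic follows; FixedWindowMath and windowStart are hypothetical names, and this illustrates the idea rather than reproducing Beam's own implementation.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Beam: shows how a timestamp maps to a fixed window.
final class FixedWindowMath {
    // Returns the start of the fixed window (aligned to the epoch, no offset)
    // that contains the given element timestamp.
    static Instant windowStart(Instant timestamp, Duration size) {
        long ts = timestamp.toEpochMilli();
        long sizeMs = size.toMillis();
        // floorMod keeps the result correct for timestamps before the epoch.
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMs));
    }
}
```

With daily windows, an element stamped 2015-01-01T13:45:00Z falls into the window starting at 2015-01-01T00:00:00Z, which is the value a window-derived file prefix would be built from.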

In summary, you can use TextIO.Write after ensuring that the elements in your PCollection are output with a timestamp.