Write each row received over PubSub to its own file

Posted 2019-01-29 04:51

I am receiving messages via Pub/Sub. Each message should be stored in its own file in GCS as raw data, then processed, and the result saved to BigQuery with the file name included in the data.

The data should be visible in BQ immediately after it is received.

Example:

data published to pubsub : {a:1, b:2} 
data saved to GCS file UUID: A1F432 
data processing :  {a:1, b:2} -> 
                   {a:11, b: 22} -> 
                   {fileName: A1F432, data: {a:11, b: 22}} 
data in BQ : {fileName: A1F432, data: {a:11, b: 22}} 

The idea is that the processed data is stored in BQ with a link to the raw data stored in GCS.
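The last step of the example, attaching the file name to the processed data, can be sketched in plain Java as below. This is only an illustration under assumptions: `EnrichedRecord` and `wrap` are hypothetical names, the record is built by string concatenation rather than a JSON library, and the file name is assumed to contain no characters that need JSON escaping (true for a UUID). The actual processing step is not shown in the question, so an identity placeholder stands in for it.

```java
import java.util.function.UnaryOperator;

public class EnrichedRecord {
    /**
     * Wraps already-processed JSON text together with the name of the
     * file that holds the raw message.
     * Assumes fileName needs no JSON escaping (e.g. it is a UUID).
     */
    public static String wrap(String fileName, String processedJson) {
        return "{\"fileName\": \"" + fileName + "\", \"data\": " + processedJson + "}";
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the real processing step,
        // which the question does not show.
        UnaryOperator<String> process = raw -> raw;

        String raw = "{\"a\": 1, \"b\": 2}";
        System.out.println(wrap("A1F432", process.apply(raw)));
    }
}
```

The point of the shape is that the file name has to be known before the BQ row is built, so the GCS write and the enrichment need to happen per message and in that order.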

Here is my code.

public class BotPipline {

public static void main(String[] args) {

    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setRunner(BlockingDataflowPipelineRunner.class);
    options.setProject(MY_PROJECT);
    options.setStagingLocation(MY_STAGING_LOCATION);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> input = pipeline.apply(PubsubIO.Read.subscription(MY_SUBSCRIBTION));

    String uuid = ...;
    input.apply(TextIO.Write.to(MY_STORAGE_LOCATION + uuid));

    input
        .apply(ParDo.of(new DoFn<String,String>(){..}).named("updateJsonAndInsertUUID"))
        .apply(convertToTableRow(...).named("convertJsonStringToTableRow"))
        .apply(BigQueryIO.Write.to(MY_BQ_TABLE).withSchema(tableSchema));
    pipeline.run();
}
}

My code doesn't run because writing unbounded collections with TextIO.Write is not supported. After some research I found a few options to work around this issue:

  1. Create a custom Sink in Dataflow
  2. Implement writing to GCS in my own DoFn
  3. Access the window of the data using the optional BoundedWindow

I have no idea how to start. Can anyone provide code for one of these solutions, or suggest a different solution that fits my case (with code)?

1 answer
Rolldiameter
Reply #2 · 2019-01-29 05:01

The best option is #2 - a simple DoFn that creates the files according to your data. Something like this:

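import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.transforms.DoFn;

class CreateFileFn extends DoFn<String, Void> {
  @ProcessElement
  public void process(ProcessContext c) throws IOException {
    // One file per element: derive the name here, per element, not once per pipeline.
    String filename = ...generate filename from element...;
    // try-with-resources closes the channel when the write is done.
    try (WritableByteChannel channel = FileSystems.create(
            FileSystems.matchNewResource(filename, false /* isDirectory */),
            "text/plain")) {
      OutputStream out = Channels.newOutputStream(channel);
      ...write the element to out...
    }
  }
}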
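To make the per-element logic concrete, here is a minimal sketch that runs without Beam. It is an illustration under assumptions, not the pipeline itself: a local temporary directory stands in for the GCS bucket, `Files.newByteChannel` stands in for `FileSystems.create`, the file name is a random UUID, and `RawMessageWriter` and `writeToOwnFile` are hypothetical names.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

public class RawMessageWriter {
    /**
     * Writes one message to its own newly created file under dir and
     * returns the generated file name.
     */
    public static String writeToOwnFile(Path dir, String message) throws IOException {
        // A fresh name per message; CREATE_NEW fails rather than overwriting.
        String fileName = UUID.randomUUID().toString();
        Path target = dir.resolve(fileName);
        try (WritableByteChannel channel = Files.newByteChannel(
                     target, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
             OutputStream out = Channels.newOutputStream(channel)) {
            out.write(message.getBytes(StandardCharsets.UTF_8));
        }
        return fileName;
    }

    public static void main(String[] args) throws IOException {
        // Local temp directory as a stand-in for the GCS location.
        Path dir = Files.createTempDirectory("raw-messages");
        String fileName = writeToOwnFile(dir, "{a:1, b:2}");
        byte[] stored = Files.readAllBytes(dir.resolve(fileName));
        System.out.println(fileName + " -> " + new String(stored, StandardCharsets.UTF_8));
    }
}
```

Returning the file name matters for the question's use case: a DoFn built this way could output the name together with the element (for example as `DoFn<String, KV<String, String>>` instead of `DoFn<String, Void>`), so that the BigQuery branch can include it in the row. Note also that in the question's pipeline `uuid` is computed once while the pipeline is being constructed, so it would be the same for every message. Generating it inside the per-element method avoids that.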