TextIO.Read GCS folders into pipeline with past 30 days

Posted 2019-09-15 19:28

Question:

I want to read a rolling window of the past 30 days into my pipeline. For example, on Jan 15, 2017, I want to read:

> gs://bucket/20170115/*
> gs://bucket/20170114/*
> ...
> gs://bucket/20161216/*

The documentation says that the "*", "?", and "[..]" glob patterns are supported.

There is a similar question, but it has no good example.

I am trying to avoid running 30 TextIO.Read steps and then flattening all the PCollections into one, because this causes hot shards in the pipeline.

Answer 1:

When reading files from GCS, TextIO supports the same wildcard patterns as GCS, described here: Wildcard Names.

In the answer for the question you linked, bullet #2 suggests forming a small number of globs to represent your full range:

For example, the two-character range "23 through 67" is 2[3-9] plus [3-5][0-9] plus 6[0-7].
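
The decomposition above can be sanity-checked locally. The following is only a sketch: it uses Java's built-in `PathMatcher` glob support as a stand-in for GCS wildcard matching (an assumption, since the two are separate implementations that happen to agree on simple `[a-b]` ranges), and the class and method names are hypothetical.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

final class GlobRangeCheck {
  // The three globs that together should cover exactly "23" through "67".
  static final List<String> GLOBS = Arrays.asList("2[3-9]", "[3-5][0-9]", "6[0-7]");

  /** Returns true if at least one of the globs matches the given name. */
  static boolean matchesAny(String name) {
    for (String glob : GLOBS) {
      // Java NIO glob matcher, used here only as a local approximation.
      PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
      if (matcher.matches(Paths.get(name))) {
        return true;
      }
    }
    return false;
  }

  /** Returns true if the globs match every value in 23..67 and nothing else in 00..99. */
  static boolean coversExactly23To67() {
    for (int i = 0; i <= 99; i++) {
      String name = String.format("%02d", i);
      boolean expected = i >= 23 && i <= 67;
      if (matchesAny(name) != expected) {
        return false;
      }
    }
    return true;
  }
}
```

The same idea applies to date folders: a 30-day window that spans a month boundary can usually be split into a handful of such ranges, one group per month.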


TextIO also has a new API readAll() which allows you to specify input files dynamically as data. This allows you to pass in the exact set of filenames you need:

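import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Create;
import org.joda.time.DateTime; // assuming DateTime is the Joda-Time class

// Returns one glob per day in the window, e.g. "gs://bucket/20170115/*".
private static List<String> generate30DayFileGlobs(DateTime now) {
  // ..
}

public static void main(String[] args) {
  Pipeline p = // ..

  // Turn the list of globs into a PCollection<String>, then read every matching file.
  p.apply(Create.<String>of(generate30DayFileGlobs(DateTime.now())))
   .apply(TextIO.readAll());

  // ..
}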

The new TextIO.readAll() API has not yet been released, but you can build from master by specifying the Beam artifact version 2.2.0-SNAPSHOT. The 2.2.0 release is in progress and should be available sometime in September.
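
One possible way to build the list of daily globs is sketched below. It is not the answerer's implementation: it uses `java.time.LocalDate` rather than the `DateTime` type from the snippet above so that it runs without extra dependencies, the class name, method name, and `bucket` parameter are hypothetical, and the `gs://bucket/yyyyMMdd/*` layout is taken from the question.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

final class RollingWindowGlobs {
  // BASIC_ISO_DATE formats a LocalDate as yyyyMMdd, e.g. 20170115.
  private static final DateTimeFormatter DAY = DateTimeFormatter.BASIC_ISO_DATE;

  /**
   * Returns one glob per day, newest first, starting at {@code today}
   * and going back {@code days - 1} days.
   */
  static List<String> generateDailyFileGlobs(String bucket, LocalDate today, int days) {
    List<String> globs = new ArrayList<>();
    for (int i = 0; i < days; i++) {
      String folder = today.minusDays(i).format(DAY);
      globs.add("gs://" + bucket + "/" + folder + "/*");
    }
    return globs;
  }
}
```

Called with `LocalDate.of(2017, 1, 15)` and `days = 30`, this yields folders from 20170115 back to 20161217. The list in the question ends at 20161216, which corresponds to `days = 31`, so the window size should be chosen to match the intended range.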



Answer 2:

A glob pattern generation function is available here.