Apache Beam: How To Simultaneously Create Many PCollections

Posted 2019-07-28 10:33

Thanks in advance!

[+] Issue:

I have a lot of files on Google Cloud Storage, and for every file I have to:

  1. Get the file
  2. Make a bunch of Google Cloud Storage API calls on each file to index it (e.g. name = blob.name, size = blob.size)
  3. Unzip it
  4. Search for stuff in there
  5. Put the indexing information + the stuff found inside the file into a BigQuery table

I've been using Python 2.7 and the Google Cloud SDK. This takes hours if I run it linearly, so Apache Beam/Dataflow was suggested to me as a way to process the files in parallel.
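For context, a minimal sketch of the linear version of steps 1-5 (the bucket name, search term, and output handling are hypothetical placeholders, not from the original post):

import io
import zipfile

from google.cloud import storage

client = storage.Client()

# Linear version: fetch, index, unzip and search each file in turn.
for blob in client.bucket("my-bucket").list_blobs():      # hypothetical bucket
    row = {"name": blob.name, "size": blob.size}          # step 2: index via GCS API
    archive = zipfile.ZipFile(io.BytesIO(blob.download_as_string()))  # steps 1 + 3
    row["matches"] = [m for m in archive.namelist()
                      if b"needle" in archive.read(m)]    # step 4: hypothetical search
    # step 5 would insert `row` into a BigQuery table here

Each iteration is independent of the others, which is exactly the shape of work Beam can parallelize.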

[+] What I've been able to do:

I can read from one file, apply a PTransform, and write to another file.

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions

def loadMyFile(pipeline, path):
    # Read one text file into a PCollection of lines.
    return pipeline | "LOAD" >> beam.io.ReadFromText(path)

def myFilter(request):
    # Placeholder predicate: currently keeps every element.
    return request

options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)

with beam.Pipeline(options=options) as p:
    data = loadMyFile(p, path)  # path: GCS path of the single input file
    output = data | "FILTER" >> beam.Filter(myFilter)
    output | "WRITE" >> beam.io.WriteToText(google_cloud_options.staging_location)

[+] What I want to do:

How can I load many of those files simultaneously, apply the same transform to each of them in parallel, and then write to BigQuery in parallel?

[Diagram of what I wish to perform]

[+] What I've Read:

https://beam.apache.org/documentation/programming-guide/
http://enakai00.hatenablog.com/entry/2016/12/09/104913

Again, many thanks

2 Answers
Answer 1 · 2019-07-28 11:03

textio accepts a file_pattern.

From the Python SDK docs:

file_pattern (str) – The file path to read from as a local file path or a GCS gs:// path. The path can contain glob characters

For example, suppose you have a bunch of *.txt files in gs://my-bucket/files/; then you can say:

with beam.Pipeline(options=PipelineOptions()) as p:
  (p
   | "LOAD" >> beam.io.textio.ReadFromText(file_pattern="gs://my-bucket/files/*.txt")
   | "FILTER" >> beam.Filter(myFilter)
   | "WRITE" >> beam.io.textio.WriteToText(output_location))

If you somehow do end up with multiple PCollections of the same type, you can also Flatten them into a single one:

merged = (
  (pcoll1, pcoll2, pcoll3)
  # A list of tuples can be "piped" directly into a Flatten transform.
  | beam.Flatten())
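Combining the two ideas, one could read several patterns into separately labeled PCollections and merge them; the paths here are hypothetical:

paths = ["gs://my-bucket/a/*.txt", "gs://my-bucket/b/*.txt"]  # hypothetical patterns

with beam.Pipeline(options=PipelineOptions()) as p:
    # One labeled read per pattern; Flatten merges them into a single PCollection.
    pcolls = [p | "LOAD %d" % i >> beam.io.ReadFromText(path)
              for i, path in enumerate(paths)]
    merged = tuple(pcolls) | beam.Flatten()

Note that each read needs a unique label, hence the "LOAD %d" naming.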
Answer 2 · 2019-07-28 11:06

OK, so I resolved this by doing the following:

1) get the name of a bucket from somewhere | first PCollection

2) get a list of blobs from that bucket | second PCollection

3) do a FlatMap to get blobs individually from the list | third PCollection

4) do a ParDo that gets the metadata

5) write to BigQuery

My pipeline looks like this:

with beam.Pipeline(options=options) as pipe:
    bucket      = pipe        | "GetBucketName"          >> beam.io.ReadFromText('gs://example_bucket_eraseme/bucketName.txt')
    listOfBlobs = bucket      | "GetListOfBlobs"         >> beam.ParDo(ExtractBlobs())
    blob        = listOfBlobs | "SplitBlobsIndividually" >> beam.FlatMap(lambda x: x)
    dic         = blob        | "GetMetaData"            >> beam.ParDo(ExtractMetadata())
    dic                       | "WriteToBigQuery"        >> beam.io.WriteToBigQuery(
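The answer cuts off mid-call and never shows the two DoFns, so below is a minimal sketch of what ExtractBlobs, ExtractMetadata, and the truncated WriteToBigQuery arguments might look like; the field names, schema, and table reference are assumptions, not from the original answer:

import apache_beam as beam
from google.cloud import storage

class ExtractBlobs(beam.DoFn):
    # Input: a bucket name; output: the list of blobs in that bucket.
    def process(self, bucket_name):
        client = storage.Client()
        yield list(client.bucket(bucket_name).list_blobs())

class ExtractMetadata(beam.DoFn):
    # Input: a single blob; output: a dict row for BigQuery.
    def process(self, blob):
        yield {'name': blob.name, 'size': blob.size}  # assumed fields

# Assumed completion of the truncated call above:
#   dic | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
#       'my-project:my_dataset.my_table',            # hypothetical table reference
#       schema='name:STRING,size:INTEGER',
#       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

One caveat with this sketch: Blob objects hold a reference to the storage client, which may not pickle cleanly between workers; passing blob names and re-fetching inside ExtractMetadata is a safer variant.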