Thanks in advance!
[+] Issue:
I have a lot of files on Google Cloud Storage, and for every file I have to:
- get the file
- make a bunch of Google Cloud Storage API calls to index it (e.g. name = blob.name, size = blob.size)
- unzip it
- search for stuff in there
- put the indexing information plus whatever was found in the file into a BigQuery table
I've been using Python 2.7 and the Google Cloud SDK. This takes hours when I run it linearly, so Apache Beam/Dataflow was suggested to me as a way to process the files in parallel.
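In outline, the linear version looks something like this (a minimal sketch, assuming the google-cloud-storage client; the bucket name and the search/BigQuery steps are placeholders):

import io
import zipfile
from google.cloud import storage

client = storage.Client()
for blob in client.get_bucket('my-bucket').list_blobs():
    row = {'name': blob.name, 'size': blob.size}                      # index via GCS API calls
    archive = zipfile.ZipFile(io.BytesIO(blob.download_as_string()))  # unzip in memory
    # ... search the archive members, add the findings to `row`,
    # and insert `row` into a BigQuery table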
[+] What I've been able to do:
I can read from a single file, apply a PTransform, and write the result to another file:
def loadMyFile(pipeline, path):
    return pipeline | "LOAD" >> beam.io.ReadFromText(path)

def myFilter(request):
    return request

with beam.Pipeline(options=PipelineOptions()) as p:
    data = loadMyFile(p, path)
    output = data | "FILTER" >> beam.Filter(myFilter)
    output | "WRITE" >> beam.io.WriteToText(google_cloud_options.staging_location)
[+] What I want to do:
How can I load many of those files at once, apply the same transforms to them in parallel, and then write the results to BigQuery in parallel?
Diagram Of What I Wish to Perform
[+] What I've Read:
https://beam.apache.org/documentation/programming-guide/
http://enakai00.hatenablog.com/entry/2016/12/09/104913
Again, many thanks
textio accepts a file_pattern. From the Python SDK:

file_pattern (str) – The file path to read from as a local file path or a GCS gs:// path. The path can contain glob characters.

For example, suppose you have a bunch of *.txt files in Cloud Storage under gs://my-bucket/files/; you can say:
with beam.Pipeline(options=PipelineOptions()) as p:
    (p
     | "LOAD" >> beam.io.textio.ReadFromText(file_pattern="gs://my-bucket/files/*.txt")
     | "FILTER" >> beam.Filter(myFilter)
     | "WRITE" >> beam.io.textio.WriteToText(output_location))
If you somehow do have multiple PCollections of the same type, you can also Flatten them into a single one:
merged = (
    (pcoll1, pcoll2, pcoll3)
    # A tuple or list of PCollections can be "piped" directly into a Flatten transform.
    | beam.Flatten())
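For instance, if two separate reads produce PCollections of the same type, they can be merged before a common transform (the bucket paths here are made up):

texts = p | "LOAD_TXT" >> beam.io.ReadFromText("gs://my-bucket/files/*.txt")
logs = p | "LOAD_LOG" >> beam.io.ReadFromText("gs://my-bucket/logs/*.log")
merged = (texts, logs) | "MERGE" >> beam.Flatten()
merged | "FILTER" >> beam.Filter(myFilter)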
OK, so I resolved this by doing the following:
1) get the name of a bucket from somewhere | first PCollection
2) get a list of blobs from that bucket | second PCollection
3) do a FlatMap to get blobs individually from the list | third PCollection
4) do a ParDo that gets the metadata
5) write to BigQuery
My pipeline looks like this:

with beam.Pipeline(options=options) as pipe:
    bucket = pipe | "GetBucketName" >> beam.io.ReadFromText('gs://example_bucket_eraseme/bucketName.txt')
    listOfBlobs = bucket | "GetListOfBlobs" >> beam.ParDo(ExtractBlobs())
    blob = listOfBlobs | "SplitBlobsIndividually" >> beam.FlatMap(lambda x: x)
    dic = blob | "GetMetaData" >> beam.ParDo(ExtractMetadata())
    dic | "WriteToBigQuery" >> beam.io.WriteToBigQuery(