I have seen this question answered before on Stack Overflow (https://stackoverflow.com/questions/29983621/how-to-get-filename-when-using-file-pattern-match-in-google-cloud-dataflow), but not since Apache Beam added splittable DoFn functionality for Python. How would I access the filename of the current file being processed when passing in a file pattern to a GCS bucket?
I want to pass the filename into my transform function:
    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | ReadFromText('gs://url to file')
        data = (
            lines
            | 'Jsonify' >> beam.Map(jsonify)
            | 'Unnest' >> beam.FlatMap(unnest)
            | 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(
                'project_id:dataset_id.table_name', schema=schema,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
        )
Ultimately, what I want to do is pass the filename into my transform function when I transform each row of the JSON (see this), and then use the filename to do a lookup in a different BQ table to get a value. I think once I know how to get the filename, I will be able to figure out the side input part in order to do the lookup in the BQ table and get the unique value.
I tried to implement a solution based on the previously cited case. There, as well as in other approaches such as this one, they also get a list of file names but load the whole file into a single element, which might not scale well with large files. Therefore, I looked into adding the filename to each record.
As input I used two CSV files.
Using GCSFileSystem.match we can access metadata_list to retrieve FileMetadata, which contains the file path and size in bytes.
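A minimal sketch of that step (pipeline_options and the bucket pattern gs://BUCKET_NAME/input*.csv are placeholders for your own values):

    from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem

    gcs = GCSFileSystem(pipeline_options)
    # match() takes a list of patterns and returns one MatchResult per pattern;
    # each MatchResult.metadata_list holds FileMetadata(path, size_in_bytes) entries
    match_results = gcs.match(['gs://BUCKET_NAME/input*.csv'])
    result = [metadata for m in match_results for metadata in m.metadata_list]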
We will read each of the matching files into a different PCollection. As we don't know the number of files a priori, we need to programmatically create a list of names for each PCollection (p0, p1, ..., pN-1) and ensure that we have unique labels for each step ('Read file 0', 'Read file 1', etc.).
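One possible way to build those names and labels (a sketch; the naming scheme itself is arbitrary):

    # One PCollection variable name and one unique step label per matched file
    variables = ['p{}'.format(i) for i in range(len(result))]
    read_labels = ['Read file {}'.format(i) for i in range(len(result))]
    add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]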
Then we proceed to read each different file into its corresponding PCollection with ReadFromText, and then we call the AddFilenamesFn ParDo to associate each record with the filename.
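One way to write that loop (a sketch; it uses globals() to create the p0, p1, ... variables dynamically, but keeping the PCollections in a plain list or dict would work just as well):

    for i in range(len(result)):
        globals()[variables[i]] = (
            p
            | read_labels[i] >> ReadFromText(result[i].path)
            # Pass the file path as an extra ParDo argument so it is bound per file
            | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path))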
Here, AddFilenamesFn is the DoFn that attaches the file name to each record.
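A minimal sketch of such a DoFn (emitting a dict with 'filename' and 'row' keys is just one convenient output format):

    class AddFilenamesFn(beam.DoFn):
        """ParDo that attaches the source file name to each record."""
        def process(self, element, file_path):
            file_name = file_path.split("/")[-1]
            yield {'filename': file_name, 'row': element}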
My first approach was to use a Map function directly, which results in simpler code. However, result[i].path was resolved at the end of the loop and each record was incorrectly mapped to the last file of the list.
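For illustration, a Map-based version along these lines exhibits that problem, because the lambda captures the loop variable i and only evaluates result[i].path when the pipeline runs, after the loop has finished:

    # Problematic sketch: every record ends up tagged with the path of the last file
    for i in range(len(result)):
        globals()[variables[i]] = (
            p
            | read_labels[i] >> ReadFromText(result[i].path)
            | add_filename_labels[i] >> beam.Map(
                lambda row: {'filename': result[i].path.split("/")[-1], 'row': row}))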
Finally, we flatten all the PCollections into one and check the results by logging the elements.
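A sketch of that last step (it assumes the globals()-based variables from above and the standard logging module):

    import logging

    merged = (
        tuple(globals()[v] for v in variables)
        | 'Flatten PCollections' >> beam.Flatten()
        | 'Log elements' >> beam.Map(lambda elem: logging.info(elem) or elem))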
I tested this with both DirectRunner and DataflowRunner for Python SDK 2.8.0. I hope this addresses the main issue here and you can continue by integrating BigQuery into your full use case now. You might need to use the Python Client Library for that; I wrote a similar Java example.
Full code:
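Putting the pieces above together, a self-contained sketch (the bucket pattern and the globals()-based variable naming are placeholder choices, as noted earlier):

    import logging

    import apache_beam as beam
    from apache_beam.io import ReadFromText
    from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
    from apache_beam.options.pipeline_options import PipelineOptions


    class AddFilenamesFn(beam.DoFn):
        """ParDo that attaches the source file name to each record."""
        def process(self, element, file_path):
            file_name = file_path.split("/")[-1]
            yield {'filename': file_name, 'row': element}


    def run(argv=None):
        pipeline_options = PipelineOptions(argv)

        # Match the input files and keep their metadata (path and size in bytes)
        gcs = GCSFileSystem(pipeline_options)
        match_results = gcs.match(['gs://BUCKET_NAME/input*.csv'])  # placeholder pattern
        result = [metadata for m in match_results for metadata in m.metadata_list]

        # One PCollection variable name and one unique step label per matched file
        variables = ['p{}'.format(i) for i in range(len(result))]
        read_labels = ['Read file {}'.format(i) for i in range(len(result))]
        add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]

        with beam.Pipeline(options=pipeline_options) as p:
            # Read each file into its own PCollection, tagging records with the file path
            for i in range(len(result)):
                globals()[variables[i]] = (
                    p
                    | read_labels[i] >> ReadFromText(result[i].path)
                    | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path))

            # Flatten everything into a single PCollection and log each element
            merged = (
                tuple(globals()[v] for v in variables)
                | 'Flatten PCollections' >> beam.Flatten()
                | 'Log elements' >> beam.Map(lambda elem: logging.info(elem) or elem))


    if __name__ == '__main__':
        logging.getLogger().setLevel(logging.INFO)
        run()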