Does anyone know how to get the filename when using a file pattern match in google-cloud-dataflow?
I'm new to Dataflow. How can I get the filename when using a file pattern match like this?
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*.txt"))
I'd like to know how to detect the filenames, e.g. kinglear.txt, Hamlet.txt, etc.
I also hit the "100 input files = 100 nodes on the Dataflow diagram" issue when using code similar to @danvk's answer. I switched to the approach sketched below, which combined all the reads into a single block that you can expand to drill down into each file/directory that was read. In our use case, the job also ran faster this way than with the Lists.transform approach.
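Here is roughly what that looks like; this is a sketch assuming the Dataflow Java SDK 1.x APIs, and ReadFileFn/readFiles are names I'm using for illustration. Expanding the filepattern up front and reading every file inside one ParDo is what collapses the reads into a single block:

    import java.io.BufferedReader;
    import java.nio.channels.Channels;
    import java.util.List;

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
    import com.google.cloud.dataflow.sdk.transforms.Create;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.dataflow.sdk.transforms.ParDo;
    import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
    import com.google.cloud.dataflow.sdk.values.KV;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    // Takes a filename as input, opens that file, and emits (filename, line) pairs.
    class ReadFileFn extends DoFn<String, KV<String, String>> {
      @Override
      public void processElement(ProcessContext c) throws Exception {
        String fileName = c.element();
        // IOChannelUtils resolves gs:// paths to the GCS-backed channel factory.
        try (BufferedReader reader = new BufferedReader(Channels.newReader(
            IOChannelUtils.getFactory(fileName).open(fileName), "UTF-8"))) {
          String line;
          while ((line = reader.readLine()) != null) {
            c.output(KV.of(fileName, line));
          }
        }
      }
    }

    class CombinedRead {
      // All files are read inside a single ParDo, so the monitoring console
      // shows one read step that you can expand, instead of one node per file.
      static PCollection<KV<String, String>> readFiles(Pipeline p, List<String> fileNames) {
        return p.apply(Create.of(fileNames).withCoder(StringUtf8Coder.of()))
                .apply(ParDo.of(new ReadFileFn()));
      }
    }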
If you would like to simply expand the filepattern and get a list of filenames matching it, you can use
GcsIoChannelFactory.match("gs://dataflow-samples/shakespeare/*.txt")
(see GcsIoChannelFactory).

If you would like to access the "current filename" from inside one of the DoFns downstream in your pipeline, that is currently not supported (though there are some workarounds; see below). It is a common feature request, and we are still thinking about how best to fit it into the framework in a natural, generic and high-performance way.
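(As an aside: match is an instance method, so in a real program the expansion might look more like this minimal sketch, assuming the SDK 1.x IOChannelUtils helper, whose getFactory picks the GCS-backed factory for gs:// paths.)

    import java.io.IOException;
    import java.util.Collection;

    import com.google.cloud.dataflow.sdk.util.IOChannelUtils;

    class FilepatternExpander {
      // Returns the concrete filenames matching the pattern, e.g. .../kinglear.txt.
      static Collection<String> expand(String pattern) throws IOException {
        return IOChannelUtils.getFactory(pattern).match(pattern);
      }
    }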
Some workarounds include:

- Expanding the filepattern yourself (e.g. with the match call above), creating a PCollection of the filenames with Create.of, and reading each file in a downstream ParDo, essentially the approach sketched in the comment above. This has the downside that dynamic work rebalancing features won't work particularly well, because they currently apply at the level of Read PTransforms only, not at the level of ParDos with high fan-out (like the one here, which reads a file and produces all of its records); and parallelization will only work down to the level of files, while individual files will not be split into sub-ranges. At the scale of reading Shakespeare this is not an issue, but if you are reading a set of files of wildly different sizes, some extremely large, then it may become one.
- Implementing your own FileBasedSource (javadoc, general documentation) which would return records of something like type Pair<String, T>, where the String is the filename and the T is the record you're reading. In this case the framework would handle the filepattern matching for you and dynamic work rebalancing would work just fine; however, it is up to you to write the reading logic in your FileBasedReader. A rough skeleton is sketched at the end of this answer.

Both of these workarounds are non-ideal, but depending on your requirements, one of them may do the trick for you.
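Here is a rough, untested skeleton of the second workaround, assuming the Dataflow Java SDK 1.x FileBasedSource/FileBasedReader API (the exact constructor and override signatures are in the javadoc). KV<String, String> stands in for Pair<String, T>, and record-boundary handling at split points is glossed over:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.nio.channels.Channels;
    import java.nio.channels.ReadableByteChannel;

    import com.google.cloud.dataflow.sdk.coders.Coder;
    import com.google.cloud.dataflow.sdk.coders.KvCoder;
    import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
    import com.google.cloud.dataflow.sdk.io.FileBasedSource;
    import com.google.cloud.dataflow.sdk.options.PipelineOptions;
    import com.google.cloud.dataflow.sdk.values.KV;

    class FileAwareTextSource extends FileBasedSource<KV<String, String>> {
      public FileAwareTextSource(String fileOrPattern) {
        super(fileOrPattern, 1L /* minBundleSize */);
      }

      // Constructor used when the framework hands us a subrange of one file.
      public FileAwareTextSource(String fileName, long start, long end) {
        super(fileName, 1L, start, end);
      }

      @Override
      public FileBasedSource<KV<String, String>> createForSubrangeOfFile(
          String fileName, long start, long end) {
        return new FileAwareTextSource(fileName, start, end);
      }

      @Override
      public FileBasedReader<KV<String, String>> createSingleFileReader(
          PipelineOptions options) {
        return new FileAwareTextReader(this);
      }

      @Override
      public boolean producesSortedKeys(PipelineOptions options) {
        return false;
      }

      @Override
      public Coder<KV<String, String>> getDefaultOutputCoder() {
        return KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
      }

      static class FileAwareTextReader extends FileBasedReader<KV<String, String>> {
        private final String fileName;
        private BufferedReader reader;
        private String currentLine;
        private long currentOffset = 0;

        public FileAwareTextReader(FileAwareTextSource source) {
          super(source);
          // In single-file mode the source's spec is the concrete filename.
          this.fileName = source.getFileOrPatternSpec();
        }

        @Override
        protected void startReading(ReadableByteChannel channel) throws IOException {
          reader = new BufferedReader(Channels.newReader(channel, "UTF-8"));
        }

        @Override
        protected boolean readNextRecord() throws IOException {
          currentLine = reader.readLine();
          if (currentLine == null) {
            return false;
          }
          currentOffset += currentLine.length() + 1; // naive: assumes '\n' endings
          return true;
        }

        @Override
        protected long getCurrentOffset() {
          return currentOffset;
        }

        @Override
        public KV<String, String> getCurrent() {
          return KV.of(fileName, currentLine);
        }
      }
    }

You would then read it with something like p.apply(Read.from(new FileAwareTextSource("gs://dataflow-samples/shakespeare/*.txt"))).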
One approach is to build a List<PCollection<Foo>> where each entry corresponds to an input file, then use Flatten. For example, if you want to parse each line of a collection of files into a Foo object, you might do something like the sketch at the end of this answer.

One downside of this approach is that, if you have 100 input files, you'll also have 100 nodes in the Cloud Dataflow monitoring console, which makes it hard to tell what's going on. I'd be interested in hearing from the Google Cloud Dataflow people whether this approach is efficient.
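A sketch of what that might look like, assuming the Dataflow Java SDK 1.x APIs; Foo and LineToFooFn are hypothetical names, and this is the Lists.transform approach mentioned in the comment above:

    import java.io.Serializable;
    import java.util.List;

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
    import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
    import com.google.cloud.dataflow.sdk.io.TextIO;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.dataflow.sdk.transforms.Flatten;
    import com.google.cloud.dataflow.sdk.transforms.ParDo;
    import com.google.cloud.dataflow.sdk.values.PCollection;
    import com.google.cloud.dataflow.sdk.values.PCollectionList;
    import com.google.common.base.Function;
    import com.google.common.collect.Lists;

    // Hypothetical record type carrying the originating filename.
    @DefaultCoder(SerializableCoder.class)
    class Foo implements Serializable {
      final String fileName;
      final String line;
      Foo(String fileName, String line) {
        this.fileName = fileName;
        this.line = line;
      }
    }

    // Parses one line into a Foo, tagging it with the file it came from.
    class LineToFooFn extends DoFn<String, Foo> {
      private final String fileName;
      LineToFooFn(String fileName) { this.fileName = fileName; }

      @Override
      public void processElement(ProcessContext c) {
        c.output(new Foo(fileName, c.element()));
      }
    }

    class PerFileReads {
      // One TextIO.Read per file, each tagged with its filename, then flattened
      // into a single PCollection<Foo>.
      static PCollection<Foo> readAll(final Pipeline p, List<String> fileNames) {
        List<PCollection<Foo>> foosByFile = Lists.transform(fileNames,
            new Function<String, PCollection<Foo>>() {
              @Override
              public PCollection<Foo> apply(String fileName) {
                return p.apply(TextIO.Read.from(fileName))
                        .apply(ParDo.of(new LineToFooFn(fileName)));
              }
            });
        return PCollectionList.<Foo>empty(p).and(foosByFile)
            .apply(Flatten.<Foo>pCollections());
      }
    }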