I am learning Apache Beam and trying to implement something similar to distcp. I use FileIO.match().filepattern() to get the input files, but when writing with FileIO.write, the files sometimes get coalesced into fewer output files. Since the number of input files is not known before job execution, I cannot set a fixed shard count either.
pipeline.apply(this.name(), FileIO.match().filepattern(path()))
    .apply(FileIO.readMatches())
    .apply(name() + ".write", FileIO.<FileIO.ReadableFile>write()  // distinct transform name to avoid a duplicate-name warning
        .via(FileSink.create())
        .to(path()));
Code for the sink:
// Requires: com.google.auto.value.AutoValue, java.io.*, java.nio.channels.*,
// org.apache.beam.sdk.io.FileIO, org.apache.commons.io.IOUtils
@AutoValue
public abstract static class FileSink implements FileIO.Sink<FileIO.ReadableFile> {

  private OutputStream outputStream;

  public static FileSink create() {
    return new AutoValue_FileIOOperator_FileSink();
  }

  @Override
  public void open(WritableByteChannel channel) throws IOException {
    // Wrap the destination channel once per destination file.
    outputStream = Channels.newOutputStream(channel);
  }

  @Override
  public void write(FileIO.ReadableFile element) throws IOException {
    // Stream the whole source file into the destination.
    try (final InputStream inputStream = Channels.newInputStream(element.open())) {
      IOUtils.copy(inputStream, outputStream);
    }
  }

  @Override
  public void flush() throws IOException {
    outputStream.flush();
  }
}
You can use FileIO.writeDynamic and specify in .by how you want to partition the elements into destinations. For example, if you have unique keys you can use .by(KV::getKey), and each key's elements will be written to a separate file. Otherwise, the criterion can be the hash of the row, etc. You can also tune .withNaming at will. As a demo, the following writes four elements into four files.
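A minimal sketch of that demo, assuming a PCollection of four KV<String, String> elements; the keys, values, and the /tmp/output path are illustrative:

PCollection<KV<String, String>> input = pipeline.apply(
    Create.of(
        KV.of("a", "first"),
        KV.of("b", "second"),
        KV.of("c", "third"),
        KV.of("d", "fourth")));

input.apply(FileIO.<String, KV<String, String>>writeDynamic()
    .by(KV::getKey)                                   // one destination per distinct key
    .withDestinationCoder(StringUtf8Coder.of())       // coder for the destination type
    .via(Contextful.fn(KV::getValue), TextIO.sink())  // write only the value as a text line
    .to("/tmp/output")
    .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt"))
    .withNumShards(1));                               // exactly one file per destination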
Full code:
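A self-contained version of the same demo; the class name, runner options, and output path are assumptions:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;

public class WriteDynamicDemo {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline
        .apply(Create.of(
            KV.of("a", "first"),
            KV.of("b", "second"),
            KV.of("c", "third"),
            KV.of("d", "fourth")))
        .apply(FileIO.<String, KV<String, String>>writeDynamic()
            .by(KV::getKey)
            .withDestinationCoder(StringUtf8Coder.of())
            .via(Contextful.fn(KV::getValue), TextIO.sink())
            .to("/tmp/output")
            .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt"))
            .withNumShards(1));

    pipeline.run().waitUntilFinish();
  }
}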
The same pattern should carry over to your distcp case: key each ReadableFile by its source file name, so that every input file is copied to its own output file.
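An untested sketch of that adaptation, reusing your FileSink (the path() calls are the placeholders from your snippet):

pipeline.apply(FileIO.match().filepattern(path()))
    .apply(FileIO.readMatches())
    .apply(FileIO.<String, FileIO.ReadableFile>writeDynamic()
        .by(file -> file.getMetadata().resourceId().getFilename())  // key by source file name
        .withDestinationCoder(StringUtf8Coder.of())
        .via(FileSink.create())                                     // your custom sink
        .to(path())
        .withNaming(name -> FileIO.Write.defaultNaming(name, ""))   // name output after the source file
        .withNumShards(1));                                         // one output file per input file

Let me know if that works with your custom sink, too.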