I am reading multiple .gz files to process using Google Dataflow. The final destination of the data is BigQuery. The BigQuery table has a dedicated column for each column of the CSV files inside the .gz files, plus one additional column, file_name, which holds the name of the file the record belongs to. I am reading the files using TextIO.Read and applying a ParDo transformation on the result. Within the DoFn, is there a way to identify the name of the file that the incoming line belongs to?
My code looks like below:
PCollection<String> logs = pipeline.apply(TextIO.Read.named("ReadLines")
        .from("gcs path").withCompressionType(TextIO.CompressionType.AUTO));
PCollection<TableRow> formattedResults = logs.apply(ParDo.named("Format").of(new DoFn<String, TableRow>() {
    // only the line itself is available here; the name of the file it came from is not
}));
Update 1:
I am now trying the following:
PCollection<String> fileNamesCollection = ...; // this is a collection of file names
GcsIOChannelFactory channelFactory = new GcsIOChannelFactory(options.as(GcsOptions.class));
PCollection<KV<String,String>> kv = fileNamesCollection.apply(ParDo.named("Format").of(new DoFn<String, KV<String,String>>() {
    private static final long serialVersionUID = 1L;
    @Override
    public void processElement(ProcessContext c) throws Exception {
        ReadableByteChannel readChannel = channelFactory.open(c.element());
        GZIPInputStream gzip = new GZIPInputStream(Channels.newInputStream(readChannel));
        BufferedReader br = new BufferedReader(new InputStreamReader(gzip));
        String line = null;
        while ((line = br.readLine()) != null) {
            c.output(KV.of(c.element(), line));
        }
    }
}));
But when I run this program I get an error saying that channelFactory is not serializable. Is there any channel factory that implements the Serializable interface and can be used here?
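In the meantime, the workaround I am looking at (I am not sure whether it is the recommended approach) is to avoid keeping anything non-serializable in a field of the DoFn and to build the factory inside processElement instead. A rough sketch, assuming the GcsOptions can be obtained on the worker via the ProcessContext's getPipelineOptions():

PCollection<KV<String,String>> kv = fileNamesCollection.apply(ParDo.named("Format").of(new DoFn<String, KV<String,String>>() {
    private static final long serialVersionUID = 1L;
    @Override
    public void processElement(ProcessContext c) throws Exception {
        // Build the (non-serializable) factory here on the worker instead of holding it in a field.
        // Assumes c.getPipelineOptions() can supply the GcsOptions at runtime.
        GcsIOChannelFactory channelFactory =
                new GcsIOChannelFactory(c.getPipelineOptions().as(GcsOptions.class));
        // Closing the reader also closes the underlying gzip stream and channel.
        try (BufferedReader br = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Channels.newInputStream(channelFactory.open(c.element())))))) {
            String line;
            while ((line = br.readLine()) != null) {
                c.output(KV.of(c.element(), line));
            }
        }
    }
}));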
Update 2: I was finally able to execute the program and successfully submit the job. Thanks to jkff for the assistance. Below is my final code; I am pasting it here so that it will be helpful for others too.
ProcessLogFilesOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(ProcessLogFilesOptions.class); // ProcessLogFilesOptions is a custom options class
DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
loggingOptions.setDefaultWorkerLogLevel(Level.WARN);
String jobName = "unique_job_name";
options.as(BlockingDataflowPipelineOptions.class).setJobName(jobName);
Pipeline pipeline = Pipeline.create(options);

List<String> filesToProcess = new ArrayList<String>();
for (String fileName : fileNameWithoutHrAndSuffix) { // fileNameWithoutHrAndSuffix has elements like Log_20160921, Log_20160922, etc.
    filesToProcess.addAll((new GcsIOChannelFactory(options.as(GcsOptions.class))).match(LogDestinationStoragePath + fileName));
}
// at this point filesToProcess contains all the log file names, e.g. Log_2016092101.gz, Log_2016092102.gz, ..., Log_2016092201.gz, Log_2016092223.gz
PCollection<String> fileNamesCollection = pipeline.apply(Create.of(filesToProcess));
PCollection<KV<String,String>> kv = fileNamesCollection.apply(ParDo.named("Parsing_Files").of(new DoFn<String, KV<String,String>>() {
    private static final long serialVersionUID = 1L;
    @Override
    public void processElement(ProcessContext c) throws Exception {
        // I have to create _options here because Options and GcsIOChannelFactory are not serializable
        ProcessLogFilesOptions _options = PipelineOptionsFactory.as(ProcessLogFilesOptions.class);
        GcsIOChannelFactory channelFactory = new GcsIOChannelFactory(_options.as(GcsOptions.class));
        ReadableByteChannel readChannel = channelFactory.open(c.element());
        GZIPInputStream gzip = new GZIPInputStream(Channels.newInputStream(readChannel));
        BufferedReader br = new BufferedReader(new InputStreamReader(gzip));
        String line = null;
        while ((line = br.readLine()) != null) {
            c.output(KV.of(c.element(), line)); // key = file name, value = one line from that file
        }
        br.close();
        gzip.close();
        readChannel.close();
    }
}));
// Performing reshuffling here as suggested, to break fusion so that the lines emitted per file get redistributed across workers
PCollection <KV<String,String>> withFileName = kv.apply(Reshuffle.<String, String>of());
PCollection<TableRow> formattedResults = withFileName
        .apply(ParDo.named("Generating_TableRow").of(new DoFn<KV<String,String>, TableRow>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void processElement(ProcessContext c) throws Exception {
                KV<String,String> kv = c.element();
                String logLine = kv.getValue();
                String logFileName = kv.getKey();
                // do further processing as you want here and output a TableRow (see the sketch below)
            }
        }));
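The "do further processing" part in my case boils down to splitting the CSV line and emitting a TableRow that also carries the file name. A rough sketch as a helper method, where col1/col2 are placeholder column names for the real CSV layout:

// Hypothetical helper illustrating the "further processing" step above;
// "col1"/"col2" are placeholder column names and the real parsing is more involved.
private static TableRow toTableRow(String logFileName, String logLine) {
    String[] parts = logLine.split(",");
    return new TableRow()
            .set("col1", parts[0])
            .set("col2", parts[1])
            .set("file_name", logFileName); // the extra column in the BQ table
}

Inside processElement this would be used as c.output(toTableRow(logFileName, logLine));.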
Finally, formattedResults is written to the BigQuery table.
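Roughly, that write looks like the sketch below; the table reference and the schema fields are placeholders for my actual setup (one field per CSV column plus file_name). It uses TableSchema/TableFieldSchema from com.google.api.services.bigquery.model and BigQueryIO from the Dataflow SDK.

TableSchema schema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("col1").setType("STRING"),        // placeholder CSV column
        new TableFieldSchema().setName("col2").setType("STRING"),        // placeholder CSV column
        new TableFieldSchema().setName("file_name").setType("STRING"))); // the extra file name column

formattedResults.apply(BigQueryIO.Write.named("WriteToBQ")
        .to("my-project:my_dataset.my_table") // placeholder table reference
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

pipeline.run();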