I have a CSV file, and I don't know the column names ahead of time. I need to output the data in JSON after some transformations in Google Dataflow.
What's the best way to take the header row and propagate its labels to all the rows?
For example:
a,b,c
1,2,3
4,5,6
...becomes (approximately):
{a:1, b:2, c:3}
{a:4, b:5, c:6}
You should implement a custom FileBasedSource (similar to TextIO.TextSource) that reads the first line and stores the header data:
    @Override
    protected void startReading(final ReadableByteChannel channel)
            throws IOException {
        lineReader = new LineReader(channel);
        // Consume the first line of the file and keep it as the header.
        if (lineReader.readNextLine()) {
            final String headerLine = lineReader.getCurrent().trim();
            header = headerLine.split(",");
            readingStarted = true;
        }
    }
and later, while reading the other lines, prepend the matching header label to each field of the current line:
    @Override
    protected boolean readNextRecord() throws IOException {
        if (!lineReader.readNextLine()) {
            return false;
        }
        final String line = lineReader.getCurrent();
        final String[] data = line.split(",");
        // assumes all lines are valid (same number of fields as the header)
        final StringBuilder record = new StringBuilder();
        for (int i = 0; i < header.length; i++) {
            // pair each header label with the field at the same position
            record.append(header[i]).append(":").append(data[i]).append(", ");
        }
        currentRecord = record.toString();
        return true;
    }
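The record built above is a plain `a:1, b:2, ` string rather than JSON. As a minimal sketch of the same header/field pairing that emits a JSON object instead, the following plain-Java helper may be useful. The names `CsvRowToJson`, `toJson`, and `escape` are hypothetical and not part of the Dataflow SDK; the sketch assumes fields contain no quoted or embedded commas, writes every value as a JSON string, and escapes only backslashes and double quotes.

```java
import java.util.StringJoiner;

public class CsvRowToJson {

    // Escapes backslashes and double quotes only (control characters are not handled).
    public static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Pairs each header label with the field at the same position in the line.
    public static String toJson(String[] header, String line) {
        // limit -1 keeps trailing empty fields such as "1,2,"
        String[] data = line.split(",", -1);
        if (data.length != header.length) {
            throw new IllegalArgumentException(
                "Expected " + header.length + " fields but got " + data.length);
        }
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (int i = 0; i < header.length; i++) {
            json.add("\"" + escape(header[i]) + "\":\"" + escape(data[i]) + "\"");
        }
        return json.toString();
    }

    public static void main(String[] args) {
        String[] header = "a,b,c".split(",");
        System.out.println(toJson(header, "1,2,3")); // {"a":"1","b":"2","c":"3"}
        System.out.println(toJson(header, "4,5,6")); // {"a":"4","b":"5","c":"6"}
    }
}
```

Unlike the loop above, this version rejects rows whose field count differs from the header instead of assuming every line is valid. For real CSV input with quoting, a proper CSV parser and JSON library would be a safer choice.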
I've implemented a quick (complete) solution, available on GitHub. I also added a Dataflow unit test to demonstrate reading:
    @Test
    public void test_reading() throws Exception {
        final File file =
                new File(getClass().getResource("/sample.csv").toURI());
        assertThat(file.exists()).isTrue();
        final Pipeline pipeline = TestPipeline.create();
        final PCollection<String> output =
                pipeline.apply(Read.from(CsvWithHeaderFileSource.from(file.getAbsolutePath())));
        DataflowAssert
                .that(output)
                .containsInAnyOrder("a:1, b:2, c:3, ", "a:4, b:5, c:6, ");
        pipeline.run();
    }
where sample.csv has the following content:
a,b,c
1,2,3
4,5,6
I have created a solution based on Luka's source code (see previous answer).
Luka's code on GitHub targets Dataflow 1.x and implements a FileBasedSource that extracts the first line, caches it, and prepends it to every following line. This requires the entire file to be processed on a single node (the source is not splittable).
My variant of the FileBasedSource instead returns only the first line of a file. As described in the class Javadoc, this line can then be split (as desired) and used as a side input to the logic that processes the complete file, which can then run in parallel. The code is compatible with Beam 2.x (tested on Beam 2.4.0).
See http://moi.vonos.net/cloud/beam-read-header/
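The header-as-side-input idea can be illustrated without Beam. The sketch below is plain Java, not the code from the linked post: the header is parsed once, then shared read-only with a row-labelling step that runs in parallel over all lines (a parallel stream stands in for Beam workers). The names `HeaderSideInputSketch`, `parseHeader`, `labelRow`, and `label` are hypothetical. It assumes the header line can be dropped from the full read by comparing each line with it, so a data row identical to the header would be dropped as well.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class HeaderSideInputSketch {

    // Step 1: derive the labels from the first line only (the "side input").
    public static String[] parseHeader(String firstLine) {
        return firstLine.trim().split(",");
    }

    // Step 2: label one row using the shared header; rows are independent of each other.
    public static String labelRow(String[] header, String line) {
        String[] data = line.split(",", -1);
        StringBuilder record = new StringBuilder();
        // Stop at the shorter of the two so a short row cannot index out of bounds.
        for (int i = 0; i < Math.min(header.length, data.length); i++) {
            if (i > 0) {
                record.append(", ");
            }
            record.append(header[i]).append(":").append(data[i]);
        }
        return record.toString();
    }

    public static List<String> label(List<String> lines) {
        final String headerLine = lines.get(0);
        final String[] header = parseHeader(headerLine);
        return lines.parallelStream()
                // The full read also contains the header line, so filter it out.
                .filter(line -> !line.equals(headerLine))
                .map(line -> labelRow(header, line))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a,b,c", "1,2,3", "4,5,6");
        label(lines).forEach(System.out::println);
    }
}
```

In an actual Beam pipeline, the header would typically be turned into a `PCollectionView` and passed to a `ParDo` via side inputs, while the rows come from an ordinary splittable text read.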