Reading CSV header with Dataflow

Posted 2019-04-09 02:24

I have a CSV file, and I don't know the column names ahead of time. I need to output the data in JSON after some transformations in Google Dataflow.

What's the best way to take the header row and propagate its labels to all the rows?

For example:

a,b,c
1,2,3
4,5,6

...becomes (approximately):

{a:1, b:2, c:3}
{a:4, b:5, c:6}

2 Answers
仙女界的扛把子
Reply #2 · 2019-04-09 02:27

I have created a solution based on Luka's source code (see the other answer). Luka's code on GitHub targets Dataflow 1.x and implements a FileBasedSource that extracts the first line, caches it, and then prepends it to every following line. This requires the entire file to be processed on a single node (the source is not splittable).

My variant of the FileBasedSource instead returns only the first line of a file. As described in the class Javadoc, this line can then be split (as desired) and used as a side input to the logic that processes the complete file (which can then run in parallel). The code is compatible with Beam 2.x (tested on Beam 2.4.0).

See http://moi.vonos.net/cloud/beam-read-header/
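
To illustrate the side-input idea in plain Java, here is a minimal sketch of the per-row step only: given the already-split header and one data line, it builds a JSON object. It is not the code from the linked page. The class and method names (`HeaderToJsonSketch`, `toJson`, `quote`) are made up for this example. It assumes simple comma-separated fields with no quoted commas, emits every value as a JSON string, and escapes only backslashes and double quotes. In a pipeline, the header array would come from the side input rather than being passed in directly.

```java
import java.util.List;

final class HeaderToJsonSketch {

    // Wraps a value in double quotes, escaping backslashes and double quotes.
    // Assumption: control characters do not occur in the data.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Pairs each header label with the field at the same position in the row.
    static String toJson(String[] header, String line) {
        String[] fields = line.split(",", -1); // -1 keeps trailing empty fields
        if (fields.length != header.length) {
            throw new IllegalArgumentException(
                    "Expected " + header.length + " fields but got " + fields.length);
        }
        StringBuilder json = new StringBuilder("{");
        for (int i = 0; i < header.length; i++) {
            if (i > 0) {
                json.append(",");
            }
            json.append(quote(header[i])).append(":").append(quote(fields[i]));
        }
        return json.append("}").toString();
    }

    public static void main(String[] args) {
        // The header line is read once; each row can then be handled independently.
        String[] header = "a,b,c".trim().split(",");
        for (String row : List.of("1,2,3", "4,5,6")) {
            System.out.println(toJson(header, row));
        }
    }
}
```

Because each row needs only the header and its own text, this step has no dependency on neighbouring rows, which is what allows the rest of the file to be processed in parallel.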

我想做一个坏孩纸
Reply #3 · 2019-04-09 02:29

You should implement a custom FileBasedSource (similar to TextIO.TextSource) that reads the first line and stores the header data:

    @Override
    protected void startReading(final ReadableByteChannel channel)
    throws IOException {
        lineReader = new LineReader(channel);

        if (lineReader.readNextLine()) {
            final String headerLine = lineReader.getCurrent().trim();
            header = headerLine.split(",");
            readingStarted = true;
        }
    }

and later, while reading the other lines, prepend the header labels to the current line's data:

    @Override
    protected boolean readNextRecord() throws IOException {
        if (!lineReader.readNextLine()) {
            return false;
        }

        final String line = lineReader.getCurrent();
        final String[] data = line.split(",");

        // assumes all lines are valid
        final StringBuilder record = new StringBuilder();
        for (int i = 0; i < header.length; i++) {
            record.append(header[i]).append(":").append(data[i]).append(", ");
        }

        currentRecord = record.toString();
        return true;
    }
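
The loop above assumes every row has at least as many fields as the header and leaves a trailing ", " on each record. As a hedged alternative, the record-building step could be written as below. The class and method names (`RecordFormatSketch`, `formatRecord`) are hypothetical and not part of the GitHub solution. Missing fields become empty values, extra fields are ignored, and quoted commas are still not handled.

```java
import java.util.StringJoiner;

final class RecordFormatSketch {

    // Builds "label:value" pairs joined by ", " with no trailing separator.
    static String formatRecord(String[] header, String line) {
        String[] data = line.split(",", -1); // -1 keeps trailing empty fields
        StringJoiner record = new StringJoiner(", ");
        for (int i = 0; i < header.length; i++) {
            // A short row yields an empty value instead of an out-of-bounds error.
            String value = i < data.length ? data[i] : "";
            record.add(header[i] + ":" + value);
        }
        return record.toString();
    }

    public static void main(String[] args) {
        String[] header = {"a", "b", "c"};
        System.out.println(formatRecord(header, "1,2,3"));
        System.out.println(formatRecord(header, "4,5"));
    }
}
```

With this variant the records no longer end in ", ", so any assertion that expects the trailing separator would need to be adjusted.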

I've implemented a quick (complete) solution, available on GitHub. I also added a Dataflow unit test to demonstrate reading:

    @Test
    public void test_reading() throws Exception {
        final File file =
                new File(getClass().getResource("/sample.csv").toURI());
        assertThat(file.exists()).isTrue();

        final Pipeline pipeline = TestPipeline.create();

        final PCollection<String> output =
                pipeline.apply(Read.from(CsvWithHeaderFileSource.from(file.getAbsolutePath())));

        DataflowAssert
                .that(output)
                .containsInAnyOrder("a:1, b:2, c:3, ", "a:4, b:5, c:6, ");

        pipeline.run();
    }

where sample.csv has the following content:

a,b,c
1,2,3
4,5,6