ETL & Parsing CSV files in Cloud Dataflow

2019-04-01 01:18发布

I'm new to cloud dataflow and Java so I'm hoping this is the right question to ask.

I have a csv file with n number of columns and rows that could be a string, integer or timestamp. Do I need to create a new PCollection for each column?

Most of the documentation that I've found in examples is along the lines of something like:

PCollection<String> data = p.apply(TextIO.Read.from("gs://abc/def.csv"));

But to me it doesn't make sense to import an entire csv file as a string. What am I missing here and how should I set my PCollections up?

2条回答
贼婆χ
2楼-- · 2019-04-01 02:06

This example will create a collection containing 1 String per line in the file, e.g. if the file is:

Alex,28,111-222-3344
Sam,30,555-666-7788
Drew,19,123-45-6789

then the collection will logically contain "Alex,28,111-222-3344", "Sam,30,555-666-7788", and "Drew,19,123-45-6789". You can apply further parsing code in Java by piping the collection through a ParDo or MapElements transform, e.g.:

class User {
    public String name;
    public int age;
    public String phone;
}

PCollection<String> lines = p.apply(TextIO.Read.from("gs://abc/def.csv"));
PCollection<User> users = lines.apply(MapElements.via((String line) -> {
    User user = new User();
    String[] parts = line.split(",");
    user.name = parts[0];
    user.age = Integer.parseInt(parts[1]);
    user.phone = parts[2];
    return user;
}).withOutputType(new TypeDescriptor<User>() {});)
查看更多
对你真心纯属浪费
3楼-- · 2019-04-01 02:13
line.split(",");

String.split doesn't make sense if the line data like this:

a,b,c,"we,have a string contains comma",d,e

A property way to deal with the csv data is to import a csv library:

        <dependency>
            <groupId>com.opencsv</groupId>
            <artifactId>opencsv</artifactId>
            <version>3.7</version>
        </dependency>

and use codes below inside ParDo:

public void processElement(ProcessContext c) throws IOException {
    String line = c.element();
    CSVParser csvParser = new CSVParser();
    String[] parts = csvParser.parseLine(line);
}
查看更多
登录 后发表回答