I'm new to Cloud Dataflow and Java, so I'm hoping this is the right question to ask.
I have a CSV file with n columns and rows, where each value could be a string, an integer, or a timestamp. Do I need to create a new PCollection for each column?
Most of the examples I've found in the documentation look something like this:
PCollection<String> data = p.apply(TextIO.Read.from("gs://abc/def.csv"));
But to me it doesn't make sense to import an entire csv file as a string. What am I missing here and how should I set my PCollections up?
This example will create a collection containing 1 String per line in the file, e.g. if the file is:
Alex,28,111-222-3344
Sam,30,555-666-7788
Drew,19,123-45-6789
then the collection will logically contain "Alex,28,111-222-3344", "Sam,30,555-666-7788", and "Drew,19,123-45-6789". You can apply further parsing code in Java by piping the collection through a ParDo or MapElements transform, e.g.:
class User {
  public String name;
  public int age;
  public String phone;
}

PCollection<String> lines = p.apply(TextIO.Read.from("gs://abc/def.csv"));
PCollection<User> users = lines.apply(MapElements.via((String line) -> {
  // Split each line into its three comma-separated fields.
  User user = new User();
  String[] parts = line.split(",");
  user.name = parts[0];
  user.age = Integer.parseInt(parts[1]);
  user.phone = parts[2];
  return user;
}).withOutputType(new TypeDescriptor<User>() {}));
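The parsing step can be tried on its own, outside a pipeline. The following is a minimal plain-Java sketch of the same logic as the lambda body above; the class name `UserLineParser` and the field-count check are illustrative additions, not part of the Dataflow SDK or of the original answer.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone helper; mirrors the lambda body above.
class UserLineParser {
    static class User {
        String name;
        int age;
        String phone;
    }

    // Parses one "name,age,phone" line. The length check is an added
    // assumption so that malformed rows fail with a clear message.
    static User parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                "Expected 3 fields but got " + parts.length + ": " + line);
        }
        User user = new User();
        user.name = parts[0].trim();
        user.age = Integer.parseInt(parts[1].trim());
        user.phone = parts[2].trim();
        return user;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "Alex,28,111-222-3344",
            "Sam,30,555-666-7788",
            "Drew,19,123-45-6789");
        for (String line : lines) {
            User u = parse(line);
            System.out.println(u.name + " (" + u.age + "): " + u.phone);
        }
    }
}
```

Keeping the parsing in a small static method like this makes it easy to unit test before wiring it into a MapElements or ParDo.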
line.split(",");
String.split doesn't work correctly if a line contains a quoted field with an embedded comma, like this:
a,b,c,"we,have a string contains comma",d,e
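A quick way to see the problem is to run the naive split on that exact line. This is a small plain-Java sketch (the class name `SplitPitfallDemo` is just for illustration): the line has 6 logical fields, but splitting on every comma yields 7 pieces, with the quoted field cut in two.

```java
// Hypothetical demo class showing how String.split miscounts quoted fields.
class SplitPitfallDemo {
    static final String LINE = "a,b,c,\"we,have a string contains comma\",d,e";

    // Splits on every comma, ignoring quotes entirely.
    static String[] naiveSplit(String line) {
        return line.split(",");
    }

    public static void main(String[] args) {
        String[] parts = naiveSplit(LINE);
        System.out.println(parts.length + " pieces"); // prints "7 pieces"
        for (String part : parts) {
            System.out.println("[" + part + "]");
        }
    }
}
```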
A proper way to deal with CSV data is to use a CSV library:
<dependency>
  <groupId>com.opencsv</groupId>
  <artifactId>opencsv</artifactId>
  <version>3.7</version>
</dependency>
and use the code below inside a ParDo:
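public void processElement(ProcessContext c) throws IOException {
  String line = c.element();
  // CSVParser respects quoted fields, so embedded commas stay in one field.
  CSVParser csvParser = new CSVParser();
  String[] parts = csvParser.parseLine(line);
  // Build your output from parts and emit it with c.output(...).
}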
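To illustrate what a quote-aware parser does differently from String.split, here is a rough plain-Java sketch. It is not how opencsv is implemented and covers only double-quoted fields and doubled quotes within them; the class name `QuoteAwareSplitSketch` is made up for this example. In a real pipeline, the library is the safer choice.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of quote-aware field splitting for a single CSV line.
class QuoteAwareSplitSketch {
    static List<String> parseLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char ch = line.charAt(i);
            if (ch == '"') {
                if (inQuotes && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                    // A doubled quote inside a quoted field is a literal quote.
                    current.append('"');
                    i++;
                } else {
                    // Otherwise the quote opens or closes a quoted field.
                    inQuotes = !inQuotes;
                }
            } else if (ch == ',' && !inQuotes) {
                // Only commas outside quotes separate fields.
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(ch);
            }
        }
        fields.add(current.toString());
        return fields;
    }

    public static void main(String[] args) {
        String line = "a,b,c,\"we,have a string contains comma\",d,e";
        List<String> fields = parseLine(line);
        System.out.println(fields.size() + " fields"); // prints "6 fields"
        System.out.println(fields.get(3));
    }
}
```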