This is my code to read a CSV file from Google Cloud Storage:
//DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
PipelineOptions options=PipelineOptionsFactory.create();
//options.setProject("ProjectId");
//options.setStagingLocation("gs://bucketname/Object");
options.setRunner(DirectRunner.class);
options.setTempLocation("gs://bucketname/Object");
Pipeline p = Pipeline.create(options);
p.apply(FileIO.match().filepattern("gs://bucketname/objectname.csv")).apply(FileIO.readMatches())
.apply(ParDo.of(new checkSplitter()));
p.run();
}
/**
 * Reads each incoming file line by line and prints every line to standard output.
 *
 * <p>The output type is declared as {@code String}, but this implementation does not
 * emit any elements; it only prints.
 */
static class checkSplitter extends DoFn<ReadableFile, String> {
    private static final long serialVersionUID = 1L;

    // Not read anywhere in this class; kept in case code outside this class refers to it.
    int rown = 1;

    /**
     * Opens the current file element, reads it as text, and prints each line.
     *
     * @param c context supplying the file to read via {@code c.element()}
     * @throws Exception if the file cannot be opened or read
     */
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        // Both the stream and the reader are managed by try-with-resources so they are
        // closed even when reading fails part-way through.
        // The charset is given explicitly so decoding does not depend on the platform
        // default. NOTE(review): assumes the CSV is UTF-8 encoded - confirm.
        try (InputStream is = Channels.newInputStream(c.element().open());
                BufferedReader bReader = new BufferedReader(
                        new InputStreamReader(is, java.nio.charset.StandardCharsets.UTF_8))) {
            // Loop state lives in a local rather than an instance field, so nothing
            // carries over between elements handled by the same DoFn instance.
            String line;
            while ((line = bReader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}
My pom.xml:
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.2.0</version>
</dependency>
<!-- Provides Beam's Google Cloud Storage filesystem so that gs:// paths are resolved
     by a GCS-aware filesystem. Without it on the classpath, gs:// patterns fall through
     to the local filesystem, which fails with
     "InvalidPathException: Illegal char <:> at index 2". -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId>
<version>2.2.0</version>
</dependency>
<!-- NOTE(review): the pipeline code references DirectRunner.class directly, which needs
     this artifact at compile time; confirm whether the runtime scope should be removed. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>2.2.0</version>
<scope>runtime</scope>
</dependency>
<!-- slf4j API frontend binding with JUL backend -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-pubsub</artifactId>
<version>v1-rev12-1.20.0</version>
<exclusions>
<exclusion>
<artifactId>guava-jdk5</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.googlecode.json-simple/json-simple -->
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-codec/commons-codec -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.9</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.cloud/google-cloud-bigquery -->
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
<version>0.30.0-beta</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.7.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-io -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>com.google.appengine.tools</groupId>
<artifactId>appengine-gcs-client</artifactId>
<version>0.6</version>
</dependency>
<!-- <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.14</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>1.7.14</version>
</dependency> -->
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency>
The error is:
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.nio.file.InvalidPathException: Illegal char <:> at index 2: gs://bucketname/object.csv
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:344)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:314)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:208)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:289)
at com.pearson.dataflow.StarterPipeline.main(StarterPipeline.java:107)
Caused by: java.nio.file.InvalidPathException: Illegal char <:> at index 2: gs://bucketname/object.csv
at sun.nio.fs.WindowsPathParser.normalize(WindowsPathParser.java:176)
at sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:147)
at sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:77)
at sun.nio.fs.WindowsPath.parse(WindowsPath.java:94)
at sun.nio.fs.WindowsFileSystem.getPath(WindowsFileSystem.java:255)
at java.nio.file.Paths.get(Paths.java:84)
at org.apache.beam.sdk.io.LocalFileSystem.matchOne(LocalFileSystem.java:219)
at org.apache.beam.sdk.io.LocalFileSystem.match(LocalFileSystem.java:89)
at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:125)
at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:147)
at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:159)
at org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn.process(FileIO.java:341)
I have attached the code and pom.xml along with the error. Also, many built-in packages that were available in 2.1 cannot be found in 2.2. Please advise me, or if there is a working example of running this locally, please share the link.
Have I missed any dependencies, or is there an error in the code? Any help will be appreciated.
Thanks in advance.