I am building a Gradle Java project (code below) that uses Apache Beam, and I run it from Eclipse Oxygen.
```java
package com.xxxx.beam;

import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkPipelineResult;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.spark.api.java.JavaSparkContext;

public class ApacheBeamTestProject {

    /**
     * Maps each matched file to a (path, contents) pair. Declared as a static nested
     * class so it does not capture the enclosing, non-serializable instance.
     * MapElements calls apply(), so that is the method to override.
     */
    static class ReadFileAsKV extends SimpleFunction<ReadableFile, KV<String, String>> {
        private static final long serialVersionUID = -5715607038612883677L;

        @Override
        public KV<String, String> apply(ReadableFile f) {
            try {
                return KV.of(f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String());
            } catch (IOException e) {
                // Surface the read error instead of silently emitting a null value.
                throw new UncheckedIOException(e);
            }
        }
    }

    public void modelExecution() {
        // PipelineOptionsFactory.as(...) returns an options proxy implementing SparkContextOptions.
        SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
        options.setSparkMaster("xxxxxxxxx");
        JavaSparkContext sc = options.getProvidedSparkContext();
        JavaLinearRegressionWithSGDExample.runJavaLinearRegressionWithSGDExample(sc);

        Pipeline p = Pipeline.create(options);
        p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
            // withCompression can be omitted - by default compression is detected from the filename.
            .apply(FileIO.readMatches())
            .apply(MapElements.via(new ReadFileAsKV()))
            // FileIO.write() still needs a sink (.via(...)) and an output location (.to(...)).
            .apply(FileIO.write());
        SparkPipelineResult result = (SparkPipelineResult) p.run();
        result.getState();
    }

    public static void main(String[] args) {
        System.out.println("Test log");
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);
        p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
            // withCompression can be omitted - by default compression is detected from the filename.
            .apply(FileIO.readMatches())
            .apply(MapElements.via(new ReadFileAsKV()))
            // FileIO.write() still needs a sink (.via(...)) and an output location (.to(...)).
            .apply(FileIO.write());
        p.run();
    }
}
```
When I run this project in Eclipse, I get the following error:
```
Test log
Exception in thread "main" java.lang.IllegalArgumentException: No Runner was specified and the DirectRunner was not found on the classpath.
Specify a runner by either:
    Explicitly specifying a runner by providing the 'runner' property
    Adding the DirectRunner to the classpath
    Calling 'PipelineOptions.setRunner(PipelineRunner)' directly
	at org.apache.beam.sdk.options.PipelineOptions$DirectRunner.create(PipelineOptions.java:291)
	at org.apache.beam.sdk.options.PipelineOptions$DirectRunner.create(PipelineOptions.java:281)
	at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:591)
	at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:532)
	at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:155)
	at org.apache.beam.sdk.options.PipelineOptionsValidator.validate(PipelineOptionsValidator.java:95)
	at org.apache.beam.sdk.options.PipelineOptionsValidator.validate(PipelineOptionsValidator.java:49)
	at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:44)
	at org.apache.beam.sdk.Pipeline.create(Pipeline.java:150)
```
This project has no pom.xml file; Gradle manages all the dependencies. I am not sure how to fix this error. Could someone advise?
It seems that you are trying to use the DirectRunner, and it is not on your application's classpath. You can supply it by adding the beam-runners-direct-java dependency to your application: https://mvnrepository.com/artifact/org.apache.beam/beam-runners-direct-java
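If it is unclear which runner jars actually reach the runtime classpath Eclipse uses (for example, when the Gradle dependencies were never synced into the Eclipse project), one way to check is to try loading the runner classes by name. This is a minimal sketch: `RunnerClasspathCheck` is a hypothetical helper, not part of Beam, and it only relies on `Class.forName`. The two strings are the fully qualified names of Beam's DirectRunner and SparkRunner.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: reports which Beam runner classes the current class loader can find. */
final class RunnerClasspathCheck {

    // Fully qualified names of Beam's DirectRunner and SparkRunner.
    static final String DIRECT_RUNNER = "org.apache.beam.runners.direct.DirectRunner";
    static final String SPARK_RUNNER = "org.apache.beam.runners.spark.SparkRunner";

    /** Returns true if the named class can be loaded by this class's class loader. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize = false: locate and load the class without running its static initializers.
            Class.forName(className, false, RunnerClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            // ClassNotFoundException: the jar containing the class is missing.
            // NoClassDefFoundError: the class was found, but one of its own dependencies was not.
            return false;
        }
    }

    /** Checks each class name in order and maps it to whether it was found. */
    static Map<String, Boolean> check(String... classNames) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (String name : classNames) {
            result.put(name, isOnClasspath(name));
        }
        return result;
    }

    public static void main(String[] args) {
        check(DIRECT_RUNNER, SPARK_RUNNER).forEach((name, found) ->
            System.out.println(name + " -> " + (found ? "found" : "MISSING")));
    }
}
```

Running this from the same Eclipse launch configuration as the failing program shows whether the dependency made it onto the runtime classpath: a MISSING line for the DirectRunner matches the exception above.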
EDIT (answered in comment): you are trying to run this code on Spark, but you didn't specify that in PipelineOptions. By default, Beam tries to run the pipeline on the DirectRunner, which I think is why you get this error. Calling

```java
options.setRunner(SparkRunner.class);
```

before creating the pipeline sets the correct runner and fixes the issue.
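Put together, options that explicitly target Spark might look like the following minimal sketch. It assumes the beam-runners-spark dependency is on the classpath (as it already is in the question's project); `SparkRunnerSetup` and `sparkOptions` are hypothetical names, `local[2]` is a placeholder master URL standing in for the real cluster address, and `Create.of("a", "b")` is only a trivial placeholder transform.

```java
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

final class SparkRunnerSetup {

    /** Hypothetical helper: builds options that explicitly select the SparkRunner. */
    static SparkPipelineOptions sparkOptions() {
        SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        // Without this, Beam defaults to the DirectRunner and fails if it is not on the classpath.
        options.setRunner(SparkRunner.class);
        // Placeholder master URL: replace with the real cluster address.
        options.setSparkMaster("local[2]");
        return options;
    }

    public static void main(String[] args) {
        // The runner must be set before Pipeline.create(), which is where the original error was thrown.
        Pipeline p = Pipeline.create(sparkOptions());
        // Trivial placeholder transform so the pipeline has something to execute.
        p.apply(Create.of("a", "b"));
        p.run().waitUntilFinish();
    }
}
```

Alternatively, the runner can be chosen at launch time instead of in code: build the options with `PipelineOptionsFactory.fromArgs(args).withValidation().as(SparkPipelineOptions.class)` and pass `--runner=SparkRunner` as a program argument in the Eclipse run configuration.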