I have a .gz file available on a web server that I want to consume in a streaming manner and insert the data into Couchbase. The .gz file contains a single file, which in turn holds one JSON object per line.
Since Spark doesn't have an HTTP receiver, I wrote one myself (shown below). I'm using the Couchbase Spark connector to do the insertion. However, when running, the job doesn't actually insert anything. I suspect this is due to my inexperience with Spark and not knowing how to start and await termination. As you can see below, there are two places where such calls could be made.
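For illustration, a single line of the decompressed file looks roughly like this (the values are made up; the real fields include at least id, name, rating, review_count, hours, and attributes):
{"id": "abc123", "name": "Some Business", "rating": 4.5, "review_count": 42, "hours": {"Monday": "9:00-17:00"}, "attributes": {"WiFi": "free"}}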
Receiver:
public class HttpReceiver extends Receiver<String> {

    private final String url;

    public HttpReceiver(String url) {
        super(MEMORY_AND_DISK());
        this.url = url;
    }

    @Override
    public void onStart() {
        new Thread(() -> receive()).start();
    }

    private void receive() {
        try {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setAllowUserInteraction(false);
            conn.setInstanceFollowRedirects(true);
            conn.setRequestMethod("GET");
            conn.setReadTimeout(60 * 1000);

            InputStream gzipStream = new GZIPInputStream(conn.getInputStream());
            Reader decoder = new InputStreamReader(gzipStream, UTF_8);
            BufferedReader reader = new BufferedReader(decoder);

            String json = null;
            while (!isStopped() && (json = reader.readLine()) != null) {
                store(json);
            }
            reader.close();
            conn.disconnect();
        } catch (IOException e) {
            stop(e.getMessage(), e);
        }
    }

    @Override
    public void onStop() {
    }
}
Dataload:
public void load(String url) throws StreamingQueryException, InterruptedException {
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
    JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(url));

    lines.foreachRDD(rdd ->
        sql.read().json(rdd)
            .select(new Column("id"),
                    new Column("name"),
                    new Column("rating"),
                    new Column("review_count"),
                    new Column("hours"),
                    new Column("attributes"))
            .writeStream()
            .option("idField", "id")
            .format("com.couchbase.spark.sql")
            .start()
            // .awaitTermination(sparkProperties.getTerminationTimeoutMillis())
    );

    // ssc.start();
    ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());
}
The commented lines show my confusion about starting and terminating the job. Also, feel free to comment on the receiver if there's something wrong with it or if it can be improved.
Using Spark v2.1.0 with Java.
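For context, my understanding of the intended lifecycle (based on the Spark Streaming programming guide) is roughly the sketch below; what I can't figure out is where the Couchbase write fits, i.e. whether it should be a batch write inside foreachRDD or a separate streaming query:

// 1. Define all transformations and output operations on the DStream first.
lines.foreachRDD(rdd -> {
    // write the micro-batch to Couchbase here
});

// 2. Only then start the streaming context...
ssc.start();

// 3. ...and block the driver until it terminates (or the timeout elapses).
ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());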
Edit 1:
I also tried this implementation:
lines.foreachRDD(rdd ->
    couchbaseWriter(sql.read().json(rdd)
        .select(new Column("id"),
                new Column("name"),
                new Column("rating"),
                new Column("review_count"),
                new Column("hours"),
                new Column("attributes"))
        .write()
        .option("idField", "id")
        .format("com.couchbase.spark.sql"))
        .couchbase()
);

ssc.start();
ssc.awaitTermination();
But it throws IllegalStateException: SparkContext has been shutdown
11004 [JobScheduler] ERROR org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1488664987000 ms.0
java.lang.IllegalStateException: SparkContext has been shutdown
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1981)
at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1088)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.fold(RDD.scala:1082)
at org.apache.spark.sql.execution.datasources.json.InferSchema$.infer(InferSchema.scala:69)
Edit 2:
Turns out the error from edit 1 was caused by a @PreDestroy method where I was closing the context. I'm using Spring, and the bean is supposed to be a singleton, but somehow Spark causes it to be destroyed before the job finishes.
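For reference, the removed method looked roughly like this (a sketch from memory; the method name is made up, and sc is the same JavaSparkContext I pass to the streaming context):

@PreDestroy
public void tearDown() {
    // Closing the shared SparkContext here is what shut it down
    // before the streaming job had finished.
    sc.close();
}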
I've now removed that @PreDestroy method and made some changes; the following seems to be working, but with open questions:
public void load(String dataDirURL, String format) throws StreamingQueryException, InterruptedException {
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
    JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(dataDirURL));

    lines.foreachRDD(rdd -> {
        try {
            Dataset<Row> select = sql.read().json(rdd)
                .select("id", "name", "rating", "review_count", "hours", "attributes");
            couchbaseWriter(select.write()
                .option("idField", "id")
                .format(format))
                .couchbase();
        } catch (Exception e) {
            // Time to time throws AnalysisException: cannot resolve '`id`' given input columns: [];
        }
    });

    ssc.start();
    ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());
}
Open Questions:
- From time to time it throws AnalysisException: cannot resolve '`id`' given input columns: [];. Is this a problem with my receiver?
- When the document already exists, the task fails with the following exception. In my case, I'd simply like to overwrite the doc if present, not blow up:
Lost task 1.0 in stage 2.0 (TID 4, localhost, executor driver): com.couchbase.client.java.error.DocumentAlreadyExistsException at com.couchbase.client.java.CouchbaseAsyncBucket$13.call(CouchbaseAsyncBucket.java:475)
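To make the intent of the second question concrete: with the plain Couchbase Java SDK (2.x), what I'm after is essentially upsert semantics, roughly like the sketch below (bucket setup omitted and variable names made up; this only illustrates the desired overwrite behavior, not code I'm running):

// For each JSON line, overwrite the document if it already exists
// instead of failing with DocumentAlreadyExistsException.
JsonObject content = JsonObject.fromJson(json);
bucket.upsert(JsonDocument.create(content.getString("id"), content));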