-->

Spark Java: How to move data from HTTP source to C

2020-07-24 03:48发布

问题:

I've a .gz file available on a Web server that I want to consume in a streaming manner and insert the data into Couchbase. The .gz file has only one file in it, which in turn contains one JSON object per line.

Since Spark doesn't have a HTTP receiver, I wrote one myself (shown below). I'm using Couchbase Spark connector to do the insertion. However, when running, the job is not actually inserting anything. I've a suspicion that it is due to my inexperience with Spark and not knowing how to start and await termination. As you can see below, there are 2 places such calls can be made.

Receiver:

public class HttpReceiver extends Receiver<String> {
    private final String url;

    public HttpReceiver(String url) {
        super(MEMORY_AND_DISK());
        this.url = url;
    }

    @Override
    public void onStart() {
        new Thread(() -> receive()).start();
    }

    private void receive() {
        try {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setAllowUserInteraction(false);
            conn.setInstanceFollowRedirects(true);
            conn.setRequestMethod("GET");
            conn.setReadTimeout(60 * 1000);

            InputStream gzipStream = new GZIPInputStream(conn.getInputStream());
            Reader decoder = new InputStreamReader(gzipStream, UTF_8);
            BufferedReader reader = new BufferedReader(decoder);

            String json = null;
            while (!isStopped() && (json = reader.readLine()) != null) {
                store(json);
            }
            reader.close();
            conn.disconnect();
        } catch (IOException e) {
            stop(e.getMessage(), e);
        }
    }

    @Override
    public void onStop() {

    }
}

Dataload:

public void load(String url) throws StreamingQueryException, InterruptedException {
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
        JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(url));

        lines.foreachRDD(rdd ->
                sql.read().json(rdd)
                        .select(new Column("id"),
                                new Column("name"),
                                new Column("rating"),
                                new Column("review_count"),
                                new Column("hours"),
                                new Column("attributes"))
                        .writeStream()
                        .option("idField", "id")
                        .format("com.couchbase.spark.sql")
                        .start()
//                        .awaitTermination(sparkProperties.getTerminationTimeoutMillis())
        );

//        ssc.start();
        ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());
}

The commented lines show my confusion with starting and terminating the jobs. Also, feel free to comment regarding the receiver if there's something wrong with it or can be improved.

Using Spark v2.1.0 with Java.

Edit 1:

Also tried this implementation:

lines.foreachRDD(rdd ->
          couchbaseWriter(sql.read().json(rdd)
                  .select(new Column("id"),
                          new Column("name"),
                          new Column("rating"),
                          new Column("review_count"),
                          new Column("hours"),
                          new Column("attributes"))
                  .write()
                  .option("idField", "id")
                  .format("com.couchbase.spark.sql"))
                  .couchbase()
  );

  ssc.start();
  ssc.awaitTermination();

But it throws IllegalStateException: SparkContext has been shutdown

11004 [JobScheduler] ERROR org.apache.spark.streaming.scheduler.JobScheduler  - Error running job streaming job 1488664987000 ms.0
java.lang.IllegalStateException: SparkContext has been shutdown
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1981)
    at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1088)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.fold(RDD.scala:1082)
    at org.apache.spark.sql.execution.datasources.json.InferSchema$.infer(InferSchema.scala:69)

Edit 2: Turns out the error from edit 1 was caused by a @PostDestruct method where I was closing the context. I'm using Spring and the bean is supposed to be singleton, but somehow Spark is causing it to destroy before the job finishes. I've now removed the @PostDestruct and made some changes; the following seems to be working but with open questions:

public void load(String dataDirURL, String format) throws StreamingQueryException, InterruptedException {
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
    JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(dataDirURL));

    lines.foreachRDD(rdd -> {
        try {
            Dataset<Row> select = sql.read().json(rdd)
                    .select("id", "name", "rating", "review_count", "hours", "attributes");
            couchbaseWriter(select.write()
                    .option("idField", "id")
                    .format(format))
                    .couchbase();
        } catch (Exception e) {
            // Time to time throws AnalysisException: cannot resolve '`id`' given input columns: [];
        }
    });

    ssc.start();
    ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());
}

Open Questions:

  1. Time to time throws AnalysisException: cannot resolve 'id' given input columns: [];. Is this a problem with my receiver?
  2. When the document already exists, the task fails with the following exception. In my case, I'd simply like to overwrite the doc if present, not blow up.

    Lost task 1.0 in stage 2.0 (TID 4, localhost, executor driver): com.couchbase.client.java.error.DocumentAlreadyExistsException
    at com.couchbase.client.java.CouchbaseAsyncBucket$13.call(CouchbaseAsyncBucket.java:475)
    

回答1:

Answering my own question, this is what I finally have working without any exceptions:

public void load(String dataDirURL, String format) throws InterruptedException {
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
    JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(dataDirURL));

    ObjectMapper objectMapper = new ObjectMapper();

    lines.foreachRDD(rdd -> {
                JavaRDD<RawJsonDocument> docRdd = rdd
                        .filter(content -> !isEmpty(content))
                        .map(content -> {
                            String id = "";
                            String modifiedContent = "";
                            try {
                                ObjectNode node = objectMapper.readValue(content, ObjectNode.class);
                                if (node.has("id")) {
                                    id = node.get("id").textValue();
                                    modifiedContent = objectMapper.writeValueAsString(node.retain(ALLOWED_FIELDS));
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                            } finally {
                                return RawJsonDocument.create(id, modifiedContent);
                            }
                        })
                        .filter(doc -> !isEmpty(doc.id()));
                couchbaseDocumentRDD(docRdd)
                        .saveToCouchbase(UPSERT);
            }
    );

    ssc.start();
    ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());
}