
I have a .gz file available on a web server that I want to consume in a streaming manner and insert the data into Couchbase. The archive contains a single file, which in turn contains one JSON object per line.

Since Spark doesn't ship with an HTTP receiver, I wrote one myself (shown below). I'm using the Couchbase Spark connector to do the insertion. However, when the job runs, it doesn't actually insert anything. I suspect this is due to my inexperience with Spark and not knowing how to start the job and await termination. As you can see below, there are two places where such calls can be made.

Receiver:

public class HttpReceiver extends Receiver<String> {
    private final String url;

    public HttpReceiver(String url) {
        super(MEMORY_AND_DISK());
        this.url = url;
    }

    @Override
    public void onStart() {
        new Thread(() -> receive()).start();
    }

    private void receive() {
        try {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setAllowUserInteraction(false);
            conn.setInstanceFollowRedirects(true);
            conn.setRequestMethod("GET");
            conn.setReadTimeout(60 * 1000);

            InputStream gzipStream = new GZIPInputStream(conn.getInputStream());
            Reader decoder = new InputStreamReader(gzipStream, UTF_8);
            BufferedReader reader = new BufferedReader(decoder);

            String json = null;
            while (!isStopped() && (json = reader.readLine()) != null) {
                store(json);
            }
            reader.close();
            conn.disconnect();
        } catch (IOException e) {
            stop(e.getMessage(), e);
        }
    }

    @Override
    public void onStop() {

    }
}

Dataload:

public void load(String url) throws StreamingQueryException, InterruptedException {
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
        JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(url));

        lines.foreachRDD(rdd ->
                sql.read().json(rdd)
                        .select(new Column("id"),
                                new Column("name"),
                                new Column("rating"),
                                new Column("review_count"),
                                new Column("hours"),
                                new Column("attributes"))
                        .writeStream()
                        .option("idField", "id")
                        .format("com.couchbase.spark.sql")
                        .start()
//                        .awaitTermination(sparkProperties.getTerminationTimeoutMillis())
        );

//        ssc.start();
        ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());
}

The commented-out lines show my confusion about starting and terminating the job. Also, feel free to comment on the receiver if there's something wrong with it or if it can be improved.

Using Spark v2.1.0 with Java.
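
For reference, my (possibly wrong) understanding of the DStream lifecycle is: register all output operations on the stream first, then call ssc.start(), then block with awaitTermination or awaitTerminationOrTimeout. Roughly (same imports and fields as in the code above):

JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(url));

// 1. Register the output operation; nothing runs yet.
lines.foreachRDD(rdd -> {
    // batch-style write of this micro-batch goes here
});

// 2. Start the streaming computation.
ssc.start();

// 3. Block until the context is stopped or the timeout elapses.
ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());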

Edit 1:

Also tried this implementation:

lines.foreachRDD(rdd ->
          couchbaseWriter(sql.read().json(rdd)
                  .select(new Column("id"),
                          new Column("name"),
                          new Column("rating"),
                          new Column("review_count"),
                          new Column("hours"),
                          new Column("attributes"))
                  .write()
                  .option("idField", "id")
                  .format("com.couchbase.spark.sql"))
                  .couchbase()
  );

  ssc.start();
  ssc.awaitTermination();

But it throws IllegalStateException: SparkContext has been shutdown

11004 [JobScheduler] ERROR org.apache.spark.streaming.scheduler.JobScheduler  - Error running job streaming job 1488664987000 ms.0
java.lang.IllegalStateException: SparkContext has been shutdown
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1981)
    at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1088)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.fold(RDD.scala:1082)
    at org.apache.spark.sql.execution.datasources.json.InferSchema$.infer(InferSchema.scala:69)

Edit 2: It turns out the error from edit 1 was caused by a @PreDestroy method in which I was closing the context. I'm using Spring, and the bean is supposed to be a singleton, but somehow it was being destroyed before the job finished. I've now removed the @PreDestroy method and made some changes; the following seems to work, but with open questions:

public void load(String dataDirURL, String format) throws StreamingQueryException, InterruptedException {
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
    JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(dataDirURL));

    lines.foreachRDD(rdd -> {
        try {
            Dataset<Row> select = sql.read().json(rdd)
                    .select("id", "name", "rating", "review_count", "hours", "attributes");
            couchbaseWriter(select.write()
                    .option("idField", "id")
                    .format(format))
                    .couchbase();
        } catch (Exception e) {
            // From time to time this throws AnalysisException: cannot resolve '`id`' given input columns: [];
        }
    });

    ssc.start();
    ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());
}

Open Questions:

  1. From time to time it throws AnalysisException: cannot resolve 'id' given input columns: [];. Is this a problem with my receiver? (See the sketch after the stack trace below.)
  2. When the document already exists, the task fails with the following exception. In my case, I'd simply like to overwrite the document if it's already present, not blow up.

    Lost task 1.0 in stage 2.0 (TID 4, localhost, executor driver): com.couchbase.client.java.error.DocumentAlreadyExistsException
    at com.couchbase.client.java.CouchbaseAsyncBucket$13.call(CouchbaseAsyncBucket.java:475)
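
For question 1, my current hunch is that a micro-batch with no received data gives an empty RDD, so sql.read().json(rdd) infers an empty schema and the select can't resolve any columns. A guard along these lines (just a sketch) should avoid that; for question 2, I believe writing with the connector's StoreMode.UPSERT (via the RDD API) overwrites existing documents instead of failing:

lines.foreachRDD(rdd -> {
    // An empty micro-batch yields an empty schema from json(), which makes
    // select("id", ...) fail with "cannot resolve 'id' given input columns: []".
    if (rdd.isEmpty()) {
        return;
    }
    Dataset<Row> select = sql.read().json(rdd)
            .select("id", "name", "rating", "review_count", "hours", "attributes");
    // ... write to Couchbase as above
});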
    
  • writeStream looks wrong. You don't use Structured Streaming; you use a standard write followed by save, like sql.read().json(rdd).select(...).write().option(...).format(...).save(). Commented Mar 4, 2017 at 21:25
  • @zero323, Nope, see edit 1. I also tried without couchbaseWriter, using save instead: same exception. Based on the source code, couchbaseWriter seems to call save internally. Commented Mar 4, 2017 at 22:03
  • This actually looks like an exception during schema inference, not the writer. Commented Mar 4, 2017 at 23:08
  • @zero323 See edit 2. Commented Mar 4, 2017 at 23:19
  • This is most likely a problem with your data. I would test this in batch mode first if I were you (take a single batch, dump the RDD to a text file, and start from there). Commented Mar 4, 2017 at 23:31
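
A minimal sketch of the batch-mode check suggested in the last comment (the output directory is just an example path):

// Dump the raw lines of each micro-batch to disk so the data can be inspected
// before schema inference or the Couchbase write is involved.
lines.foreachRDD((rdd, time) -> {
    if (!rdd.isEmpty()) {
        rdd.saveAsTextFile("/tmp/http-receiver-batch-" + time.milliseconds());
    }
});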

1 Answer


Answering my own question, this is what I finally have working without any exceptions:

public void load(String dataDirURL, String format) throws InterruptedException {
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
    JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(dataDirURL));

    ObjectMapper objectMapper = new ObjectMapper();

    lines.foreachRDD(rdd -> {
                JavaRDD<RawJsonDocument> docRdd = rdd
                        .filter(content -> !isEmpty(content))
                        .map(content -> {
                            String id = "";
                            String modifiedContent = "";
                            try {
                                ObjectNode node = objectMapper.readValue(content, ObjectNode.class);
                                if (node.has("id")) {
                                    id = node.get("id").textValue();
                                    modifiedContent = objectMapper.writeValueAsString(node.retain(ALLOWED_FIELDS));
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            return RawJsonDocument.create(id, modifiedContent);
                        })
                        .filter(doc -> !isEmpty(doc.id()));
                couchbaseDocumentRDD(docRdd)
                        .saveToCouchbase(UPSERT);
            }
    );

    ssc.start();
    ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());
}
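
A couple of notes in case someone adapts this: couchbaseDocumentRDD(...) and UPSERT are static imports from the connector's Java API (the CouchbaseDocumentRDD and StoreMode classes, respectively), and ALLOWED_FIELDS is simply the field names I want to keep (the same ones I was selecting before). The filter on empty ids drops both documents without an id and lines that failed to parse, and upserting overwrites documents that already exist, which is what got rid of the DocumentAlreadyExistsException from open question 2.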
