
I am trying to execute an Elasticsearch DSL query from Spark 2.2 with Scala 2.11.8. The Elasticsearch version is 2.4.4, and the Kibana version I use is 4.6.4.

This is the library that I use in Spark:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>5.2.2</version>
</dependency>

This is my code:

import org.apache.spark.sql.SparkSession

// SparkSession configured for the elasticsearch-spark connector
val spark = SparkSession.builder()
      .config("es.nodes", "localhost")
      .config("es.port", 9200)
      .config("es.nodes.wan.only", "true")
      .config("es.index.auto.create", "true")
      .appName("ES test")
      .master("local[*]")
      .getOrCreate()

    val myquery = """{"query":
                          {"bool": {
                             "must": [
                                {
                                   "has_child": {
                                       "filter": {
                                          ...
                                       }
                                    }
                                }
                             ]
                          }
                      }}"""

  
   val df = spark.read.format("org.elasticsearch.spark.sql")
      .option("query", myquery)
      .option("pushdown", "true")
      .load("myindex/items")

Above I have only shown the skeleton of the DSL query (the full query appears in the stack trace below). I am getting the error:

java.lang.IllegalArgumentException: Failed to parse query: {"query":

Initially I thought the problem was the Elasticsearch version. As far as I understand from this GitHub issue, version 4 of Elasticsearch is not supported.

However, if I run the same code with a simple query, it correctly retrieves records from Elasticsearch.

var df = spark.read
              .format("org.elasticsearch.spark.sql")
              .option("es.query", "?q=public:*")
              .load("myindex/items")

Therefore I assume that the issue is not related to the version, but rather to the way I represent the query.

This query works fine with cURL, so maybe it needs to be adapted somehow before being passed to Spark?
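
As a sanity check (sketch only, using the Jackson library that Spark already bundles), I can parse the string locally to confirm that the JSON itself is well formed:

// Sketch: parse the query string locally before submitting the job.
// readTree throws a JsonProcessingException if the JSON is malformed.
import com.fasterxml.jackson.databind.ObjectMapper
new ObjectMapper().readTree(myquery)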

Full error stacktrace:

    Previous exception in task: Failed to parse query: {"query":
                         {"bool": {
                           "must": [
                             {
                               "has_child": {
                                 "filter": {
                                   "bool": {
                                     "must": [
                                       {
                                         "term": {
                                           "project": 579
                                         }
                                       },
                                       {
                                         "terms": {
                                           "status": [
                                             0,
                                             1,
                                             2
                                           ]
                                         }
                                       }
                                     ]
                                   }
                                 },
                                 "type": "status"
                               }
                             },
                             {
                               "has_child": {
                                 "filter": {
                                   "bool": {
                                     "must": [
                                       {
                                         "term": {
                                           "project": 579
                                         }
                                       },
                                       {
                                         "terms": {
                                           "entity": [
                                             4634
                                           ]
                                         }
                                       }
                                     ]
                                   }
                                 },
                                 "type": "annotation"
                               }
                             },
                             {
                               "term": {
                                 "project": 579
                               }
                             },
                             {
                               "range": {
                                 "publication_date": {
                                   "gte": "2017/01/01",
                                   "lte": "2017/04/01",
                                   "format": "yyyy/MM/dd"
                                 }
                               }
                             },
                             {
                               "bool": {
                                 "should": [
                                   {
                                     "terms": {
                                       "typology": [
                                         "news",
                                         "blog",
                                         "forum",
                                         "socialnetwork"
                                       ]
                                     }
                                   },
                                   {
                                     "terms": {
                                       "publishing_platform": [
                                         "twitter"
                                       ]
                                     }
                                   }
                                 ]
                               }
                             }
                           ]
                         }}
    org.elasticsearch.hadoop.rest.query.QueryUtils.parseQuery(QueryUtils.java:59)
    org.elasticsearch.hadoop.rest.RestService.createReader(RestService.java:417)
    org.elasticsearch.spark.rdd.AbstractEsRDDIterator.reader$lzycompute(AbstractEsRDDIterator.scala:49)
    org.elasticsearch.spark.rdd.AbstractEsRDDIterator.reader(AbstractEsRDDIterator.scala:42)
    org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:61)
    scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
    org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    org.apache.spark.scheduler.Task.run(Task.scala:108)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
    at org.apache.spark.scheduler.Task.run(Task.scala:118)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
18/01/22 21:43:12 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.spark.util.TaskCompletionListenerException: Failed to parse query: {"query":

and this one:

18/01/22 21:47:40 WARN ScalaRowValueReader: Field 'cluster' is backed by an array but the associated Spark Schema does not reflect this;
              (use es.read.field.as.array.include/exclude) 
18/01/22 21:47:40 WARN ScalaRowValueReader: Field 'project' is backed by an array but the associated Spark Schema does not reflect this;
              (use es.read.field.as.array.include/exclude) 
18/01/22 21:47:40 WARN ScalaRowValueReader: Field 'client' is backed by an array but the associated Spark Schema does not reflect this;
              (use es.read.field.as.array.include/exclude) 

18/01/22 21:47:40 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
scala.MatchError: Buffer(13473953) (of class scala.collection.convert.Wrappers$JListWrapper)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:276)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:275)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:379)
    at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:61)
    at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:58)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
18/01/22 21:47:40 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): scala.MatchError: Buffer(13473953) (of class scala.collection.convert.Wrappers$JListWrapper)
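
The warnings point at es.read.field.as.array.include/exclude. As far as I understand, the read would be configured roughly like this (a sketch with the field names taken from the warnings above; I have not verified that this alone resolves the MatchError):

// Sketch: declare the fields that Elasticsearch returns as arrays so that the
// inferred Spark schema matches the data. Field names come from the warnings above.
val df = spark.read.format("org.elasticsearch.spark.sql")
  .option("query", myquery)
  .option("pushdown", "true")
  .option("es.read.field.as.array.include", "cluster,project,client")
  .load("myindex/items")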
  • Markus, could you provide the full exception details, including the stack trace? I think the important question is whether this query is rejected by the ES node itself or by the client-side wrapper. Commented Jan 22, 2018 at 20:25
  • @SergGr: I posted the whole error stack trace. I think the key part is Caused by: org.codehaus.jackson.JsonParseException: Unexpected character (':' (code 58)): was expecting comma to separate OBJECT entries. But how can I solve it? Commented Jan 22, 2018 at 20:36

1 Answer


So the error says

Caused by: org.codehaus.jackson.JsonParseException: Unexpected character (':' (code 58)):
 at [Source: java.io.StringReader@76aeea7a; line: 2, column: 33]

and if you look at your query

val myquery = """{"query":
                      "bool": {

you'll see that this probably maps to the : just after "bool", and what you have is clearly invalid JSON. To make it obvious, reformatted it reads

{"query": "bool": { ...

Most probably you forgot the { after "query" and the matching } at the end. Compare this with the example in the official docs:

{
  "query": {
    "bool" : {
      "must" : {

5 Comments

  • I added { and }, but I still get an error. It is a different one now; please see the updated error in the question.
  • Thanks. I think I am now closer to the solution. I now get warnings related to the schema: 18/01/22 21:47:40 WARN ScalaRowValueReader: Field 'cluster' is backed by an array but the associated Spark Schema does not reflect this; (use es.read.field.as.array.include/exclude)
  • I don't think that changing a question to a totally different (and most probably unrelated) error after the previous one was solved is a fair deal. If you have a new error, you should create a new question, and you should also investigate it yourself first. By the way, it is really hard to answer a question when the only place 'cluster' is mentioned in the question is the error message.
  • It is related to this issue: https://github.com/elastic/elasticsearch-hadoop/issues/1058. I would like to post more details of the error, but the connector does not return any.
  • @Markus, sorry, this is a totally different error and you really should create a new question for it. I'm going to stop answering here because the original problem has been solved (although you didn't bother to accept the answer).
