
I am running a spark-submit job on AWS EMR to read from an Elasticsearch node.

When the job executes this command

Python:

es_config = {
    "es.nodes": url_to_my_node,
    "es.port": "9200",
    "es.resource": "my_elasticsearch_index/_doc",
    "es.query": "?q=id:park_rocky-mountain",
    "es.read.metadata": "true",
    "es.nodes.wan.only": "true",
}

df = spark.read.format("org.elasticsearch.spark.sql") \
    .options(**es_config) \
    .load()

or in Scala:

val esReadOptions = Map(
    "es.nodes" -> "url_to_my_node",
    "es.port" -> "9200"
)

val df = spark.read.format("org.elasticsearch.spark.sql")
    .options(esReadOptions)
    .load("my_elasticsearch_index/_doc")  // load() takes the index resource, not the node URL

I get the following error:

: java.lang.NoClassDefFoundError: scala/Product$class
        at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:221)
        at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:97)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
        at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:171)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
        at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

Apparently this is due either to the JAR not being found or to a version mismatch.

I have checked that the JAR is present on the JVM classpath:

println(System.getProperty("java.class.path"))
> ...
> :/usr/lib/spark/jars/elasticsearch-hadoop-7.17.6.jar
> ...

As for version conflicts, the EMR instance offers the following environment:

Spark version 3.3.2-amzn-0, Scala version 2.12.15, OpenJDK 64-Bit Server VM, Java 1.8.0_382

and I am using this JAR: /usr/lib/spark/jars/elasticsearch-hadoop-7.17.6.jar, which should be compatible with Scala 2.12 and Spark 3.x.

I have tried multiple different JARs:

  • elasticsearch-hadoop-8.9.0.jar
  • elasticsearch-hadoop-7.17.6.jar
  • opensearch-hadoop-1.0.1.jar

I have tried connecting to different instances of Elasticsearch:

  • an Elasticsearch sandbox on version 8.9.0 (the latest as of time of writing),
  • an OpenSearch instance (backward compatible with Elasticsearch 7.12).

I have previously verified connectivity: I can reach both instances via REST API calls, using curl or Python's requests library, from the same environment.
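For reference, a connectivity check like the one described above can be sketched with the standard library alone (the helper names and URL are illustrative, not part of the original setup):

```python
import json
import urllib.request

def parse_version(info: dict) -> str:
    """Extract the version number from a cluster's root-endpoint payload.

    Both Elasticsearch and OpenSearch report it under the keys
    version -> number in the root-endpoint JSON.
    """
    return info["version"]["number"]

def cluster_version(base_url: str) -> str:
    """GET the cluster's root endpoint and return its reported version."""
    with urllib.request.urlopen(base_url) as resp:
        return parse_version(json.load(resp))
```

Note that a successful call such as `cluster_version("https://my-cluster:9200")` only proves HTTP reachability; it says nothing about the Spark connector's classpath, which is where the error above originates.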

I have tried connecting

  • with Python code that invokes PySpark,
  • with Scala directly from the spark-shell.

In both cases I get the same Java error as shown above.

I have tried connecting

  • from an AWS EMR instance
  • from a local Spark installation running on Linux Mint 20.3 with kernel 5.15.0-78-generic

The class scala.Product has been part of the Scala standard library since version 2.3, and here I am using Scala 2.12. Note, however, that the missing class is scala.Product$class: the $class suffix denotes a trait implementation class, an artifact of how Scala 2.11 and earlier compiled traits, which no longer exists in 2.12. So the error message may be hiding another issue, such as a dependency compiled against Scala 2.11.

  • spark & scala version? Commented Aug 17, 2023 at 3:14
  • Scala version 2.12.15, Java 1.8.0_382; the full specs are above, search for 'environment'. Thanks a lot! Commented Aug 17, 2023 at 7:31

1 Answer


Finally, the solution was to:

  • include an additional JAR
  • find the right configuration to connect to AWS OpenSearch

We used the following setup:

AWS OpenSearch version 7.10.2 (Lucene version 8.10.1)

AWS EMR with Spark version 3.3.2-amzn-0, Scala version 2.12.15, Java 1.8.0_382

We added the following JARs:

  • opensearch-hadoop-1.0.1.jar
  • aws-java-sdk-1.12.531.jar
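For completeness, instead of copying the JARs into /usr/lib/spark/jars, they can also be passed at submit time. This invocation is illustrative only; the paths and the job script name are placeholders:

```shell
# Illustrative only: adjust the JAR paths and job script to your cluster layout.
spark-submit \
  --jars /usr/lib/spark/jars/opensearch-hadoop-1.0.1.jar,/usr/lib/spark/jars/aws-java-sdk-1.12.531.jar \
  my_opensearch_read_job.py
```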

and used the following configuration:

op_config = {
    "opensearch.resource": index,           # index to read from
    "opensearch.nodes": elastic_host,
    "opensearch.port": elastic_port,
    "opensearch.net.ssl": "true",
    "opensearch.net.ssl.cert.allow.self.signed": "true",
    "opensearch.nodes.wan.only": "true",    # talk only to the declared nodes (needed for managed clusters)
    "opensearch.aws.sigv4.enabled": "true", # sign requests with AWS SigV4
    "opensearch.aws.sigv4.region": aws_region,
    "opensearch.http.timeout": "3m",
}

df = spark.read.format("opensearch").options(**op_config).load()

Unfortunately, the documentation on the GitHub page
https://github.com/opensearch-project/opensearch-hadoop
only provides examples in Java and Scala, but porting them to PySpark is feasible, and it is the only valid documentation found out there as of time of writing.
