
I'm trying to write a stream to Elasticsearch using PySpark. I have two dataframes that I read from Kafka and join into df_joined; printing df_joined to the terminal shows the correct columns and values. But when I try to write it to Elasticsearch (on localhost:9200) using this code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json

spark_session = SparkSession.builder \
    .appName("spark-test") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,"
            "org.elasticsearch:elasticsearch-hadoop:8.5.3") \
    .getOrCreate()
df1 = ...
df2 = ...  # For df1 and df2 I do .select(from_json(col('value'), schema))
df_joined = df1.join(df2, df1.fk == df2.pk)
query = df_joined \
    .writeStream \
    .outputMode("append") \
    .format("org.elasticsearch.spark.sql") \
    .option("es.resource", "name_of_index/name_of_type") \
    .option("es.mapping.id", "id") \
    .option("es.spark.sql.streaming.sink.log.enabled", "false") \
    .option("checkpointLocation", "/tmp/es_checkpoint") \
    .start()
query.awaitTermination()
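
For reference, df1 and df2 are built roughly like this (the topic name, bootstrap servers, and schema below are simplified placeholders):

from pyspark.sql.types import StructType, StructField, StringType

# Placeholder schema; the real one matches the Kafka message payload
schema = StructType([
    StructField("id", StringType()),
    StructField("fk", StringType()),
])

df1 = spark_session \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic1") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("data")) \
    .select("data.*")
# df2 is built the same way from a second topic, with its own schema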

I use:

  • elasticsearch-hadoop version 8.5.3
  • PySpark version 3.3.1

I get the following error/stack trace:

22/12/28 00:41:48 ERROR MicroBatchExecution: Query [id = 3b9b239f-bd38-43e0-820a-6d8fdac56e79, runId = e4869668-2e3c-4e8e-a6be-6c111a832812] terminated with error
java.lang.NoSuchMethodError: org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(Lorg/apache/spark/sql/SparkSession;Lorg/apache/spark/sql/execution/QueryExecution;Lscala/Function0;)Ljava/lang/Object;
    at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink.addBatch(EsSparkSqlStreamingSink.scala:62)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:666)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:664)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:664)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Exception in thread "stream execution thread for [id = 3b9b239f-bd38-43e0-820a-6d8fdac56e79, runId = e4869668-2e3c-4e8e-a6be-6c111a832812]" java.lang.NoSuchMethodError: org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(Lorg/apache/spark/sql/SparkSession;Lorg/apache/spark/sql/execution/QueryExecution;Lscala/Function0;)Ljava/lang/Object;
    (same stack trace as above)

What I have tried

  • I have tried downgrading PySpark to version 3.2.0 (based on an accepted SO answer)
  • When creating the SparkSession, I have tried changing org.elasticsearch:elasticsearch-hadoop:8.5.3 to org.elasticsearch:elasticsearch-spark-20_2.11:8.5.3

edit:

  • Koedlt's answer was very helpful for understanding the problem better, but unfortunately downgrading Spark to version 2.3.0 did not solve the problem, as the Kafka integration failed.

1 Answer

If you look at the most interesting piece of your error, you see the following:

java.lang.NoSuchMethodError: org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(Lorg/apache/spark/sql/SparkSession;Lorg/apache/spark/sql/execution/QueryExecution;Lscala/Function0;)

So it's not finding a method of SQLExecution called withNewExecutionId that has a SparkSession, a QueryExecution and a Function as input. Let's have a look at that method in Spark's source code.

In version 3.3.1, we see the following function signature:

def withNewExecutionId[T](
      queryExecution: QueryExecution,
      name: Option[String] = None)(body: => T): T

This explains why you're getting a NoSuchMethodError: there is no method with the function signature you're expecting!
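
As a quick sanity check on your side, you can print the Spark version the driver is actually running, since that is the version whose SQLExecution API the connector has to match at runtime:

# The connector's bytecode must link against this exact Spark API at runtime
print(spark_session.version)  # e.g. '3.3.1'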

Now, let's look at the dependencies of your artifact in Maven Repository. For Spark, we're seeing this:

[screenshot of the artifact's dependency table on mvnrepository.com, listing its Spark dependency at version 2.3.0]

with the left column being the dependency version and the right column the newest stable update to that dependency. Let's have a look at the dependency version, 2.3.0.

In version 2.3.0, we see the following function signature:

def withNewExecutionId[T](
      sparkSession: SparkSession,
      queryExecution: QueryExecution)(body: => T): T

That looks like what we're expecting! A SparkSession, a QueryExecution and a Function as an input.

Solution: It seems like this artifact requires Spark 2.3.0 and is not compatible with newer versions, at least the way you're using it right now. I don't know this artifact well, so maybe there are ways around this, but try it with version 2.3.0 and see what happens :)
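
If you want to try that route, the session setup would look something like this (an untested sketch; note that Spark 2.3.x is built against Scala 2.11, so the artifact suffixes change as well):

# Untested sketch: package coordinates for the Spark 2.3.x / Scala 2.11 line
spark_session = SparkSession.builder \
    .appName("spark-test") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,"
            "org.elasticsearch:elasticsearch-spark-20_2.11:8.5.3") \
    .getOrCreate()

Whether the 8.5.3 connector and the 2.3.0 Kafka source play nicely together is exactly what you'd be testing here.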
