
I am in a bind here. I am trying to implement a very basic pipeline that reads data from Kafka and processes it in Spark. The problem I am facing is that Apache Spark shuts down abruptly with the aforesaid error message. My PySpark version is 3.5.1 and my Scala version is 2.12.18.

The code in question is:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *


spark = SparkSession.builder \
    .appName('my_app') \
    .config("spark.jars", "/usr/local/spark/jars/spark-sql-kafka-0-10_2.12-3.5.1.jar") \
    .getOrCreate()


df = spark.readStream \
    .format('kafka') \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "quickstart-events") \
    .option("startingOffsets", "earliest") \
    .load()


query = df.writeStream \
    .trigger(processingTime='5 seconds') \
    .outputMode("update") \
    .format("console") \
    .start()

query.awaitTermination()

I have downloaded all the necessary jar files and placed them in the appropriate directory in Spark. I am able to read messages from the Kafka broker as a consumer, so the possibility of my Kafka installation being out of order is ruled out. Any help will be tremendously appreciated.

  • Maybe you're mixing Scala 2.12.x and Scala 2.13.x dependencies. Commented May 1, 2024 at 13:42
  • That error always means a mix of incompatible Scala versions. Commented May 1, 2024 at 15:35
  • @NanomachinesSon What is the step-by-step reproduction of the behavior you observe? What exactly do you install, what do you run, and how do you run it? What instructions do you follow? I can see in your code that you're pulling in a _2.12 jar, but maybe there is some other _2.13 jar on the classpath. You should investigate your classpath. Commented May 2, 2024 at 6:58
  • @NanomachinesSon Also check whether scala-library is on the classpath (the scala.collection... method comes from there). General reasons for NoSuchMethodError are stackoverflow.com/questions/35186/… and stackoverflow.com/questions/27938776/… Commented May 2, 2024 at 7:15
  • @DmytroMitin As you have been quite helpful so far, I want to share this update with you. As answered by OneCricketeer in this post: stackoverflow.com/questions/68105480/… I did the same thing by declaring PYSPARK_SUBMIT_ARGS within the code itself and suddenly, almost unexpectedly, the stream started in the notebook. But I have had no luck doing that from the terminal. My terminal command is as follows: spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 --py-files ~/Desktop/JP/file.py ~/Desktop/JP/file.py Commented May 2, 2024 at 8:04

1 Answer

General reasons for NoSuchMethodError are:

How do I fix a NoSuchMethodError?

Difference between NoSuchMethodException and NoSuchMethodError in Java

I suspect you're mixing _2.12 and _2.13 dependencies.

The class scala.collection.JavaConverters.AsJava exists in Scala 2.13.x scala-library and doesn't exist in Scala 2.12.x.
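One quick way to sanity-check this offline is to compare the full version in the scala-library jar name against the connector's binary-version suffix. A minimal sketch (a hypothetical helper; the jar names are illustrative, not taken from your setup):

```python
import re

def binary_compatible(scala_library_jar, connector_jar):
    """Compare scala-library's binary version (e.g. 2.12 from 2.12.18)
    with the _X.Y suffix in a Spark connector artifact name."""
    lib = re.search(r'scala-library-(\d+\.\d+)', scala_library_jar).group(1)
    suffix = re.search(r'_(\d+\.\d+)-', connector_jar).group(1)
    return lib == suffix

# Matching binary versions -> compatible
binary_compatible('scala-library-2.12.18.jar',
                  'spark-sql-kafka-0-10_2.12-3.5.1.jar')  # -> True
# A 2.13 scala-library with a _2.12 connector -> NoSuchMethodError territory
binary_compatible('scala-library-2.13.8.jar',
                  'spark-sql-kafka-0-10_2.12-3.5.1.jar')  # -> False
```

The patch-level digit (2.12.18 vs 2.12.17) does not matter for binary compatibility; only the 2.12-vs-2.13 part does.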

You should investigate your classpath. Modify your script:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .appName('my_app') \
    .getOrCreate()

print('Class path on start')
gateway = SparkContext._gateway
separator = gateway.jvm.java.lang.System.getProperty('path.separator')
for url in gateway.jvm.java.lang.System.getProperty('java.class.path').split(separator):
    print(url)

print()

# classLoader = gateway.jvm.java.lang.ClassLoader.getSystemClassLoader()
# classLoader = gateway.java_gateway_server.getClass().getClassLoader()
classLoader = gateway.jvm.java.lang.Thread.currentThread().getContextClassLoader()
while classLoader is not None:
    try:
        urls = classLoader.getURLs()
        print(f'Class loader {classLoader}:')
        for url in urls:
            print(url)
    except Exception:
        print(f'Class loader {classLoader}: not a URLClassLoader')
    print()
    classLoader = classLoader.getParent()

print("ok")

df = spark.readStream \
    .format('kafka') \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "quickstart-events") \
    .option("startingOffsets", "earliest") \
    .load()

query = df.writeStream \
    .trigger(processingTime='5 seconds') \
    .outputMode("update") \
    .format("console") \
    .start()

query.awaitTermination()

and run it:

./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 /path/to/file.py 

See the output (you can publish it here) to check whether there are only _2.12 jars or also _2.13 ones.
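Once you have the classpath printout, a small helper like this can flag mixed binary suffixes mechanically (a hypothetical sketch; it only parses jar filenames, nothing Spark-specific):

```python
import re

def scala_versions(jar_names):
    """Collect the distinct Scala binary-version suffixes (_2.12, _2.13, ...)
    appearing in a list of jar filenames."""
    return sorted({m.group(1) for j in jar_names
                   if (m := re.search(r'_(\d+\.\d+)-', j))})

# Illustrative classpath: two connectors built for different Scala versions
mixed = ['spark-sql-kafka-0-10_2.12-3.5.1.jar',
         'spark-streaming-kafka-0-10_2.13-3.5.1.jar']
print(scala_versions(mixed))  # -> ['2.12', '2.13']  i.e. a conflict
```

If the resulting list has more than one entry, that mix is exactly the kind of setup that produces NoSuchMethodError at runtime.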

By the way, with spark-submit, .config("spark.jars", ...) in the code will not work; the correct way is --packages ... on the command line:

Why does spark-submit ignore the package that I include as part of the configuration of my spark session?
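As mentioned in the comments, an alternative that worked in a notebook is setting PYSPARK_SUBMIT_ARGS inside the script, before pyspark starts the JVM. A minimal sketch (the SparkSession lines are commented out so the snippet stands alone; --packages, unlike spark.jars with a single jar, also resolves transitive dependencies such as kafka-clients):

```python
import os

# Must be set BEFORE anything from pyspark is imported, because the
# JVM is launched with these arguments; the trailing 'pyspark-shell'
# token is required.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 pyspark-shell'
)

# from pyspark.sql import SparkSession   # imported only after the env var is set
# spark = SparkSession.builder.appName('my_app').getOrCreate()
```

When launching via spark-submit instead, this env var is unnecessary: pass --packages on the command line as shown above.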
