
I'm trying to read a stream from Kafka using PySpark.

The stack I'm working with:

  • Kubernetes.
  • Standalone Spark cluster with 2 workers.
  • Spark Connect, connected to the cluster and started with the dependencies needed for Spark Connect and Kafka: "org.apache.spark:spark-connect_2.12:3.5.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1"
  • Kafka cluster deployed by the Strimzi operator on Kubernetes.

And here is the error I'm getting:

SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 11) (192.168.218.135 executor 0): java.lang.ClassNotFoundException: org.apache.spark.sql.kafka010.KafkaSourceRDDPartition
    at org.apache.spark.executor.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:124)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:467)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:71)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2034)
    at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1898)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2224)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:579)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

Here is my code:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("test2")
    .remote("sc://spark-master-svc.data-lab-system:15002")
    .config("spark.executor.memory", "512m")
    .getOrCreate()
)

kafka_service = "kafka-kafka-bootstrap.data-lab-system:9092"

topic_name = "pre-proccessing_posts_stream"
group_id = "spark-consumer-group3"

df = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_service)
    .option("subscribe", topic_name)
    .option("kafka.group.id", group_id)
    # .option("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
    .load()
)

df.printSchema()  # this works and I get output from it

df.show()  # this is where I get the error
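
For reference, the streaming version I'm ultimately aiming for looks like this (a minimal sketch: startingOffsets, the console sink, and the checkpoint path are assumptions for testing only, not part of the failing code above):

# Minimal sketch of the streaming read I'm ultimately after; reuses
# kafka_service and topic_name from the code above.
stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_service)
    .option("subscribe", topic_name)
    .option("startingOffsets", "earliest")  # assumption: start from the beginning
    .load()
)

query = (
    stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("console")  # console sink just for testing
    .option("checkpointLocation", "/tmp/checkpoints/test2")  # hypothetical path
    .start()
)
query.awaitTermination()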

This error only occurs when Spark Connect is connected to the Spark cluster. When Spark runs as a single node, everything works fine, as shown in the sketch below.
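
For comparison, here is roughly the single-node setup where the same read works (a sketch: the local master and app name are assumptions; the package coordinate matches the one commented out above):

# Sketch of the single-node setup where the same read works. Here the Kafka
# package goes through spark.jars.packages, so the executors can load the
# Kafka classes.
from pyspark.sql import SparkSession

local_spark = (
    SparkSession.builder
    .appName("test2-local")  # hypothetical app name for the local run
    .master("local[*]")      # assumption: plain local mode
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")
    .getOrCreate()
)

df = (
    local_spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_service)
    .option("subscribe", topic_name)
    .load()
)
df.show()  # works in this mode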

  • Can you check your spark.serializer setting? If you are using the KryoSerializer, can you try removing that setting? Commented Jul 8, 2024 at 10:20
