4

I'm trying to run a Structured Streaming job on GCP Dataproc which reads from Kafka and prints out the values. The code is giving this error -> java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer

Here is the code:

import sys, datetime, time, os
from pyspark.sql.functions import col, rank, dense_rank, to_date, to_timestamp, format_number, row_number, lead, lag,monotonically_increasing_id
from pyspark.sql import SparkSession, Window
from confluent_kafka import Producer
from google.cloud import storage

spark = SparkSession.builder.appName('StructuredStreaming_VersaSase').getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

kafkaBrokers='34.75.148.41:9094'
topic = "versa-sase"
# bootstrap.servers=my-cluster-lb-ssl-cert-kafka-bootstrap:9093
security_protocol="SSL"


client = storage.Client()
print(" client ", client)

bucket = client.get_bucket('ssl-certs-karan')
print(" bucket ", bucket)

allblobs = bucket.list_blobs()
print(" allblobs -> ", allblobs)

for b in allblobs:
    print(" b -> ", b)

blob_ssl_truststore_location = bucket.get_blob('ca.p12')
print(" blob_ssl_truststore_location.name ", blob_ssl_truststore_location.name)
blob_ssl_truststore_location.download_to_filename(blob_ssl_truststore_location.name)

ssl_truststore_location=blob_ssl_truststore_location.name
print(" type - blob_ssl_truststore_location ", type(blob_ssl_truststore_location))
ssl_truststore_password="NAvqbh5c9fB4"

blob_ssl_keystore_location = bucket.get_blob('dataproc-versa-sase.p12')
print(" blob_ssl_keystore_location.name ", blob_ssl_keystore_location.name)
blob_ssl_keystore_location.download_to_filename(blob_ssl_keystore_location.name)
ssl_keystore_location=blob_ssl_keystore_location.name
ssl_keystore_password="jBGsWrBv7258"
consumerGroupId = "versa-sase-grp"
checkpoint = "gs://ss-checkpoint/"

print(" SPARK.SPARKCONTEXT -> ", spark.sparkContext)



df = spark.read.format('kafka')\
    .option("kafka.bootstrap.servers",kafkaBrokers)\
    .option("kafka.security.protocol","SSL") \
    .option("kafka.ssl.truststore.location",ssl_truststore_location) \
    .option("kafka.ssl.truststore.password",ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location)\
    .option("kafka.ssl.keystore.password", ssl_keystore_password)\
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId)\
    .option("startingOffsets", "earliest") \
    .load()
#


query = df.selectExpr("CAST(value AS STRING)") \
    .write \
    .format("console") \
    .option("numRows",100)\
    .option("checkpointLocation", checkpoint) \
    .option("outputMode", "complete")\
    .save("output")

# query.awaitTermination()
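
For reference, the streaming version of the read/write (which is what I eventually want) would look roughly like the sketch below, reusing the same Kafka options and variables as above. I'm assuming it hits the same missing-class error, since both the batch and streaming readers go through the same Kafka source provider.

# rough sketch of the streaming equivalent, reusing the same variables/options as above
df_stream = spark.readStream.format('kafka') \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId) \
    .option("startingOffsets", "earliest") \
    .load()

# console sink; outputMode "append" since there is no aggregation
query = df_stream.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .option("numRows", 100) \
    .option("checkpointLocation", checkpoint) \
    .outputMode("append") \
    .start()

query.awaitTermination()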

Command to launch the job on the Dataproc cluster:

gcloud dataproc jobs submit pyspark StructuredStreaming_Kafka_GCP-Batch-feb1.py \
    --cluster=dataproc-ss-poc \
    --jars=gs://spark-jars-karan/spark-sql-kafka-0-10_2.12-3.1.2.jar \
    --region=us-central1

Error:

 SPARK.SPARKCONTEXT ->  <SparkContext master=yarn appName=StructuredStreaming_VersaSase>
Traceback (most recent call last):
  File "/tmp/b87ff69307344e2db5b43f4a73c377cf/StructuredStreaming_Kafka_GCP-Batch-feb1.py", line 49, in <module>
    df = spark.read.format('kafka')\
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 210, in load
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o69.load.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:556)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateBatchOptions(KafkaSourceProvider.scala:336)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:127)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)


I've checked the versions on the Dataproc cluster: Spark is 3.1.2 and Scala is 2.12, so the version of the spark-sql-kafka jar being passed seems to be correct. Are there any other jars that need to be passed?

What needs to be done to fix/debug this issue?

TIA!

3 Comments
  • Does this answer your question? Spark fails with NoClassDefFoundError for org.apache.kafka.common.serialization.StringDeserializer Commented Feb 2, 2022 at 6:55
  • @MartinZeitler - from what I understand, when I do spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 it pulls all the dependent jars and that works fine. However, what do I need to do on Dataproc to make it work? Do I need to pass individual jars (in which case I'll need to figure out which jars to add), or can I pass a package name? Commented Feb 2, 2022 at 7:38
  • What error do you get when you use --packages with a maven target instead of --jars with a file? Otherwise, you need to at least get kafka-clients.jar and all other possible dependencies Commented Feb 2, 2022 at 15:12

3 Answers

4

The missing class org/apache/kafka/common/serialization/ByteArraySerializer is in the kafka-clients package, which is a dependency of the spark-sql-kafka-0-10_2.12 package.

You can either use --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 (matching the Spark version on the cluster), which automatically pulls the transitive dependencies, or use --jars and add all the dependencies yourself, e.g. --jars=gs://my-bucket/spark-sql-kafka-0-10_2.12-3.1.2.jar,gs://my-bucket/kafka-clients-2.6.0.jar (the kafka-clients version must be the one the connector was built against; check the connector's POM, and include any further transitive dependencies such as commons-pool2 if needed).
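
For a Dataproc submission this could look roughly like the sketches below. This is a sketch only: the gs://spark-jars-karan paths are taken from the question, and the kafka-clients-2.6.0 jar name is an assumption about the connector's dependency, not something I have verified.

# Option 1 (sketch): let Spark resolve the package and its transitive deps from Maven
gcloud dataproc jobs submit pyspark StructuredStreaming_Kafka_GCP-Batch-feb1.py \
    --cluster=dataproc-ss-poc \
    --region=us-central1 \
    --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2

# Option 2 (sketch): list every jar explicitly - the connector plus its dependencies
# (may also need e.g. commons-pool2 / the spark-token-provider jar - check the connector's POM)
gcloud dataproc jobs submit pyspark StructuredStreaming_Kafka_GCP-Batch-feb1.py \
    --cluster=dataproc-ss-poc \
    --region=us-central1 \
    --jars=gs://spark-jars-karan/spark-sql-kafka-0-10_2.12-3.1.2.jar,gs://spark-jars-karan/kafka-clients-2.6.0.jar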



3

I was able to resolve this issue by passing the package as below, i.e. --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2. Please note: I had initially added the individual jars to resolve the issue, but that clearly is not the right way.

gcloud dataproc jobs submit pyspark /Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb2.py \
    --cluster dataproc-ss-poc \
    --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
    --region us-central1


2

Please have a look at the official deployment guide here: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#deploying

Extracting the important part:

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 ...

All in all, please use "--packages" instead of "--jars" because it takes care of transitive dependencies.

3 Comments

The OP is using Dataproc, not spark-submit directly.
OK, the main message is that --packages handles transitive deps but --jars does not.
Sure. My point was that --packages may not be a valid argument to the gcloud dataproc jobs submit command.
