5

I'm using

  • Spark version: 3.0.0-preview2
  • Scala version: 2.12
  • JAVA version: 1.8
  • Kafka Broker version: 2.2.0

I have configured two JARS(spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar and kafka-clients-2.2.0.jar) at spark-defaults.conf file and saved the JARS in $SPARK_HOME/jars folder as well. when I tried to view the Key,Value of the data from Kafka servers(since the data from Kafka comes in K-V pairs in JSON format), I was facing the below error

java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:580)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan.toMicroBatchStream(KafkaSourceProvider.scala:466)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$3(MicroBatchExecution.scala:102)
        at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:95)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:81)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$tran29)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:291)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:376)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:374)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:291)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$tran29)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:275)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:81)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(S
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.kafka010.KafkaConfigUpdater
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
        ... 30 more
Exception in thread "stream execution thread for [id = 504665ad-c59a-4a85-8c46-4d6c741b0adf, runId = 36bc5028-6b34-4d6c-a265-4c38ce66cfcbError: org/apache/spark/kafka010/KafkaConfigUpdater
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:580)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan.toMicroBatchStream(KafkaSourceProvider.scala:466)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$3(MicroBatchExecution.scala:102)
        at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:95)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:81)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$tran29)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:291)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:376)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:374)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:291)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$tran29)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:275)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:81)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(S
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.kafka010.KafkaConfigUpdater
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
        ... 30 more

And here is the code I tried to view the Key-Value pair of the Kafka data

from pyspark import *
from pyspark.sql import *
from pyspark.sql.utils import *
from pyspark.streaming import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

conf = SparkConf().setMaster("local")
sc = SparkContext(conf = conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)

spark = SparkSession \
    .builder \
    .getOrCreate()

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "XX.XX.XX.XX:9092,XX.XX.XX.XX:9092") \
  .option("subscribe", "topic1,topic2,topic3") \
  .option("failOnDataLoss", "false") \
  .load()

table = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

query = table \
        .writeStream \
        .outputMode("append") \
        .option("truncate","false") \
        .format("console") \
        .start() \
        .awaitTermination()

Could anyone help me on resolving this error? Thanks in advance!

3 Answers 3

10

your need add this jars in the $SPARK_HOME/JARS

  1. commons-pool2-2.8.0.jar
  2. kafka-clients-2.5.0.jar
  3. spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar
  4. spark-token-provider-kafka-0-10_2.12-3.0.0-preview2.jar
Sign up to request clarification or add additional context in comments.

Comments

3

This is how I can config to run PySpark (verison with scala 2.12 Spark 3.2.1) Structure Streaming with Kafka on jupyter lab

First,I download 5 jars files and I put them in the folder /jars under my current project folder (just for local run I think):

  • spark-sql-kafka-0-10_2.12-3.2.1.jar
  • kafka-clients-2.1.1.jar
  • spark-streaming-kafka-0-10-assembly_2.12-3.2.1.jar
  • commons-pool2-2.8.0.jar
  • spark-token-provider-kafka-0-10_2.12-3.2.1.jar

The value of config spark.jars look like this "<path-to-jar/test1.jar>,<path-to-jar/test2.jar>"

This is the actual code:

spark_jars =  ("{},{},{},{},{}".format(os.getcwd() + "/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar",  
                                      os.getcwd() + "/jars/kafka-clients-2.1.1.jar", 
                                      os.getcwd() + "/jars/spark-streaming-kafka-0-10-assembly_2.12-3.2.1.jar", 
                                      os.getcwd() + "/jars/commons-pool2-2.8.0.jar",  
                                      os.getcwd() + "/jars/spark-token-provider-kafka-0-10_2.12-3.2.1.jar"))




spark = SparkSession.builder.config("spark.jars", spark_jars).appName("Structured_Redpanda_WordCount").getOrCreate()



spark.conf.set("spark.sql.shuffle.partitions", 1)

Comments

1

I have resolved the problem by adding spark-streaming-kafka-0-10-assembly_2.12-3.0.0-preview2.jar in the $SPARK_HOME/JARS folder and in spark-defaults.conf file as

spark.driver.extraClassPath       $SPARK_HOME/jars/*.jar

spark.executor.extraClassPath     $SPARK_HOME/jars/*.jar

And ran the spark-submit command as below:

$SPARK_HOME/bin/spark-submit --master yarn --jars $SPARK_HOME/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview.jar,$SPARK_HOME/jars/kafka-clients-2.2.0.jar,$SPARK_HOME/jars/spark-streaming-kafka-0-10-assembly_2.12-3.0.0-preview2.jar test.py

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.