I am encountering a problem when printing data from a Kafka topic to the console. The error message I get is shown below.

22/09/06 10:14:02 ERROR MicroBatchExecution: Query [id = ba6cb0ca-a3b1-41be-9551-7956650fbdab, runId = 7dfb967a-8bc2-4c73-91ae-5356cfb21ea4] terminated with error
java.lang.NoClassDefFoundError: org/apache/kafka/common/security/JaasContext
    at org.apache.spark.kafka010.KafkaTokenUtil$.isGlobalJaasConfigurationProvided(KafkaTokenUtil.scala:154)
    at org.apache.spark.kafka010.KafkaConfigUpdater.setAuthenticationConfigIfNeeded(KafkaConfigUpdater.scala:72)
    at org.apache.spark.kafka010.KafkaConfigUpdater.setAuthenticationConfigIfNeeded(KafkaConfigUpdater.scala:62)
    at org.apache.spark.sql.kafka010.ConsumerStrategy.setAuthenticationConfigIfNeeded(ConsumerStrategy.scala:61)
    at org.apache.spark.sql.kafka010.ConsumerStrategy.setAuthenticationConfigIfNeeded$(ConsumerStrategy.scala:60)
    at org.apache.spark.sql.kafka010.SubscribeStrategy.setAuthenticationConfigIfNeeded(ConsumerStrategy.scala:102)
    at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:106)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.consumer(KafkaOffsetReaderConsumer.scala:82)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:533)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:578)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:577)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:531)
    at org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:48)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:531)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchLatestOffsets(KafkaOffsetReaderConsumer.scala:311)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:150)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:145)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:392)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:392)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:385)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:382)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:613)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:378)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:211)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.security.JaasContext
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 50 more
Exception in thread "stream execution thread for [id = ba6cb0ca-a3b1-41be-9551-7956650fbdab, runId = 7dfb967a-8bc2-4c73-91ae-5356cfb21ea4]" java.lang.NoClassDefFoundError: org/apache/kafka/common/security/JaasContext
    at org.apache.spark.kafka010.KafkaTokenUtil$.isGlobalJaasConfigurationProvided(KafkaTokenUtil.scala:154)
    at org.apache.spark.kafka010.KafkaConfigUpdater.setAuthenticationConfigIfNeeded(KafkaConfigUpdater.scala:72)
    at org.apache.spark.kafka010.KafkaConfigUpdater.setAuthenticationConfigIfNeeded(KafkaConfigUpdater.scala:62)
    at org.apache.spark.sql.kafka010.ConsumerStrategy.setAuthenticationConfigIfNeeded(ConsumerStrategy.scala:61)
    at org.apache.spark.sql.kafka010.ConsumerStrategy.setAuthenticationConfigIfNeeded$(ConsumerStrategy.scala:60)
    at org.apache.spark.sql.kafka010.SubscribeStrategy.setAuthenticationConfigIfNeeded(ConsumerStrategy.scala:102)
    at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:106)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.consumer(KafkaOffsetReaderConsumer.scala:82)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:533)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:578)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:577)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:531)
    at org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:48)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:531)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchLatestOffsets(KafkaOffsetReaderConsumer.scala:311)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:150)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:145)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:392)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:392)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:385)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:382)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:613)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:378)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:211)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.security.JaasContext
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 50 more
Traceback (most recent call last):
  File "C:\Users\ULRICH TEDONGMO\Downloads\test Spark Stream\test 3\pyspark_structured_streaming-main\part4.1\kafka_streaming_json\kafka_streaming_json_demo.py", line 79, in <module>
    orders_agg_write_stream.awaitTermination()
  File "C:\spark\python\pyspark\sql\streaming.py", line 101, in awaitTermination
    return self._jsq.awaitTermination()
  File "C:\spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1304, in __call__
  File "C:\spark\python\pyspark\sql\utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.StreamingQueryException: org/apache/kafka/common/security/JaasContext
=== Streaming Query ===
Identifier: [id = ba6cb0ca-a3b1-41be-9551-7956650fbdab, runId = 7dfb967a-8bc2-4c73-91ae-5356cfb21ea4]
Current Committed Offsets: {}
Current Available Offsets: {}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource ConsoleWriter[numRows=20, truncate=false]
+- Project [order_country_name#32, order_city_name#33, sum(order_amount)#53 AS total_order_amount#58]
   +- Aggregate [order_country_name#32, order_city_name#33], [order_country_name#32, order_city_name#33, sum(cast(order_amount#30 as double)) AS sum(order_amount)#53]
      +- Project [orders#24.order_id AS order_id#27, orders#24.order_product_name AS order_product_name#28, orders#24.order_card_type AS order_card_type#29, orders#24.order_amount AS order_amount#30, orders#24.order_datetime AS order_datetime#31, orders#24.order_country_name AS order_country_name#32, orders#24.order_city_name AS order_city_name#33, orders#24.order_ecommerce_website_name AS order_ecommerce_website_name#34, timestamp#12]
         +- Project [from_json(StructField(order_id,StringType,true), StructField(order_product_name,StringType,true), StructField(order_card_type,StringType,true), StructField(order_amount,StringType,true), StructField(order_datetime,StringType,true), StructField(order_country_name,StringType,true), StructField(order_city_name,StringType,true), StructField(order_ecommerce_website_name,StringType,true), value#21, Some(Africa/Luanda)) AS orders#24, timestamp#12]
            +- Project [cast(value#8 as string) AS value#21, timestamp#12]
               +- StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@623489c6, KafkaV2[Subscribe[test-topic]]

All of the above is the error output. I don't understand the root cause of these errors. Please help me.

These are my Kafka and Spark versions:

spark version: spark-3.1.3-bin-hadoop2.7
kafka version: kafka_2.13-3.2.1

Here is my code

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import findspark
import time
import os

findspark.init()

kafka_topic_name = "test-topic"
kafka_bootstrap_servers = 'localhost:9092'

if __name__ == "__main__":

    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages ' \
                                        'org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3,' \
                                        'org.apache.spark:spark-token-provider-kafka-0-10_2.12:3.1.3 ' \
                                        'pyspark-shell '

    print("Stream Data Processing Application Started ...")
    print(time.strftime("%Y-%m-%d %H:%M:%S"))

    spark = SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka and Message Format as JSON") \
        .master("local[*]") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    # Construct a streaming DataFrame that reads from test-topic
    orders_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic_name) \
        .option("startingOffsets", "latest") \
        .load()

    print("Printing Schema of orders_df: ")
    orders_df.printSchema()

    orders_df1 = orders_df.selectExpr("CAST(value AS STRING)", "timestamp")

    orders_schema = StructType() \
        .add("order_id", StringType()) \
        .add("order_product_name", StringType()) \
        .add("order_card_type", StringType()) \
        .add("order_amount", StringType()) \
        .add("order_datetime", StringType()) \
        .add("order_country_name", StringType()) \
        .add("order_city_name", StringType()) \
        .add("order_ecommerce_website_name", StringType())

    orders_df2 = orders_df1 \
        .select(from_json(col("value"), orders_schema) \
                .alias("orders"), "timestamp")

    orders_df3 = orders_df2.select("orders.*", "timestamp")
    orders_df3.printSchema()

    # Simple aggregate - find total_order_amount by grouping country, city
    orders_df4 = orders_df3.groupBy("order_country_name", "order_city_name")\
        .agg({'order_amount': 'sum'})\
        .select("order_country_name", "order_city_name", col("sum(order_amount)")
                .alias("total_order_amount"))

    print("Printing Schema of orders_df4: ")
    orders_df4.printSchema()

    # Write final result into console for debugging purpose
    orders_agg_write_stream = orders_df4\
        .writeStream.trigger(processingTime='5 seconds')\
        .outputMode("update")\
        .option("truncate", "false")\
        .format("console")\
        .start()

    orders_agg_write_stream.awaitTermination()

    print("Stream Data Processing Application Completed.")

How can I solve this error?

    ok @toyotaSupra Commented Sep 6, 2022 at 13:26

1 Answer

Can you share your dependencies, including their versions? This error usually happens when the jar versions do not match. I had the same error when I was using kafka-clients-0.10.2.2.jar, and I resolved it by switching to kafka-clients-3.3.0.jar.
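
One way to check for such a mismatch is to list the kafka-clients jars on Spark's classpath and see whether each one actually contains the class named in the error. The sketch below is only an illustration: `find_class_in_jars` is a hypothetical helper, not part of Spark, and the jars directory you pass in (for example `C:\spark\jars`, guessed from the paths in the traceback) is an assumption about your installation.

```python
import zipfile
from pathlib import Path


def find_class_in_jars(jar_dir, class_name):
    """Map each kafka-clients jar in jar_dir to True/False.

    True means the jar contains class_name. Hypothetical helper,
    for illustration only.
    """
    # A class a.b.C is stored inside a jar as the entry a/b/C.class
    entry = class_name.replace(".", "/") + ".class"
    result = {}
    for jar in sorted(Path(jar_dir).glob("kafka-clients-*.jar")):
        # A jar is a zip archive, so the standard zipfile module can read it
        with zipfile.ZipFile(jar) as zf:
            result[jar.name] = entry in zf.namelist()
    return result
```

Calling `find_class_in_jars(r"C:\spark\jars", "org.apache.kafka.common.security.JaasContext")` would then show which kafka-clients jars, if any, are missing the class. A jar that maps to `False`, or more than one kafka-clients jar in the same directory, would be a likely suspect.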

For reference, here are the dependency versions I used successfully:

kafka-clients-3.3.0.jar
spark-sql-kafka-0-10_2.12-3.3.0.jar
spark-streaming-kafka-0-10_2.12-3.3.0.jar
spark-token-provider-kafka-0-10_2.12-3.3.0.jar
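
If the jars are pulled in through `--packages` instead of being copied by hand, one way to keep the connector artifacts on the same version is to build the argument string from a single version value. This is a sketch under assumptions: `build_submit_args` is a hypothetical helper, the Scala suffix `2.12` and the version `3.1.3` are taken from the question's setup, and the version passed in is assumed to match the installed Spark.

```python
import os


def build_submit_args(spark_version, scala_version="2.12"):
    """Build a PYSPARK_SUBMIT_ARGS value whose Kafka connector
    artifacts all share one version. Hypothetical helper."""
    artifacts = [
        "spark-sql-kafka-0-10",
        "spark-token-provider-kafka-0-10",
    ]
    packages = ",".join(
        f"org.apache.spark:{name}_{scala_version}:{spark_version}"
        for name in artifacts
    )
    return f"--packages {packages} pyspark-shell"


# Must be set before the SparkSession (and its JVM) is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args("3.1.3")
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

With `--packages`, transitive dependencies such as kafka-clients are resolved from the artifacts' published metadata, so there is normally no need to add a kafka-clients jar by hand. An older kafka-clients jar already sitting in Spark's jars directory may still conflict, though.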

1 Comment

this combo works great! Thanks
