
I'm using Spark 3.1.2, Kafka 2.8.1 and Scala 2.12.10.

I'm getting the error below while integrating Kafka with Spark Structured Streaming:

java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/read/streaming/ReportsSourceMetrics

Spark-shell command with the dependency:

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2

 org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
 :: resolving dependencies :: org.apache.spark#spark-submit-parent-3643b83d-a2f8-43d1-941f-a125272f3905;1.0
         confs: [default]
         found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
         found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
         found org.apache.kafka#kafka-clients;2.6.0 in central
         found com.github.luben#zstd-jni;1.4.8-1 in central
         found org.lz4#lz4-java;1.7.1 in central
         found org.xerial.snappy#snappy-java;1.1.8.2 in central
         found org.slf4j#slf4j-api;1.7.30 in central
         found org.spark-project.spark#unused;1.0.0 in central
         found org.apache.commons#commons-pool2;2.6.2 in central
 :: resolution report :: resolve 564ms :: artifacts dl 9ms
         :: modules in use:
         com.github.luben#zstd-jni;1.4.8-1 from central in [default]
         org.apache.commons#commons-pool2;2.6.2 from central in [default]
         org.apache.kafka#kafka-clients;2.6.0 from central in [default]
         org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 from central in [default]
         org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 from central in [default]
         org.lz4#lz4-java;1.7.1 from central in [default]
         org.slf4j#slf4j-api;1.7.30 from central in [default]
         org.spark-project.spark#unused;1.0.0 from central in [default]
         org.xerial.snappy#snappy-java;1.1.8.2 from central in [default]
         ---------------------------------------------------------------------
         |                  |            modules            ||   artifacts   |
         |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
         ---------------------------------------------------------------------
         |      default     |   9   |   0   |   0   |   0   ||   9   |   0   |
         ---------------------------------------------------------------------
 :: retrieving :: org.apache.spark#spark-submit-parent-3643b83d-a2f8-43d1-941f-a125272f3905
         confs: [default]
         0 artifacts copied, 9 already retrieved (0kB/15ms)
 21/12/28 17:46:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
 Setting default log level to "WARN".
 To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
 21/12/28 17:46:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
 Spark context Web UI available at http://*******:4041
 Spark context available as 'sc' (master = local[*], app id = local-1640693788919).
 Spark session available as 'spark'.
 Welcome to
       ____              __
      / __/__  ___ _____/ /__
     _\ \/ _ \/ _ `/ __/  '_/
    /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
       /_/
 
 Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_292)
 Type in expressions to have them evaluated.
 Type :help for more information.
    
    val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "127.0.1.1:9092").option("subscribe", "Topic").option("startingOffsets", "earliest").load()
    
    df.printSchema()
    
   import org.apache.spark.sql.types._
   import org.apache.spark.sql.functions.{from_json, col}
   val schema = new StructType().add("id", IntegerType).add("fname", StringType).add("lname", StringType)
   val personStringDF = df.selectExpr("CAST(value AS STRING)")
   val personDF = personStringDF.select(from_json(col("value"), schema).as("data")).select("data.*")
     
    personDF.writeStream.format("console").outputMode("append").start().awaitTermination()
    
    Exception in thread "stream execution thread for [id = 44e8f8bf-7d94-4313-9d2b-88df8f5bc10f, runId = 3b4c63c4-9062-4288-a681-7dd6cfb836d0]" java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/read/streaming/ReportsSourceMetrics

3 Answers


Spark version: 3.1.2

Scala version: 2.12.10

Kafka version: 2.8.1

Note: the versions are very important when using --packages org.apache.spark:spark-sql-kafka-0-10_2.12:V.V.V with either spark-shell or spark-submit, where V.V.V must equal your Spark version.
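To keep the two versions from drifting apart, you can derive the --packages coordinate from a single variable. This is a small convenience sketch I'm adding, not part of the original steps; SPARK_VERSION and SCALA_BINARY are placeholders you should set from the output of spark-submit --version on your machine.

```shell
# Build the connector coordinate from the Spark version so it always matches.
# SPARK_VERSION and SCALA_BINARY below are illustrative values.
SPARK_VERSION="3.1.2"
SCALA_BINARY="2.12"
PKG="org.apache.spark:spark-sql-kafka-0-10_${SCALA_BINARY}:${SPARK_VERSION}"
echo "$PKG"
# Launch with: spark-shell --packages "$PKG"
```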

I followed the steps below, as given in the spark-kafka-example:

  1. Start the producer:
     $ kafka-console-producer.sh --broker-list Kafka-Server-IP:9092 --topic kafka-spark-test

You should see the > prompt on the console. Enter some test data at the producer.

>{"name":"foo","dob_year":1995,"gender":"M","salary":2000}
>{"name":"bar","dob_year":1996,"gender":"M","salary":2500}
>{"name":"baz","dob_year":1997,"gender":"F","salary":3500}
>{"name":"foo-bar","dob_year":1998,"gender":"M","salary":4000}
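Since from_json silently yields null fields for records it cannot parse, it can help to validate sample lines locally before producing them. This is a quick local check I'm adding (not part of the original steps); it only needs python3 on the path.

```shell
# Validate each sample record as JSON before sending it to Kafka.
# python3 -m json.tool exits non-zero on invalid JSON, so "valid" is only
# printed for well-formed lines.
printf '%s\n' \
  '{"name":"foo","dob_year":1995,"gender":"M","salary":2000}' \
  '{"name":"bar","dob_year":1996,"gender":"M","salary":2500}' |
while IFS= read -r line; do
  printf '%s' "$line" | python3 -m json.tool > /dev/null && echo "valid"
done
```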

  2. Start the spark-shell as follows:
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2

Notice: I have used 3.1.2 to match my Spark version. You will see something like the following on a successful start:

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/
         
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 11.0.13)
Type in expressions to have them evaluated.
Type :help for more information.

  3. Enter the imports, create the DataFrame, and print the schema.
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{from_json, col}

val df = spark.readStream.
      format("kafka").
      option("kafka.bootstrap.servers", "Kafka-Server-IP:9092").
      option("subscribe", "kafka-spark-test").
      option("startingOffsets", "earliest").
      load()

df.printSchema()
  4. A successful execution should print the following:
scala> df.printSchema()
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)


  5. Convert the binary value column of the DataFrame to a string (command shown with its output):
scala>      val personStringDF = df.selectExpr("CAST(value AS STRING)")


personStringDF: org.apache.spark.sql.DataFrame = [value: string]

  6. Define the schema for the DataFrame (command shown with its output):
scala> val schema = new StructType().
     |       add("name",StringType).
     |       add("dob_year",IntegerType).
     |       add("gender",StringType).
     |       add("salary",IntegerType)


schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(dob_year,IntegerType,true), StructField(gender,StringType,true), StructField(salary,IntegerType,true))

  7. Select the data:
scala>  val personDF = personStringDF.select(from_json(col("value"), schema).as("data")).select("data.*")

personDF: org.apache.spark.sql.DataFrame = [name: string, dob_year: int ... 2 more fields]

  8. Write the stream to the console:

scala>  personDF.writeStream.
     |       format("console").
     |       outputMode("append").
     |       start().
     |       awaitTermination()


You will see the following output:

-------------------------------------------                                     
Batch: 0
-------------------------------------------
+-------+--------+------+------+
|   name|dob_year|gender|salary|
+-------+--------+------+------+
|    foo|    1995|     M|  2000|
|    bar|    1996|     M|  2500|
|    baz|    1997|     F|  3500|
|foo-bar|    1998|     M|  4000|
+-------+--------+------+------+

If your Kafka producer is still running, you can enter a new row and see the new data appear in Batch: 1, and so on each time you enter new data in the producer.

This is a typical example of entering data in a console producer and consuming it from the Spark console.

Good luck! :)




I had nearly the same problem (the same exception, but with spark-submit). I solved it by upgrading Spark to version 3.2.0 and using version 3.2.0 of org.apache.spark:spark-sql-kafka-0-10_2.12. The ReportsSourceMetrics class only exists in newer Spark releases, so the connector package and the Spark runtime must be on the same version. The full command was:

$SPARK_HOME/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 script.py

1 Comment

Thanks for your comment. I will upgrade to the latest version and give it a try.

Just check your Spark version with spark.version and adjust the packages version as suggested in the other answers.

