I'm using Spark 3.1.2, Kafka 2.8.1, and Scala 2.12.10 (the shell banner below confirms the Scala version).
I get the error below while integrating Kafka with Spark Structured Streaming:
java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/read/streaming/ReportsSourceMetrics
The spark-shell command with the Kafka dependency: spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3643b83d-a2f8-43d1-941f-a125272f3905;1.0
confs: [default]
found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
found org.apache.kafka#kafka-clients;2.6.0 in central
found com.github.luben#zstd-jni;1.4.8-1 in central
found org.lz4#lz4-java;1.7.1 in central
found org.xerial.snappy#snappy-java;1.1.8.2 in central
found org.slf4j#slf4j-api;1.7.30 in central
found org.spark-project.spark#unused;1.0.0 in central
found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 564ms :: artifacts dl 9ms
:: modules in use:
com.github.luben#zstd-jni;1.4.8-1 from central in [default]
org.apache.commons#commons-pool2;2.6.2 from central in [default]
org.apache.kafka#kafka-clients;2.6.0 from central in [default]
org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 from central in [default]
org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 from central in [default]
org.lz4#lz4-java;1.7.1 from central in [default]
org.slf4j#slf4j-api;1.7.30 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
org.xerial.snappy#snappy-java;1.1.8.2 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 9 | 0 | 0 | 0 || 9 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-3643b83d-a2f8-43d1-941f-a125272f3905
confs: [default]
0 artifacts copied, 9 already retrieved (0kB/15ms)
21/12/28 17:46:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/28 17:46:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://*******:4041
Spark context available as 'sc' (master = local[*], app id = local-1640693788919).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.1.2
/_/
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_292)
Type in expressions to have them evaluated.
Type :help for more information.
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "127.0.1.1:9092").option("subscribe", "Topic").option("startingOffsets", "earliest").load()
df.printSchema()
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._ // needed for from_json and col below
val schema = new StructType().add("id",IntegerType).add("fname",StringType).add("lname",StringType)
val personStringDF = df.selectExpr("CAST(value AS STRING)")
val personDF = personStringDF.select(from_json(col("value"), schema).as("data")).select("data.*")
personDF.writeStream.format("console").outputMode("append").start().awaitTermination()
Exception in thread "stream execution thread for [id = 44e8f8bf-7d94-4313-9d2b-88df8f5bc10f, runId = 3b4c63c4-9062-4288-a681-7dd6cfb836d0]" java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/read/streaming/ReportsSourceMetrics
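One diagnostic sketch I can think of (the helper name is my own, not from the logs above): probe the classpath for the class named in the stack trace. As far as I can tell, ReportsSourceMetrics was only introduced in Spark 3.2.0, so hitting this NoClassDefFoundError on what reports as a 3.1.2 shell suggests a mixed classpath (e.g. a 3.2.x jar loaded against a 3.1.x runtime, or vice versa).

```scala
// Hypothetical helper: check whether a class is visible on the current
// classpath. Can be pasted directly into spark-shell.
def classPresent(name: String): Boolean =
  scala.util.Try(Class.forName(name)).isSuccess

// The interface named in the stack trace; it ships only with Spark 3.2.0+.
val probe = "org.apache.spark.sql.connector.read.streaming.ReportsSourceMetrics"
println(s"$probe present: ${classPresent(probe)}")

// Sanity check with a class that is always available on any JVM.
println(s"java.lang.String present: ${classPresent("java.lang.String")}")
```

If the probe prints false inside the running shell, the streaming connector on the classpath is newer than the Spark runtime that is actually loading it.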