
While running my Kafka structured streaming code I am getting the following errors:

1) ERROR StreamExecution: Query [id = c6426655-446f-4306-91ba-d78e68e05c15, runId = 420382c1-8558-45a1-b26d-f6299044fa04] terminated with error java.lang.ExceptionInInitializerError

2) Exception in thread "stream execution thread for [id = c6426655-446f-4306-91ba-d78e68e05c15, runId = 420382c1-8558-45a1-b26d-f6299044fa04]" java.lang.ExceptionInInitializerError

3) Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: null

sbt dependencies:

// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.3"

// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.3"

// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.3" % "provided"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.1.1"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "2.1.1"

// https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.2.3"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-scala
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.1.1"

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession


object demo1 {

  def main(args: Array[String]): Unit = {

    System.setProperty("hadoop.home.dir","c:\\hadoop\\")

    val spark: SparkSession = SparkSession.builder
      .appName("My Spark Application")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///C:/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
      .config("spark.sql.streaming.checkpointLocation", "file:///C:/checkpoint")
      .getOrCreate

    spark.sparkContext.setLogLevel("ERROR")

    spark.conf.set("spark,sqlshuffle.partations","2")

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "162.244.80.189:9092")
      .option("startingOffsets", "earliest")
      .option("group.id","test1")
      .option("subscribe", "demo11")
      .load()

    import spark.implicits._


    // cast the binary key and value columns to strings so they are readable on the console
    val dsStruc = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
      .as[(String, String, Timestamp)]


    // write the casted dataset to the console; df.show() is not supported on a
    // streaming DataFrame, so the console sink prints each micro-batch instead
    val query = dsStruc.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

1 Answer

I had the same issue: I had used the wrong spark-sql-kafka library version (2.2.0 instead of 2.3.0). My working configuration is:

org.apache.spark : spark-core_2.11 : 2.3.0 (provided)

org.apache.spark : spark-sql_2.11 : 2.3.0

org.apache.spark : spark-sql-kafka-0-10_2.11 : 2.3.0

org.apache.kafka : kafka-clients : 0.10.1.0

I hope this helps. I was inspired by this post:

https://community.hortonworks.com/content/supportkb/222428/error-microbatchexecution-query-id-567e4e77-9457-4.html
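
Since the question builds with sbt, here is a minimal sbt translation of the configuration above (a sketch; scalaVersion 2.11.12 is an assumption chosen to match the _2.11 artifacts):

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "2.3.0" % "provided",
  "org.apache.spark" %% "spark-sql"             % "2.3.0",
  "org.apache.spark" %% "spark-sql-kafka-0-10"  % "2.3.0",
  "org.apache.kafka" %  "kafka-clients"         % "0.10.1.0"
)

The key point in this configuration is that spark-sql-kafka-0-10 matches the Spark version exactly, with a kafka-clients version that it is compatible with, rather than mixing versions as in the original build.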
