
I am running into a problem printing data from a Kafka topic to the console. The error message I get is shown in the image below.

Exception in Task error

As you can see in the image above, nothing is processed after batch 0.

Error 2

Error 3

All of these are screenshots of the error messages. I don't understand the root cause of these errors. Please help.

Here are the Kafka and Spark versions:

Spark version: spark-3.1.1-bin-hadoop2.7
Kafka version: kafka_2.13-2.7.0

I am using the following JARs:

kafka-clients-2.7.0.jar 
spark-sql-kafka-0-10_2.12-3.1.1.jar 
spark-token-provider-kafka-0-10_2.12-3.1.1.jar 

Here is my code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType

# Placeholder values; the original script defines these elsewhere.
kafka_bootstrap_servers = "localhost:9092"
kafka_topic_name = "my_topic"

spark = SparkSession \
    .builder \
    .appName("Pyspark structured streaming with kafka and cassandra") \
    .master("local[*]") \
    .config("spark.jars","file:///C://Users//shivani//Desktop//Spark//kafka-clients-2.7.0.jar,file:///C://Users//shivani//Desktop//Spark//spark-sql-kafka-0-10_2.12-3.1.1.jar,file:///C://Users//shivani//Desktop//Spark//spark-cassandra-connector-2.4.0-s_2.11.jar,file:///D://mysql-connector-java-5.1.46//mysql-connector-java-5.1.46.jar,file:///C://Users//shivani//Desktop//Spark//spark-token-provider-kafka-0-10_2.12-3.1.1.jar")\
    .config("spark.executor.extraClassPath","file:///C://Users//shivani//Desktop//Spark//kafka-clients-2.7.0.jar,file:///C://Users//shivani//Desktop//Spark//spark-sql-kafka-0-10_2.12-3.1.1.jar,file:///C://Users//shivani//Desktop//Spark//spark-cassandra-connector-2.4.0-s_2.11.jar,file:///D://mysql-connector-java-5.1.46//mysql-connector-java-5.1.46.jar,file:///C://Users//shivani//Desktop//Spark//spark-token-provider-kafka-0-10_2.12-3.1.1.jar")\
    .config("spark.executor.extraLibrary","file:///C://Users//shivani//Desktop//Spark//kafka-clients-2.7.0.jar,file:///C://Users//shivani//Desktop//Spark//spark-sql-kafka-0-10_2.12-3.1.1.jar,file:///C://Users//shivani//Desktop//Spark//spark-cassandra-connector-2.4.0-s_2.11.jar,file:///D://mysql-connector-java-5.1.46//mysql-connector-java-5.1.46.jar,file:///C://Users//shivani//Desktop//Spark//spark-token-provider-kafka-0-10_2.12-3.1.1.jar")\
    .config("spark.driver.extraClassPath","file:///C://Users//shivani//Desktop//Spark//kafka-clients-2.7.0.jar,file:///C://Users//shivani//Desktop//Spark//spark-sql-kafka-0-10_2.12-3.1.1.jar,file:///C://Users//shivani//Desktop//Spark//spark-cassandra-connector-2.4.0-s_2.11.jar,file:///D://mysql-connector-java-5.1.46//mysql-connector-java-5.1.46.jar,file:///C://Users//shivani//Desktop//Spark//spark-token-provider-kafka-0-10_2.12-3.1.1.jar")\
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# streaming dataframe that reads from the kafka topic
df_kafka = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .option("startingOffsets", "latest") \
    .load()

print("Printing schema of df_kafka:")
df_kafka.printSchema()

# convert the binary 'value' column from the kafka broker to string type
df_kafka_string = df_kafka.selectExpr("CAST(value AS STRING) as value")

# schema to read the json-format data
ts_schema = StructType() \
    .add("id_str", StringType()) \
    .add("created_at", StringType()) \
    .add("text", StringType())

# parse the json data
df_kafka_string_parsed = df_kafka_string.select(from_json(col("value"), ts_schema).alias("twts"))

df_kafka_string_parsed_format = df_kafka_string_parsed.select("twts.*")
df_kafka_string_parsed_format.printSchema()

# write the parsed stream to the console
query = df_kafka_string_parsed_format.writeStream \
    .trigger(processingTime="1 seconds") \
    .outputMode("update") \
    .option("truncate", "false") \
    .format("console") \
    .start()

query.awaitTermination()
  • Please provide your code. Commented Jul 2, 2021 at 11:53
  • @AchyutVyas I have edited my question and added the code. Please check it out. Commented Jul 2, 2021 at 12:31
  • What happens if you remove the kafka-clients JAR? Also, Windows uses double backslashes for file paths, not // (a forward slash does not need to be escaped). Commented Jul 2, 2021 at 14:30

1 Answer


The error (NoClassDefFoundError, followed by the kafka010 package name) is saying that spark-sql-kafka-0-10 is missing its transitive dependency on org.apache.commons:commons-pool2:2.6.2, as you can see here

You can either download that JAR as well, or change your code to use --packages instead of the spark.jars option and let Ivy handle downloading the transitive dependencies
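For the first route, a minimal sketch of the builder change (the commons-pool2 path is an assumption; point it at wherever you save the JAR, and note that single forward slashes are fine in file:// URIs, as a comment above mentions):

from pyspark.sql import SparkSession

# Hedged sketch: the same Kafka JARs as before, plus the missing
# commons-pool2 transitive dependency. The commons-pool2 path below
# is an assumption; adjust it to where you saved the JAR.
jars = ",".join([
    "file:///C:/Users/shivani/Desktop/Spark/kafka-clients-2.7.0.jar",
    "file:///C:/Users/shivani/Desktop/Spark/spark-sql-kafka-0-10_2.12-3.1.1.jar",
    "file:///C:/Users/shivani/Desktop/Spark/spark-token-provider-kafka-0-10_2.12-3.1.1.jar",
    "file:///C:/Users/shivani/Desktop/Spark/commons-pool2-2.6.2.jar",  # newly downloaded
])

spark = SparkSession.builder \
    .appName("Pyspark structured streaming with kafka and cassandra") \
    .master("local[*]") \
    .config("spark.jars", jars) \
    .getOrCreate()

For the second route, set PYSPARK_SUBMIT_ARGS before building the session: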

import os

# --packages takes Maven coordinates; the one below matches Spark 3.1.1
# with Scala 2.12. The trailing 'pyspark-shell' is needed when the script
# is launched with plain python rather than spark-submit.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 pyspark-shell'
)

spark = SparkSession.builder...


  • Could you expand a bit on this? I added the JAR via sparkSesh = SparkSession.builder.config("spark.driver.extraClassPath", "/home/ubuntu/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar,/home/ubuntu/jars/commons-pool2-2.11.0.jar") - what's the --packages alternative?
  • @James Rather than downloading the JAR, use the Maven coordinates (in colon-separated format): mvnrepository.com/artifact/org.apache.spark/…
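For the setup in the first comment, an illustrative builder-based equivalent (the sparkSesh name and the 3.1.2 version come from that comment; everything else is a sketch). With spark.jars.packages, Ivy pulls in commons-pool2 transitively, so the manually downloaded JAR is no longer needed:

from pyspark.sql import SparkSession

# Hedged sketch: resolve the Kafka connector from its Maven coordinates
# and let Ivy fetch its transitive dependencies, including commons-pool2.
sparkSesh = SparkSession.builder \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
    .getOrCreate()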
