
Hello, I am trying to use PySpark with Kafka. To set up the Kafka cluster, I execute these commands:

zookeeper-server-start.sh $KAFKA_HOME/../config/zookeeper.properties

kafka-server-start.sh $KAFKA_HOME/../config/*-0.properties & kafka-server-start.sh $KAFKA_HOME/../config/*-1.properties
  • Spark version: spark-3.2.0-bin-hadoop2.7
  • Kafka version: kafka_2.13-3.0.0
  • PySpark version: 3.2.0

The python code is:

import os
from pyspark.sql import SparkSession

spark_version = '3.2.0'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:{}'.format(spark_version)

spark = SparkSession \
    .builder \
    .appName("TP3") \
    .getOrCreate()

!spark-submit --class TP3 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 TweetCount.ipynb

This returns the following error:

Error: Failed to load class TP3.

And when I execute spark.readStream:

from kafka import KafkaConsumer

consumer = KafkaConsumer('topic')
df_kafka = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", 'localhost:9092') \
    .option("subscribe", 'topic') \
    .load()

And I got this error:

Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

How can I execute readStream in order to read from Kafka with PySpark?

Thanks

1 Answer


Finally, I solved it by adding the following code at the beginning of the notebook:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'
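The key details in that line are the trailing pyspark-shell token (required when PYSPARK_SUBMIT_ARGS is set from inside a notebook rather than passed to spark-submit) and the Scala suffix in the package coordinate, which must match the Scala build of the Spark distribution (2.12 for the prebuilt 3.2.0 binaries) rather than the broker's kafka_2.13 version. A minimal sketch of building that value, using a hypothetical helper function:

```python
import os

def kafka_submit_args(spark_version: str, scala_version: str = "2.12") -> str:
    # The Scala suffix must match Spark's Scala build (2.12 for the
    # prebuilt spark-3.2.0 binaries); the Kafka broker's own Scala
    # version (kafka_2.13-3.0.0) is irrelevant here.
    package = f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"
    # The trailing "pyspark-shell" token is required when this variable
    # is set from inside a notebook/script instead of via spark-submit.
    return f"--packages {package} pyspark-shell"

# Must be set BEFORE the SparkSession is created, or the packages
# are never downloaded and the "Failed to find data source: kafka"
# error appears.
os.environ["PYSPARK_SUBMIT_ARGS"] = kafka_submit_args("3.2.0")
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

Setting the variable after getOrCreate() has already run has no effect, because the JVM and its classpath are fixed when the first session starts.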

3 Comments

Could you explain what causes this error? I also added spark.jars.packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.0 in the spark-defaults.conf.template file inside /opt/homebrew/Cellar/apache-spark/3.2.1/libexec/conf/ but was still getting this error. Your solution cleared it, but I would like to know how.
I am not sure, but I think it is a dependency error: you have to set the versions that your application needs. That is only my guess.
I was trying out PySpark in PyCharm and was getting this error. I just added the lines above to my code and it worked. In Java these would be dependencies added to a pom/sbt file, but in Python I imagine they go in the submit command.
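On the spark-defaults.conf.template question in the comments above: Spark only loads spark-defaults.conf; the .template file is just a sample and is never read, which would explain why editing it had no effect. A sketch of the equivalent configuration, assuming the Homebrew path from the comment and matching the connector version to that Spark 3.2.1 install:

```
# /opt/homebrew/Cellar/apache-spark/3.2.1/libexec/conf/spark-defaults.conf
# (copy spark-defaults.conf.template to this name first -- the template
# file itself is never loaded by Spark)
spark.jars.packages  org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1
```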

