
I am getting the error below when trying to initiate a readStream for Kafka. Kafka is up and running, I have tested it multiple times to confirm it is processing messages, and the Kafka topic has been created as well.

```python
kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "mytopic") \
        .option("startingOffsets", "earliest") \
        .load()
```

```
Traceback (most recent call last):
  File "C:/Users//PycharmProjects/SparkStreaming/PySparkKafkaStreaming.py", line 18, in <module>
    kafka_df = spark.readStream
  File "C:\Users<username>\AppData\Local\Programs\Python\Python38-32\lib\site-packages\pyspark\sql\streaming.py", line 420, in load
    return self._df(self._jreader.load())
  File "C:\Users<username>\AppData\Local\Programs\Python\Python38-32\lib\site-packages\py4j\java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "C:\Users<username>\AppData\Local\Programs\Python\Python38-32\lib\site-packages\pyspark\sql\utils.py", line 134, in deco
    raise_from(converted)
  File "<string>", line 3, in raise_from
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
```

  • You need to run, or package and build, the application with the Kafka dependencies. Commented Dec 26, 2020 at 9:38

1 Answer


You need to add the Kafka dependencies to run this. For PySpark, you can either download the spark-sql-kafka jar and put it in the spark/jars directory, or declare the dependency in the SparkSession's initial config. Please follow the Structured Streaming + Kafka integration docs.

I hope this helps; feel free to ask me anything. Thanks!
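A minimal sketch of both options, assuming Spark 3.0.1 built against Scala 2.12 (the connector artifact must match your Spark and Scala versions, so adjust the coordinates accordingly; the app name is made up for illustration):

```python
from pyspark.sql import SparkSession

# Option 1: declare the Kafka connector in the SparkSession config.
# NOTE: the key is "spark.jars.packages" (plural "jars"); a misspelled key
# such as "spark.jar.packages" is silently ignored, and Spark then fails
# with "Failed to find data source: kafka".
spark = (
    SparkSession.builder
    .master("local[3]")
    .appName("KafkaStreamingDemo")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1",
    )
    .getOrCreate()
)

# Option 2: pass the package on the command line instead of in code:
#   spark-submit \
#     --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 \
#     PySparkKafkaStreaming.py
```

With either option in place, the `readStream.format("kafka")` call from the question should resolve the data source.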


2 Comments

Could you please elaborate a bit on that? Here is my SparkSession where I am including the jar details: `if __name__ == "__main__": spark = SparkSession.builder.master("local[3]").config("spark.streaming.stopGracefullOnShutdown", "true").config("spark.jar.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1").getOrCreate()`
I tried to put the code here, but it didn't fit, so I'll be posting the github link I used: github.com/indiacloudtv/structuredstreamingkafkapyspark/blob/…
