
I want to read the data sent by a Kafka producer, but I ran into the following problem:

pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

Following the error message, I searched the official documentation and this site, and found someone with an error like mine: link1. However, none of those methods solved the problem, so I want to ask if there is a better way to fix it. Attached below are my failing code and version information.

My code:

from kafka import KafkaProducer
from pyspark.python.pyspark.shell import spark
from pyspark.streaming import StreamingContext
from pyspark import SparkConf, SparkContext
import json
import sys
import os
import findspark

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1'
findspark.init()
def ReadingDataToKafka():
    spark_conf = SparkConf().setAppName("KafkaWordCount")
    sc = SparkContext.getOrCreate(conf=spark_conf)
    sc.setLogLevel("ERROR")
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint("file:///tmp/ZHYCargeProject")
    topics = 'sex'
    topicAry = topics.split(",")
    topicMap = {}
    for topic in topicAry:
        topicMap[topic] = 1
        df = spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "bigdataweb01:9092") \
            .option("subscribe", "sex") \
            .load()
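One thing worth checking in the code above: `PYSPARK_SUBMIT_ARGS` is set *after* the pyspark modules are imported, so the JVM may already be running without the Kafka package. A minimal sketch of the ordering fix, assuming the connector version should match the installed pyspark (3.1.2) rather than the standalone Spark install, and noting that the argument string conventionally ends with a `pyspark-shell` token:

```python
import os

# The env var must be set BEFORE anything from pyspark is imported,
# because spark-submit arguments are read when the JVM starts. Note the
# trailing "pyspark-shell" token the submit-args string expects.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 '
    'pyspark-shell'
)

try:
    import findspark
    findspark.init()
except Exception:
    pass  # findspark is optional when SPARK_HOME is already configured

# Only now import pyspark modules (SparkConf, SparkSession, ...).
```

Whether 3.1.2 or 3.2.1 is the right connector version depends on which Spark the interpreter actually loads; the coordinate here is an assumption based on `pyspark==3.1.2` from the version list below.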

The error message is as follows:

Traceback (most recent call last):
  File "/tmp/ZHYCargeProject/demo3/kafka_text.py", line 94, in <module>
    ReadingDataToKafka()
  File "/tmp/ZHYCargeProject/demo3/kafka_text.py", line 23, in ReadingDataToKafka
    df = spark.readStream \
  File "/home/ubuntu/anaconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/sql/streaming.py", line 482, in load
    return self._df(self._jreader.load())
  File "/home/ubuntu/anaconda3/envs/pyspark/lib/python3.8/site-packages/py4j/java_gateway.py", line 1309, in __call__
    return_value = get_return_value(
  File "/home/ubuntu/anaconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

Related version information:

  • python== 3.8.13

  • java==1.8.0_312

  • Spark==3.2.1

  • kafka==2.12-3.20

  • scala==2.12.15

  • kafka-python==2.0.2

  • pyspark==3.1.2

  • Try with SparkSession: spark = SparkSession.builder.appName("yourApp").config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") Commented Jun 30, 2022 at 9:14
  • 1) You should uninstall kafka-python since it isn't used. Spark has its own Kafka producer methods 2) Are you running the code with spark-submit or pyspark command? 3) I updated the linked post, you might want to check again... Also, how do you have pyspark==3.1.2 , but Spark 3.2.1?? Commented Jun 30, 2022 at 19:06
  • Thank you for helping me solve the problem~ First, I tried `spark = SparkSession.builder.appName("yourApp").config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2")`, and did a small refactoring of my code with reference to this article. My pyspark version comes from PyPI, and I use PyCharm to write and run the code. Commented Jul 1, 2022 at 2:30
  • Pycharm doesn't matter unless you have it using a virtualenv that has dependencies separate from those you listed. So, it works now? Or not? Commented Jul 1, 2022 at 9:25
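Putting the comment's suggestion into a runnable shape: a minimal sketch that builds the connector's Maven coordinate from the versions in play and passes it via `spark.jars.packages`. The Scala suffix (2.12) and artifact version (3.1.2, matching `pyspark==3.1.2` from the version list, not the standalone Spark 3.2.1 install) are assumptions to verify against your environment; the broker address and topic are taken from the question.

```python
# Maven coordinate of the Kafka connector: the artifact version must match
# the pyspark package, and the suffix must match the Scala build (2.12).
SCALA_VERSION = "2.12"      # assumed from the version list above
PYSPARK_VERSION = "3.1.2"   # should match `pip show pyspark`, not Spark itself
KAFKA_PACKAGE = (
    f"org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{PYSPARK_VERSION}"
)

def build_kafka_stream(bootstrap="bigdataweb01:9092", topic="sex"):
    # Imported lazily so the coordinate above can be inspected without a JVM.
    from pyspark.sql import SparkSession
    spark = (
        SparkSession.builder
        .appName("KafkaWordCount")
        .config("spark.jars.packages", KAFKA_PACKAGE)
        .getOrCreate()
    )
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap)
        .option("subscribe", topic)
        .load()
    )
```

With this approach the `PYSPARK_SUBMIT_ARGS` environment variable and `findspark` are not needed, since the package is resolved when the session is created.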
