
I am trying to read JSON messages from a Kafka topic into a PySpark DataFrame. My first take was this:

import json
from uuid import uuid4

from kafka import KafkaConsumer

consumer = KafkaConsumer(TOPIC_NAME,
                         consumer_timeout_ms=9000,
                         bootstrap_servers=BOOTSTRAP_SERVER,
                         auto_offset_reset='earliest',
                         enable_auto_commit=True,
                         group_id=str(uuid4()),
                         value_deserializer=lambda x: x.decode("utf-8"))

message_lst = []
for message in consumer:
    # normalize escaped quotes and strip line breaks before parsing
    message_str = message.value.replace('\\"', "'").replace("\n", "").replace("\r", "")
    message_dict = json.loads(message_str)
    message_lst.append(message_dict)

messages_json = sc.parallelize(message_lst)
messages_df = sqlContext.read.json(messages_json)

I am wondering: is there a way to get the same DataFrame using Spark Structured Streaming or something similar? Can anybody help?

UPD: My try with Structured Streaming was this:

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", f"{BOOTSTRAP_SERVER}") \
        .option("subscribe", TOPIC_NAME) \
        .load()

It exited with the following error:

pyspark.sql.utils.AnalysisException: Failed to find data source: Kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

UPD: I read the guide that was referenced in the exception text; it recommends installing the library spark-sql-kafka-0-10_2.12, but I can't find it. Does anyone know something about this?

UPD 2: I managed to add the needed package and tried this to read messages from Kafka:

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", f"{BOOTSTRAP_SERVER}") \
        .option("subscribe", TOPIC_NAME) \
        .load()

df.writeStream.outputMode("append").format("console").start().awaitTermination()

I used the same consumer as before. The problem here is that it only reads the messages that are written after the start() call. How can I read all the messages that were written up to a given time and get the results as a DataFrame? Also, can anybody give an example of the schema for from_json()? I am sorry if my questions are stupid, but I can't find any examples in Python.
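For reference, one way to get everything written up to a given point as a plain DataFrame is the batch form of the same Kafka source. A minimal sketch, reusing the BOOTSTRAP_SERVER and TOPIC_NAME variables from above (for a batch read, startingOffsets defaults to earliest and endingOffsets to latest):

# One-shot batch read: returns a regular DataFrame containing
# everything in the topic at the time of the query
df = (spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", BOOTSTRAP_SERVER)
      .option("subscribe", TOPIC_NAME)
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load())

# value arrives as binary; cast it to a string before any JSON parsing
df = df.selectExpr("CAST(value AS STRING) AS value")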

  • This should help: spark.apache.org/docs/latest/…. It shows how to read a Kafka stream using Spark Structured Streaming. In the examples the values are read as strings, but you can easily interpret them as JSON using the built-in function from_json (a sketch follows these comments). Commented Jun 3, 2021 at 19:41
  • So, you know about Structured Streaming, but it's unclear what you've tried. Commented Jun 4, 2021 at 13:09
  • @OneCricketeer Updated the question; check it one more time, please. Commented Jun 6, 2021 at 22:53
  • What do you mean you couldn't find it? It's a Maven package, not a Python one: mvnrepository.com/artifact/org.apache.spark/… Commented Jun 7, 2021 at 13:17
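A minimal from_json sketch in the spirit of the first comment, assuming df is the Kafka DataFrame loaded earlier; the id and event fields are hypothetical placeholders for whatever your messages actually contain:

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Hypothetical schema: replace id/event with your real message fields
schema = StructType([
    StructField("id", IntegerType()),
    StructField("event", StringType()),
])

# Cast the binary Kafka value to a string, then parse it with the schema
parsed_df = (df
             .selectExpr("CAST(value AS STRING) AS value")
             .select(from_json(col("value"), schema).alias("data"))
             .select("data.*"))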

1 Answer


You're missing the kafka package, as mentioned in the main documentation:

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 ...

Make sure that 3.1.2 listed here matches your own Spark version.
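If you build the session in code rather than launching via spark-submit, a sketch of the equivalent configuration (assuming the same Spark 3.1.2 / Scala 2.12 versions; it must be set before the session is created):

from pyspark.sql import SparkSession

# spark.jars.packages pulls the same Maven artifact at startup
spark = (SparkSession.builder
         .appName("kafka-reader")
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2")
         .getOrCreate())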


1 Comment

Thank you a lot for this answer; it helped me solve the issue, but I faced new ones. I updated the question; please check it one more time.
