There are many ways to read/write a Spark DataFrame from/to Kafka. I am trying to read messages from a Kafka topic and create a DataFrame out of them. I am able to pull the messages from the topic, but I am unable to convert them to a DataFrame. Any suggestion would be helpful.

import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.context import SparkContext
from kafka import KafkaConsumer

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

consumer = KafkaConsumer('Jim_Topic')

for message in consumer:
    data = message
    print(data) # the messages print correctly
    df = data.map # unable to convert the messages to a DataFrame

I tried the below approach as well:

df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "Jim_Topic") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

I get the below error:

pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

1 Answer

Depending on your use case, you can

  1. either create a Kafka source for streaming queries,
  2. or create a Kafka source for batch queries.

For Streaming Queries

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "Jim_Topic") \
  .load()

# Kafka keys and values are binary, so cast them to strings
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
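A streaming DataFrame does nothing until a query is started against it. A minimal sketch of starting one, assuming the console sink is acceptable for inspecting the records:

# Start the streaming query; each micro-batch is printed to stdout
query = df \
  .writeStream \
  .format("console") \
  .outputMode("append") \
  .start()

query.awaitTermination()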

For Batch Queries

df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "Jim_Topic") \
  .load()

# Kafka keys and values are binary, so cast them to strings
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
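A batch read returns an ordinary DataFrame, so you can inspect it directly, for example:

df.show(truncate=False)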

Make sure to add the required dependency as well:

org.apache.spark:spark-sql-kafka-0-10_2.11:2.0.2

(replace the version with your own; the above refers to Spark 2.0.2 built for Scala 2.11)
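If you run the script from an IDE such as PyCharm instead of through spark-submit --packages, one way to pull the connector in from Python is the spark.jars.packages config. A sketch, assuming the same artifact version as above; the app name here is arbitrary, and the config must be set before any SparkContext has been created:

from pyspark.sql import SparkSession

# spark.jars.packages makes Spark download the connector (and its
# transitive dependencies) from Maven Central at session startup
spark = SparkSession.builder \
    .appName("kafka-example") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.11:2.0.2") \
    .getOrCreate()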

Comments

Thanks for the quick help. I tried this logic already; I am getting the below error: pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
@JimMacaulay How are you running your app? Is it through spark-submit?
I am running it from PyCharm directly, not with spark-submit.
@JimMacaulay You need to add the required dependency. See my updated answer.
Could you please help me add the dependency? I am not sure how to add it. In Java I would have added it as a Maven dependency; I am not sure how to do it in Python.