There are many ways to read/write a Spark DataFrame to/from Kafka. I am trying to read messages from a Kafka topic and create a DataFrame out of them. I am able to pull the messages from the topic, but I am unable to convert them to a DataFrame. Any suggestion would be helpful.
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.context import SparkContext
from kafka import KafkaConsumer
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
consumer = KafkaConsumer('Jim_Topic')
for message in consumer:
    data = message
    print(data)    # printing the messages works fine
    df = data.map  # fails here: a ConsumerRecord has no .map, and I am unable to convert it to a DataFrame
I tried the approach below as well:
df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "Jim_Topic") \
    .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
I am getting the error below:
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
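This AnalysisException means the Kafka data source is not on the classpath: Structured Streaming's Kafka reader ships as a separate package (spark-sql-kafka-0-10) that must be supplied when the application is launched. A sketch, assuming a Spark 3.0.x install built for Scala 2.12 (the version coordinates must be adjusted to match your actual Spark/Scala build, and your_app.py is a placeholder):

```shell
# Supply the Kafka connector at launch; the artifact version must match
# your Spark version and its Scala build (here: Spark 3.0.1, Scala 2.12).
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 \
  your_app.py
```

Alternatively, the same coordinates can be set from inside the script via SparkSession.builder.config("spark.jars.packages", ...) before the session is created.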