I am trying to read JSON messages from a Kafka topic into a PySpark dataframe. My first attempt was this:
import json
from uuid import uuid4

from kafka import KafkaConsumer

# TOPIC_NAME, BOOTSTRAP_SERVER, sc and sqlContext are defined elsewhere
consumer = KafkaConsumer(TOPIC_NAME,
                         consumer_timeout_ms=9000,
                         bootstrap_servers=BOOTSTRAP_SERVER,
                         auto_offset_reset='earliest',
                         enable_auto_commit=True,
                         group_id=str(uuid4()),
                         value_deserializer=lambda x: x.decode("utf-8"))
message_lst = []
for message in consumer:
    # normalize escaped quotes and strip line breaks before parsing
    message_str = message.value.replace('\\"', "'").replace("\n", "").replace("\r", "")
    message_dict = json.loads(message_str)
    message_lst.append(message_dict)
messages_json = sc.parallelize(message_lst)
messages_df = sqlContext.read.json(messages_json)
I am wondering whether there is a way to get the same dataframe using Spark Structured Streaming or something similar. Can anybody help? UPD: My attempt with Structured Streaming was this:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", f"{BOOTSTRAP_SERVER}") \
.option("subscribe", TOPIC_NAME) \
.load()
It exited with the following error:
pyspark.sql.utils.AnalysisException: Failed to find data source: Kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
UPD: I read the guide mentioned in the exception text. It recommends adding the library "spark-sql-kafka-0-10_2.12", but I can't find it. Does anyone know anything about this?
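For reference, "spark-sql-kafka-0-10_2.12" is a Maven artifact rather than a Python package, so it is normally pulled in through the spark.jars.packages setting (or the --packages flag of spark-submit) instead of pip. Below is a minimal sketch of that approach. The helper names kafka_connector_coordinate and build_session are made up for illustration, and the version passed in is an assumption that has to match the installed Spark and Scala build.

```python
# Sketch: make the Kafka connector available through spark.jars.packages.
# The Spark and Scala versions are assumptions; they must match the local installation.

def kafka_connector_coordinate(spark_version, scala_version="2.12"):
    """Build the Maven coordinate of the Structured Streaming Kafka connector."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


def build_session(spark_version, app_name="kafka-reader"):
    """Create a SparkSession that resolves the connector at startup."""
    # Imported lazily so the coordinate helper can be used without Spark installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.jars.packages", kafka_connector_coordinate(spark_version))
        .getOrCreate()
    )
```

A possible call would be build_session(pyspark.__version__). The setting only takes effect if it is applied before the first SparkSession is created, so in a notebook or shell that already has a running session it is probably easier to restart with --packages.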
UPD 2: I managed to add the required package and tried this to read messages from Kafka:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", f"{BOOTSTRAP_SERVER}") \
.option("subscribe", TOPIC_NAME) \
.load()
df.writeStream.outputMode("append").format("console").start().awaitTermination()
I used the same consumer as before. The problem here is that it only reads the messages that are written after the start() call. How can I read all messages that have been written up to a given time and get the result as a dataframe? Also, can anybody give an example of the schema for from_json()? I am sorry if my questions are stupid, but I can't find any examples in Python.
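One way to read everything already in the topic is a batch read (spark.read instead of spark.readStream) with startingOffsets set to "earliest" and endingOffsets set to "latest", followed by from_json on the value column. The sketch below shows that idea. The function names and the schema fields "id" and "name" are hypothetical placeholders, since the real message layout is not shown here, and the schema has to be adapted to the actual JSON.

```python
# Sketch: batch-read a whole Kafka topic and parse the JSON payload.
# Field names in the schema are placeholders, not the real message layout.

def kafka_batch_options(bootstrap_server, topic):
    """Options for a bounded read from the first to the last available offset."""
    return {
        "kafka.bootstrap.servers": bootstrap_server,
        "subscribe": topic,
        "startingOffsets": "earliest",  # start from the oldest retained message
        "endingOffsets": "latest",      # stop at the newest one (batch reads only)
    }


def read_topic_as_dataframe(spark, bootstrap_server, topic):
    """Return a DataFrame with one column per JSON field in the schema."""
    # Imported lazily so the options helper can be used without Spark installed.
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import LongType, StringType, StructField, StructType

    # Hypothetical schema: replace with the fields of the real messages.
    schema = StructType([
        StructField("id", LongType()),
        StructField("name", StringType()),
    ])

    raw = (
        spark.read
        .format("kafka")
        .options(**kafka_batch_options(bootstrap_server, topic))
        .load()
    )
    # Kafka delivers the value as binary, so cast it to string before parsing.
    return (
        raw.select(from_json(col("value").cast("string"), schema).alias("data"))
        .select("data.*")
    )
```

With this, messages_df = read_topic_as_dataframe(spark, BOOTSTRAP_SERVER, TOPIC_NAME) should give an ordinary (non-streaming) dataframe. For a streaming query, setting startingOffsets to "earliest" on readStream has a similar effect for a newly started query, while endingOffsets does not apply there.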