I am trying to use Spark 2.2 to consume data published to Kafka, but I cannot get it to work. I have two goals:
- Consume the data sent through Kafka with Spark, process it, and store the result in a local file or in HDFS.
- Print the data consumed by Spark to the console while the Spark job is running.
For Kafka, I am following this tutorial: https://kafka.apache.org/quickstart
[cloudera@quickstart kafka]$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>message 1
>message 2
>message 3
>message 4
I then run the PySpark script file.py:
./spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 file.py
PySpark code (file.py):
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("stream").getOrCreate()
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic")
# Trying to save the result in a file
df.writeStream \
    .format("text") \
    .option("checkpointLocation", "file:///home/cloudera/file.txt") \
    .option("path", "file:///home/cloudera/file.txt") \
    .start()
# Does not write to a file
# Trying to print the result to the console
df.writeStream() \
    .outputMode("append") \
    .format("console") \
    .start()
# Does not print to the console and gives error: TypeError: 'DataStreamWriter' object is not callable
Any help would be appreciated.
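A likely cause of the TypeError: in PySpark, `writeStream` is a property that returns a `DataStreamWriter`, so writing `df.writeStream()` tries to call the writer object itself. The sketch below reproduces that behaviour without Spark or Kafka. `FakeDataFrame` and `FakeWriter` are hypothetical stand-ins invented for illustration, not real PySpark classes.

```python
class FakeWriter:
    """Hypothetical stand-in for pyspark's DataStreamWriter (not the real class)."""

    def format(self, source):
        # Builder-style method: record the setting and return self for chaining
        self.source = source
        return self


class FakeDataFrame:
    """Hypothetical stand-in for a streaming DataFrame."""

    @property
    def writeStream(self):
        # A property, so it is accessed without parentheses
        return FakeWriter()


df = FakeDataFrame()

# Works: plain attribute access, then chained builder calls
writer = df.writeStream.format("console")

# Fails: the parentheses try to call the writer object that the property returned
try:
    df.writeStream()
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(error_message)
```

Under that assumption, the equivalent change in the real script would be to drop the parentheses: `df.writeStream.outputMode("append").format("console").start()`.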