Below is my first program using Kafka and PySpark. The code runs without exceptions, but the output of my query is empty.

I initialize Spark and then the Kafka source, subscribing to the topic "quickstart-events", and I produce messages to this topic from a terminal. But when I run this code, I get empty DataFrames.

How do I resolve this?

Code:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession, DataFrame
from pyspark.sql.types import StructType, ArrayType, StructField, IntegerType, StringType, DoubleType

spark = SparkSession.builder \
    .appName("Spark-Kafka-Integration") \
    .master("local[2]") \
    .getOrCreate()

dsraw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "quickstart-events") \
    .load()

ds = dsraw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
print(type(ds))

rawQuery = dsraw \
        .writeStream \
        .queryName("query1")\
        .format("memory")\
        .start()

raw = spark.sql("select * from query1")
raw.show() # empty output

rawQuery = ds \
        .writeStream \
        .queryName("query2")\
        .format("memory")\
        .start()

raw = spark.sql("select * from query2")
raw.show()  # empty output
print("complete")

Output:

+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+

+---+-----+
|key|value|
+---+-----+
+---+-----+
  • Why do you need writeStream or two SQL queries? You can ds.show(). Besides, is data actually being written to your topic while you are running the Spark code? Note that you didn't provide the startingOffsets option. Commented Sep 13, 2021 at 13:04

1 Answer

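If you are just learning and experimenting with Kafka and Spark Structured Streaming, the memory sink is fine. The output is empty because start() returns immediately and the query runs in the background, so raw.show() executes only once, before any micro-batch has been written to the in-memory table.

Just use:

    import time

    while True:
        time.sleep(5)
        print("queryresult")
        raw.show()  # re-reads the in-memory table, so rows appear once they arrive

instead of

    raw.show()  # runs only once, which is why nothing is printed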

Do not use this polling loop in production code.
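
For experiments, a bounded variant of the loop above may be easier to live with than `while True`. The following is only a sketch: the helper name `show_when_ready` and its parameters `attempts` and `interval_s` are made up for illustration, and it assumes a memory-sink query (for example "query2") has already been started on the given session.

```python
import time


def show_when_ready(spark, table_name, attempts=6, interval_s=5.0):
    """Poll an in-memory sink table until it has rows or attempts run out.

    `spark` is expected to be a SparkSession whose streaming query was
    started with .format("memory").queryName(table_name).
    Returns True if rows were found and shown, False otherwise.
    """
    for _ in range(attempts):
        # Each call re-reads the current contents of the memory table.
        df = spark.sql(f"select * from {table_name}")
        if df.count() > 0:
            df.show(truncate=False)
            return True
        # Nothing yet: give the background query time to run a micro-batch.
        time.sleep(interval_s)
    return False
```

Calling `show_when_ready(spark, "query2")` after `.start()` would then print the table as soon as the first rows land, and give up after about 30 seconds otherwise.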

It is better to write it like this:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark-Kafka-Integration") \
    .master("local[2]") \
    .getOrCreate()


dsraw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "quickstart-events") \
    .load()

ds = dsraw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

rawQuery = ds \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

rawQuery.awaitTermination()

This prints each micro-batch to the console automatically as new messages arrive.
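
The comment on the question also points out that startingOffsets was not set. For a streaming query the Kafka source defaults to "latest", so messages produced before the query started are not read. Below is a minimal sketch of the same console query with startingOffsets set to "earliest". The helper names `kafka_source_options` and `start_console_query` are hypothetical, and the broker address and topic are simply the ones from the question.

```python
def kafka_source_options(bootstrap_servers, topic, starting_offsets="earliest"):
    """Build the option dict for the Kafka source.

    startingOffsets only applies when a query starts fresh; a query that
    resumes from a checkpoint continues from its stored offsets.
    """
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": starting_offsets,
    }


def start_console_query(spark, options):
    """Start a streaming query that prints key/value strings to the console.

    `spark` is expected to be an existing SparkSession.
    """
    raw = spark.readStream.format("kafka").options(**options).load()
    # Kafka delivers key and value as binary, so cast them to strings.
    decoded = raw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    return (
        decoded.writeStream
        .format("console")
        .outputMode("append")
        .start()
    )
```

With the session from above, `start_console_query(spark, kafka_source_options("kafka:9092", "quickstart-events")).awaitTermination()` should print the messages already in the topic as well as new ones.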