Below is my code. I have tried many different select variations; the app runs, but it does not show the messages that are being written to the topic every second. A Spark Streaming example using pprint() confirms that Kafka is in fact receiving messages every second. The messages in Kafka are JSON formatted; see the schema below for the field/column labels:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


KAFKA_TOPIC = "vehicle_events_fast_testdata"
KAFKA_SERVER = "10.2.0.6:2181"

if __name__ == "__main__":
    print("NXB PySpark Structured Streaming with Kafka Demo Started")

    spark = SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka Demo") \
        .master("local[*]") \
        .config("spark.jars", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar,/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.executor.extraClassPath", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.executor.extraLibrary", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.driver.extraClassPath", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    schema = StructType() \
        .add("WheelAngle", IntegerType()) \
        .add("acceleration", IntegerType()) \
        .add("heading", IntegerType()) \
        .add("reading_time", IntegerType()) \
        .add("tractionForce", IntegerType()) \
        .add("vel_latitudinal", IntegerType()) \
        .add("vel_longitudinal", IntegerType()) \
        .add("velocity", IntegerType()) \
        .add("x_pos", IntegerType()) \
        .add("y_pos", IntegerType()) \
        .add("yawrate", IntegerType())


    # Construct a streaming DataFrame that reads from KAFKA_TOPIC
    trans_det_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_SERVER) \
        .option("subscribe", KAFKA_TOPIC) \
        .option("startingOffsets", "latest") \
        .load() \
        .selectExpr("CAST(value as STRING)", "CAST(timestamp as STRING)","CAST(topic as STRING)")


    # (from_json(col("value").cast("string"), schema))

    # Q1 = trans_det_df.select(from_json(col("value"), schema).alias("parsed_value"), "timestamp")
    # Q2 = Q1.select("parsed_value.*", "timestamp")


    # start() returns the StreamingQuery; awaitTermination() blocks until it stops
    query = trans_det_df.writeStream \
            .format("console") \
            .option("truncate", "false") \
            .start()

    query.awaitTermination()

    This has been resolved. I enabled DEBUG logging and noticed a connection issue; changing the port above to 9092 resolved it. I had used the same port for this Structured Streaming exercise that I had used for Spark Streaming. Commented Dec 31, 2019 at 19:47

1 Answer

kafka.bootstrap.servers is the Kafka broker address (default port 9092), not the Zookeeper address (port 2181).

Also note your starting offsets are the latest, so you must produce data after starting the streaming application.

If you want to see existing topic data, use the earliest offsets.
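
As a minimal sketch of those two points, the source options could be collected in one place. The helper name kafka_source_options and its from_beginning flag are made up for illustration; the host is the one from the question, with the broker port 9092 in place of 2181.

```python
def kafka_source_options(broker, topic, from_beginning=False):
    """Build the option map for Spark's Kafka source (hypothetical helper)."""
    return {
        # Must point at a Kafka broker (default port 9092), not Zookeeper (2181)
        "kafka.bootstrap.servers": broker,
        "subscribe": topic,
        # "earliest" replays data already in the topic;
        # "latest" only shows messages produced after the query starts
        "startingOffsets": "earliest" if from_beginning else "latest",
    }


opts = kafka_source_options(
    "10.2.0.6:9092", "vehicle_events_fast_testdata", from_beginning=True
)

# With an existing SparkSession named `spark`, the options can be applied as:
# df = spark.readStream.format("kafka").options(**opts).load()
```

Switching from_beginning to False gives the behaviour in the question, where nothing prints until new messages are produced.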

2 Comments

Hi Cricket, yes, after updating all these settings I started the writeStream and confirmed that I can see the values (the Kafka topic messages in JSON encoding). However, when I create queries for the JSON columns, for which I provided the schema you can see above, I now get only null values. I am able to, for example, see velocity readings when performing a query for the velocity column ... Do you have any ideas here?
You may want to accept this answer and post a new question with these issues and your new code. But typically, if you get nulls, your schema is wrong. You didn't show your messages in the question, so I'm not sure what the solution would be.
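
One way to check whether the schema fits, without involving Spark, is to compare a raw message against it. The sketch below is only an illustration: the field list is copied from the schema in the question, but the sample message is invented (the real messages were not shown) and find_schema_mismatches is a hypothetical helper. It flags fields that are absent, hold non-integer values, or fall outside the 32-bit signed range of IntegerType, all of which are plausible reasons for nulls after from_json.

```python
import json

# Spark's IntegerType is a 32-bit signed integer
INT_MIN, INT_MAX = -2**31, 2**31 - 1

# Field names copied from the schema in the question
EXPECTED_INT_FIELDS = [
    "WheelAngle", "acceleration", "heading", "reading_time", "tractionForce",
    "vel_latitudinal", "vel_longitudinal", "velocity", "x_pos", "y_pos", "yawrate",
]


def find_schema_mismatches(raw_message, fields=EXPECTED_INT_FIELDS):
    """Return {field: reason} for fields that do not fit an integer schema."""
    record = json.loads(raw_message)
    problems = {}
    for name in fields:
        if name not in record:
            problems[name] = "missing from message"
            continue
        value = record[name]
        # bool is a subclass of int in Python, so exclude it explicitly
        if isinstance(value, bool) or not isinstance(value, int):
            problems[name] = "not an integer: %r" % (value,)
        elif not INT_MIN <= value <= INT_MAX:
            problems[name] = "outside 32-bit range: %r" % (value,)
    return problems


# Invented sample message, for illustration only
sample = '{"WheelAngle": 3, "acceleration": 0.25, "reading_time": 1577821667000, "velocity": 14}'
problems = find_schema_mismatches(sample)
for field, reason in sorted(problems.items()):
    print(field, "->", reason)
```

If a check like this reports fractional or very large values, DoubleType or LongType for those fields may be a better fit than IntegerType.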
