I am reading messages from a Kafka topic:

messageDFRaw = spark.readStream\
                    .format("kafka")\
                    .option("kafka.bootstrap.servers", "localhost:9092")\
                    .option("subscribe", "test-message")\
                    .load()

messageDF = messageDFRaw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as dict")

When I print the DataFrame from the above query, I get the following console output:

|key|dict|
|#badbunny |{"channel": "#badbunny", "username": "mgat22", "message": "cool"}|

How can I create a DataFrame from the DataStreamReader that has the columns |key|channel|username|message|?

I tried following the accepted answer to "How to read records in JSON format from Kafka using Structured Streaming?":

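from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

# Schema of the JSON payload carried in the Kafka message value
struct = StructType([
    StructField("channel", StringType()),
    StructField("username", StringType()),
    StructField("message", StringType()),
])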

messageDFRaw.select(from_json("CAST(value AS STRING)", struct))

However, I get "Expected type 'StructField', got 'StructType' instead" on the from_json() call.

1 Answer

I ignored the warning "Expected type 'StructField', got 'StructType' instead" in from_json().

However, I had to cast the value from the Kafka message to a string first, and only then parse it as JSON with the schema.

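messageDF = messageDFRaw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Parse the JSON string with the StructType from the question (named struct there), keeping key
messageParsedDF = messageDF.select("key", from_json("value", struct).alias("message"))

# The parsed fields live under the alias "message", not "value"
messageFlattenedDF = messageParsedDF.selectExpr("key", "message.channel", "message.username", "message.message")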
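
To sanity-check the schema's field names against a sample payload without starting a stream, the parse-and-flatten step can be modelled in plain Python. This is only a rough sketch and not Spark code: flatten_record and FIELDS are hypothetical names, and the sample record is the one from the console output in the question. Keys that are missing from the payload come out as None, which is roughly how from_json leaves absent fields null.

```python
import json

# Field names copied from the schema in the question.
FIELDS = ("channel", "username", "message")


def flatten_record(key, value):
    """Parse a JSON string payload and return one flat row as a dict."""
    try:
        payload = json.loads(value)
    except (TypeError, ValueError):
        # Unparseable input: treat every field as missing.
        payload = {}
    if not isinstance(payload, dict):
        # Valid JSON that is not an object (e.g. a list) has no named fields.
        payload = {}
    row = {"key": key}
    for name in FIELDS:
        # dict.get returns None when the key is absent.
        row[name] = payload.get(name)
    return row


sample = '{"channel": "#badbunny", "username": "mgat22", "message": "cool"}'
row = flatten_record("#badbunny", sample)
print(row)
```

If a column comes back as None here for a payload that should contain it, the field name in the schema probably does not match the JSON key.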