
I am trying to read messages from a Kafka topic. The messages are in the following format (sample):

{"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}}

Also, please note that the topic carries messages from several different tables, not just one.

What I am trying to achieve is to read the above message from the Kafka topic using Spark Structured Streaming and create a DataFrame whose column names and values both come from the JSON message itself.

I don't want to explicitly define a schema using a case class or StructType.

I tried this:

val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", brokers).option("subscribe", "topic1").option("startingOffsets", "earliest").load()

val y = df.select(get_json_object($"value", "$.payload").alias("payload"))

When I view y (a DataFrame), it has a single column, payload, whose value is the payload JSON as a string.

How can I get the individual fields as separate columns in the DataFrame? I am not able to achieve this.

(To reiterate: I cannot use a generic case class or StructType for the schema, because the messages coming through Kafka are from different tables. I want a more dynamic schema created from the JSON itself at runtime.)
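from_json() needs its schema before the streaming query starts, so the Kafka source alone cannot give you columns derived from the JSON. One possible workaround, which is an assumption and not something proposed in the question or the answer below, is to infer a schema per micro-batch: foreachBatch (Spark 2.4+) hands you each micro-batch as a static DataFrame, where DataFrameReader.json can infer a schema from a Dataset[String]. The sketch groups rows by the Connect schema.name so each table is inferred separately; the object name, the show() placeholder sink, and passing brokers as the first argument are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, get_json_object}

object DynamicPayload {
  // Pull the table name and the raw payload JSON out of the Connect envelope
  def envelope(df: DataFrame): DataFrame = {
    val json = col("value").cast("string")
    df.select(
      get_json_object(json, "$.schema.name").alias("table"),
      get_json_object(json, "$.payload").alias("payload"))
  }

  // Batch-only: let spark.read.json infer a schema from one table's payload strings
  def inferTable(batch: DataFrame, table: String): DataFrame = {
    val spark = batch.sparkSession
    import spark.implicits._
    val payloads: Dataset[String] =
      batch.filter(col("table") === table).select("payload").as[String]
    spark.read.json(payloads)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("kafka-dynamic-json").getOrCreate()
    val raw = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", args(0)) // hypothetical: broker list as first argument
      .option("subscribe", "topic1")
      .option("startingOffsets", "earliest")
      .load()

    // Declared as a Scala function value so foreachBatch resolves to the Scala overload
    val handleBatch: (DataFrame, Long) => Unit = (batch, _) => {
      batch.persist()
      val tables = batch.select("table").where(col("table").isNotNull)
        .distinct().collect().map(_.getString(0))
      // Placeholder sink: print each table's rows with its inferred columns
      tables.foreach(t => inferTable(batch, t).show(false))
      batch.unpersist()
    }

    envelope(raw).writeStream.foreachBatch(handleBatch).start().awaitTermination()
  }
}
```

Inference costs an extra pass over each micro-batch, and the inferred columns can differ from batch to batch, so a downstream sink has to tolerate that.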

  • Umm, you could probably access single values via dot notation; payload.schema.type should return "struct" as the value. Commented Jan 7, 2019 at 8:25
  • I think you need to cast the value to a string (from a byte array) before you can use get_json_object Commented Jan 13, 2019 at 17:51

1 Answer


Option 1: Change the Kafka Connect source to set value.converter.schemas.enable=false. This gives you only the unwrapped payload to begin with, so you can go straight to the from_json() step below.
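Assuming the source connector uses Kafka Connect's JsonConverter (the schema/payload envelope in the question is what JsonConverter produces when schemas are enabled), the setting would look roughly like this in the worker or connector configuration:

```properties
# Serialize record values as plain JSON, without the schema/payload envelope
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```

With schemas disabled, each message value is just the payload object, e.g. {"emp_id":"1","emp_name":"abc",...}, so from_json() can be applied to $"value".cast("string") directly.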

Otherwise, after you strip the Connect schema envelope, you need to use from_json() to apply a schema:

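import org.apache.spark.sql.functions.{from_json, get_json_object}
val y = df.select(get_json_object($"value".cast("string"), "$.payload").alias("payload"))
// Parse the payload string with the schema below and expand it into top-level columns
val z = y.select(from_json($"payload", schema).alias("data")).select("data.*")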

All your fields are strings, so the schema would look like this:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema: StructType = StructType(Seq(
  StructField("emp_id", StringType),
  StructField("emp_name", StringType),
  StructField("city", StringType),
  StructField("emp_sal", StringType),
  StructField("manager_name", StringType)
))
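Putting the pieces together, an end-to-end sketch might look like the following. It reuses the question's Kafka options and the all-string schema; the object name, the parsePayload helper, the console sink, and passing brokers as the first argument are hypothetical. Keeping the parsing in a helper that takes any DataFrame with a value column means the same logic can be checked on a static DataFrame.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json, get_json_object}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object EmpPayload {
  // Schema of the "payload" object for emp_table (all fields are strings in the sample)
  val empSchema: StructType = StructType(Seq(
    StructField("emp_id", StringType),
    StructField("emp_name", StringType),
    StructField("city", StringType),
    StructField("emp_sal", StringType),
    StructField("manager_name", StringType)
  ))

  // Works on a streaming or static DataFrame with a binary or string "value" column
  def parsePayload(df: DataFrame, schema: StructType): DataFrame =
    df.select(get_json_object(col("value").cast("string"), "$.payload").alias("payload"))
      .select(from_json(col("payload"), schema).alias("data"))
      .select("data.*") // expand the struct fields into top-level columns

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("kafka-json").getOrCreate()
    val raw = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", args(0)) // hypothetical: broker list as first argument
      .option("subscribe", "topic1")
      .option("startingOffsets", "earliest")
      .load()
    parsePayload(raw, empSchema)
      .writeStream.format("console")
      .start()
      .awaitTermination()
  }
}
```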



Comments

Hi @OneCriketeer, if my payload looks like this: ``` "payload": { "data": { "nameField": "Myname", "AnotherField": [ { "value": { "unitField": "MyUnit", "valueField": "27" } } ], "timestampField": "2020-08-01T18:00:00" } } ```, would I need to define some kind of nested struct?
@Minnie AnotherField would need to be an ArrayType (of structs), yes
I am stuck, so I will post my question below to keep the topic focused.
@Minnie Please create a new post rather than ask a question after an accepted answer
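For the nested payload in Minnie's comment, a from_json schema could be sketched as below, assuming every leaf is kept as a string (valueField is quoted in the sample, and timestampField could be converted later with to_timestamp). The object and val names are hypothetical; the resulting payloadSchema would be passed to from_json in place of the flat schema.

```scala
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

object NestedPayloadSchema {
  // Innermost object: {"unitField": ..., "valueField": ...}
  val valueStruct: StructType = StructType(Seq(
    StructField("unitField", StringType),
    StructField("valueField", StringType) // "27" is quoted in the sample
  ))

  // Each element of the AnotherField array is {"value": {...}}
  val anotherFieldElement: StructType = StructType(Seq(
    StructField("value", valueStruct)
  ))

  // Contents of "data"
  val dataStruct: StructType = StructType(Seq(
    StructField("nameField", StringType),
    StructField("AnotherField", ArrayType(anotherFieldElement)),
    StructField("timestampField", StringType) // kept as a string here
  ))

  // Schema for the extracted "payload" string: {"data": {...}}
  val payloadSchema: StructType = StructType(Seq(StructField("data", dataStruct)))
}
```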
