
We are currently working on real-time data feeds carrying JSON data.

While reading the examples from https://sparkbyexamples.com/spark/spark-streaming-with-kafka/, it looks like we need a schema for the Kafka JSON message.

Is there any other way to process the data without a schema?

  • What kind of processing do you want to perform without a schema? Commented Feb 25, 2022 at 12:26
  • Copy the entire data as-is to cloud storage for now. Commented Feb 25, 2022 at 12:28
  • The data is in the value field as a string; there you have the data from the topic. Commented Feb 25, 2022 at 12:38
  • The value field from the topic is binary; we need to deserialize it, and that requires a schema. Commented Feb 25, 2022 at 12:41
  • Not necessarily; you just need to cast the value part to a string and then writeStream into the destination location. Commented Feb 25, 2022 at 14:56

2 Answers


Try the code below after starting ZooKeeper, the Kafka server, and any other required services.

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .option("startingOffsets", "latest") \
    .load()  # use "earliest" to replay the topic from the beginning

print("Printing schema of df:")
df.printSchema()

# Kafka delivers key/value as binary; cast the value to a readable string
transaction_detail_df1 = df.selectExpr("CAST(value AS STRING)")

trans_detail_write_stream = transaction_detail_df1 \
    .writeStream \
    .trigger(processingTime='2 seconds') \
    .option("truncate", "false") \
    .format("console") \
    .start()

trans_detail_write_stream.awaitTermination()

Just change the basic configuration and you should be able to see the output.
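Since the goal in the comments is to copy the data as-is to cloud storage rather than the console, the same stream can be pointed at a file sink instead. A sketch continuing from `transaction_detail_df1` above; the paths are placeholders, and for cloud storage you would substitute e.g. a `gs://` or `s3a://` URI (with the matching connector jars on the classpath):

```python
# Write the raw value strings out as text files instead of to the console.
# A checkpoint location is required for file sinks so the stream can recover.
query = transaction_detail_df1 \
    .writeStream \
    .format("text") \
    .option("path", "/tmp/kafka-raw") \
    .option("checkpointLocation", "/tmp/kafka-raw-checkpoint") \
    .trigger(processingTime="2 seconds") \
    .start()

query.awaitTermination()
```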


3 Comments

Thanks, this helps to save the JSON in a single column called value. How do I extract data from the fields inside the JSON?
You can explode the dataframe columns based on your requirement.
Thanks, this worked with a workaround. Saving the JSON value this way leaves the schema as a single string column, where explode cannot be applied. However, writing an intermediate file as text and reading it back as JSON worked.

You can use the get_json_object Spark SQL function to parse data out of a JSON string without defining any additional schema.

You can simply use the cast function to deserialize the binary key/value, as the example in the other answer shows.

