I am receiving JSON data from Kafka brokers and reading it with Spark Streaming in Scala. Here is a sample record:

{"timestamp":"2020-12-11 22:35:00.000000 UTC","tech":"Spark","version":2,"start_time":1607725688402210,"end_time":1607726131636059}

I receive this data as an RDD[String] in my Scala code. Now I want to read a particular key from each row, for example 'version' from the data above. I am able to do this as follows:

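import scala.util.parsing.json.JSON

for (record <- rdd) {
  val jsonRecord = JSON.parseFull(record)
  val globalMap = jsonRecord.get.asInstanceOf[Map[String, Any]]
  // JSON.parseFull returns every JSON number as a Double, so cast to Double rather than String
  val version = globalMap("version").asInstanceOf[Double].toInt
}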

But I am not sure whether this is the best way to read an RDD containing JSON data. Please suggest a better approach.

Thanks,

  • Can you use Spark Structured Streaming, or do you not want to? Commented Dec 11, 2020 at 23:46
  • I am using a direct stream to fetch the raw stream and then looping over it. Commented Dec 12, 2020 at 0:01
  • The Spark version is 2.3.0.2. Commented Dec 12, 2020 at 0:05
  • Why are you using DStream? It is going to be deprecated. Commented Dec 12, 2020 at 0:08
  • I have existing code running in production and cannot make many changes to it. For now I need to achieve this use case with the existing code base. Thanks, Commented Dec 12, 2020 at 0:11

1 Answer

Use the json4s library to parse the JSON data. It ships with Spark by default, so there is no need to add extra dependencies.

See the code below.

scala> rdd.collect.foreach(println)

{"timestamp":"2020-12-11 22:35:00.000000 UTC","tech":"Spark","version":2,"start_time":1607725688402210,"end_time":1607726131636059}

scala> :paste
// Entering paste mode (ctrl-D to finish)

rdd.map{ row =>

    // Import required libraries for json parsers.
    import org.json4s._
    import org.json4s.jackson.JsonMethods._
    implicit val formats = DefaultFormats

    // parse json message using parse function from json4s lib.

    val jsonData = parse(row)

    // extract required fields from parsed json data.

    // extracting version field value
    val version = (jsonData \\ "version").extract[Int] 

    // extracting timestamp field value
    val timestamp = (jsonData \\ "timestamp").extract[String] 

    (version,timestamp)
}
.collect
.foreach(println)


// Exiting paste mode, now interpreting.

(2,2020-12-11 22:35:00.000000 UTC)
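
Since the stream comes from Kafka, some records may be malformed or may lack a field, in which case `parse` or `extract` throws and the whole task fails. Below is a minimal sketch of a more defensive variant, assuming the same json4s version that ships with Spark. The names `JsonFields` and `versionAndTimestamp` are hypothetical, chosen only for this illustration. It uses `\`, which selects a direct child of the top-level object, instead of `\\`, which searches recursively through nested objects.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._
import scala.util.Try

// Hypothetical helper object, not part of json4s or Spark.
object JsonFields {
  // DefaultFormats is enough for primitive fields such as Int and String.
  implicit val formats: Formats = DefaultFormats

  // Returns Some((version, timestamp)) when the record parses and both
  // fields can be extracted; returns None otherwise instead of throwing.
  def versionAndTimestamp(row: String): Option[(Int, String)] =
    for {
      json      <- Try(parse(row)).toOption                  // None on malformed JSON
      version   <- (json \ "version").extractOpt[Int]        // None if the field is missing
      timestamp <- (json \ "timestamp").extractOpt[String]
    } yield (version, timestamp)
}
```

With this in place, `rdd.flatMap(row => JsonFields.versionAndTimestamp(row))` should keep only the records that parsed cleanly and silently drop the rest. If bad records need to be counted or logged, returning an `Either` instead of an `Option` may be a better fit.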

1 Comment

Thanks Srinivas. It fits well for my use case.