I have the column below (TriggeredDateTime) in my .avro file, which is of type String. I need to get the data in yyyy-MM-dd HH:mm:ss format (as shown in the expected output) using Spark with Scala. Is there a way to achieve this by writing a UDF, rather than using my approach below? Any help would be much appreciated.

 "TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}

Expected output:

+-------------------+
|TriggeredDateTime  |
+-------------------+
|2019-05-16 04:56:19|
+-------------------+

My Approach:

I'm trying to convert the .avro file to JSON by applying the schema, and then parse the JSON to get the required result.

DataFrame Sample Data:

[{"vin":"FU7123456XXXXX","basetime":0,"dtctime":189834,"latitude":36.341587,"longitude":140.327676,"dtcs":[{"fmi":1,"spn":2631,"dtc":"470A01","id":1},{"fmi":0,"spn":0,"dtc":"000000","id":61}],"signals":[{"timestamp":78799,"spn":174,"value":45,"name":"PT"},{"timestamp":12345,"spn":0,"value":10.2,"name":"PT"},{"timestamp":194915,"spn":0,"value":0,"name":"PT"}],"sourceEcu":"MCM","TriggeredDateTime":{"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}}]

DataFrame PrintSchema:

initialDF.printSchema
root
 |-- vin: string (nullable = true)
 |-- basetime: string (nullable = true)
 |-- dtctime: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- dtcs: string (nullable = true)
 |-- signals: string (nullable = true)
 |-- sourceEcu: string (nullable = true)
 |-- dtcTriggeredDateTime: string (nullable = true)
  • Could you please update it with whatever code/udf/json-parser you have written. Commented Nov 21, 2019 at 4:51
  • could you provide input dataframe after reading data from Avro Commented Nov 21, 2019 at 6:09
  • @Nikk, I've added sample data and printSchema of a dataframe. Commented Nov 21, 2019 at 8:08

1 Answer

Instead of writing a UDF, you can use the built-in get_json_object to parse the JSON in each row and format_string to build the desired output.

import org.apache.spark.sql.functions.{get_json_object, format_string}
// Outside spark-shell, also import spark.implicits._ (needed for toDF and the $"..." syntax).

val df = Seq(
  ("""{"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}"""),
  ("""{"dateTime":{"date":{"year":2018,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}""")
).toDF("TriggeredDateTime")

df.select(
  // Cast each extracted value to int so the %04d/%02d specifiers can zero-pad it.
  format_string("%04d-%02d-%02d %02d:%02d:%02d",
    get_json_object($"TriggeredDateTime", "$.dateTime.date.year").cast("int"),
    get_json_object($"TriggeredDateTime", "$.dateTime.date.month").cast("int"),
    get_json_object($"TriggeredDateTime", "$.dateTime.date.day").cast("int"),
    get_json_object($"TriggeredDateTime", "$.dateTime.time.hour").cast("int"),
    get_json_object($"TriggeredDateTime", "$.dateTime.time.minute").cast("int"),
    get_json_object($"TriggeredDateTime", "$.dateTime.time.second").cast("int")
  ).as("TriggeredDateTime")
).show(false)

Output:

+-------------------+
|TriggeredDateTime  |
+-------------------+
|2019-05-16 04:56:19|
|2018-05-16 04:56:19|
+-------------------+

get_json_object parses the JSON string and returns, as a string, the value found at the given path, e.g. $.dateTime.date.year. Each part is cast to int and passed as an argument to format_string, whose %02d specifiers zero-pad the month, day, hour, minute and second so the result matches yyyy-MM-dd HH:mm:ss. A plain %s pattern would print the values unpadded (2019-5-16 4:56:19).
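Since format_string takes a printf-style pattern, the effect of the format specifiers can be checked in plain Scala without a Spark session. The following is only a minimal sketch: DateParts and render are hypothetical names introduced for illustration, and it assumes the parts are already available as integers.

```scala
// Plain-Scala check of printf-style patterns; no Spark session required.
// DateParts and render are hypothetical names used only for this sketch.
final case class DateParts(year: Int, month: Int, day: Int,
                           hour: Int, minute: Int, second: Int)

def render(pattern: String, p: DateParts): String =
  pattern.format(p.year, p.month, p.day, p.hour, p.minute, p.second)

val sample = DateParts(2019, 5, 16, 4, 56, 19)

// %s prints each value as-is, so single-digit parts stay one character wide.
val unpadded = render("%s-%s-%s %s:%s:%s", sample)

// %02d left-pads with zeros to two digits (%04d to four); it needs integer arguments.
val padded = render("%04d-%02d-%02d %02d:%02d:%02d", sample)

println(unpadded) // 2019-5-16 4:56:19
println(padded)   // 2019-05-16 04:56:19
```

The snippet can be pasted into spark-shell or the Scala REPL. Only the zero-padded pattern produces the yyyy-MM-dd HH:mm:ss layout asked for in the question.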

UPDATE:

For the sake of completeness: instead of calling get_json_object multiple times, we can use from_json and provide the schema, which we already know:

import org.apache.spark.sql.functions.{from_json, format_string}
import org.apache.spark.sql.types.{StructType, StructField, IntegerType}

val df = Seq(
  ("""{"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}"""),
  ("""{"dateTime":{"date":{"year":2018,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}""")
).toDF("TriggeredDateTime")

val schema = StructType(Seq(
  StructField("dateTime", StructType(Seq(
    StructField("date", StructType(Seq(
      StructField("year", IntegerType, false),
      StructField("month", IntegerType, false),
      StructField("day", IntegerType, false)
    ))),
    StructField("time", StructType(Seq(
      StructField("hour", IntegerType, false),
      StructField("minute", IntegerType, false),
      StructField("second", IntegerType, false),
      StructField("nano", IntegerType, false)
    )))
  ))),
  StructField("offset", StructType(Seq(
    StructField("totalSeconds", IntegerType, false)
  )))
))

df.select(
    from_json($"TriggeredDateTime", schema).as("parsedDateTime")
)
.select(
    // The parsed fields are already integers, so %04d/%02d can zero-pad them directly.
    format_string("%04d-%02d-%02d %02d:%02d:%02d",
      $"parsedDateTime.dateTime.date.year",
      $"parsedDateTime.dateTime.date.month",
      $"parsedDateTime.dateTime.date.day",
      $"parsedDateTime.dateTime.time.hour",
      $"parsedDateTime.dateTime.time.minute",
      $"parsedDateTime.dateTime.time.second"
    ).as("TriggeredDateTime")
)
.show(false)

// +-------------------+
// |TriggeredDateTime  |
// +-------------------+
// |2019-05-16 04:56:19|
// |2018-05-16 04:56:19|
// +-------------------+
