
I have a .log file in ADLS which contains multiple nested JSON objects, as follows:

{"EventType":3735091736,"Timestamp":"2019-03-19","Data":{"Id":"event-c2","Level":2,"MessageTemplate":"Test1","Properties":{"CorrId":"d69b7489","ActionId":"d0e2c3fd"}},"Id":"event-c20b9c7eac0808d6321106d901000000"}
{"EventType":3735091737,"Timestamp":"2019-03-18","Data":{"Id":"event-d2","Level":2,"MessageTemplate":"Test1","Properties":{"CorrId":"f69b7489","ActionId":"d0f2c3fd"}},"Id":"event-d20b9c7eac0808d6321106d901000000"}
{"EventType":3735091738,"Timestamp":"2019-03-17","Data":{"Id":"event-e2","Level":1,"MessageTemplate":"Test1","Properties":{"CorrId":"g69b7489","ActionId":"d0d2c3fd"}},"Id":"event-e20b9c7eac0808d6321106d901000000"}

I need to read these nested JSON objects in PySpark and convert them to a dataframe as follows:

EventType    Timestamp     Data.Id    .....  Data.Properties.CorrId    Data.Properties.ActionId
3735091736   2019-03-19    event-c2   .....  d69b7489                  d0e2c3fd
3735091737   2019-03-18    event-d2   .....  f69b7489                  d0f2c3fd
3735091738   2019-03-17    event-e2   .....  g69b7489                  d0d2c3fd

For the above I am using ADLS and PySpark in Azure Databricks.

Does anyone know a general way to deal with the above problem? Thanks!

  • Read it row by row, create a Row for each line, and keep appending to the df. Commented Oct 22, 2019 at 8:16
  • Can you please provide an example for the same, as I am new to PySpark? Commented Oct 23, 2019 at 4:41
  • Send me your log file Commented Oct 23, 2019 at 4:45
  • You can copy the text {"EventType.. and save it as a .log file. Commented Oct 23, 2019 at 5:02
  • you can find the file @ drive.google.com/open?id=1sPyG2-XnhtGBfEwZE-loc1OKGk2Aa54G Commented Oct 23, 2019 at 6:44

2 Answers

  1. You can read it into an RDD first; it will be read in as a list of strings.
  2. Convert each JSON string into a native Python datatype using json.loads().
  3. Then convert the RDD into a dataframe; toDF() can infer the schema directly.
  4. Using the answer from Flatten Spark Dataframe column of map/dictionary into multiple columns, you can explode the Data column into multiple columns, given that your Id column is unique. Note that explode returns key and value columns for each entry in the map type.
  5. You can repeat the 4th point to explode the Properties column (a rough sketch follows the solution output below).

Solution:

import json
from pyspark.sql import functions as F

rdd = sc.textFile("demo_files/Test20191023.log")
df = rdd.map(lambda x: json.loads(x)).toDF()
df.show()
# +--------------------+----------+--------------------+----------+
# |                Data| EventType|                  Id| Timestamp|
# +--------------------+----------+--------------------+----------+
# |[MessageTemplate ...|3735091736|event-c20b9c7eac0...|2019-03-19|
# |[MessageTemplate ...|3735091737|event-d20b9c7eac0...|2019-03-18|
# |[MessageTemplate ...|3735091738|event-e20b9c7eac0...|2019-03-17|
# +--------------------+----------+--------------------+----------+

data_exploded = df.select('Id', 'EventType', 'Timestamp', F.explode('Data'))\
    .groupBy('Id', 'EventType', 'Timestamp').pivot('key').agg(F.first('value'))
# Note: this leaves a duplicate Id column, which might cause ambiguity problems
data_exploded.show()

# +--------------------+----------+----------+--------+-----+---------------+--------------------+
# |                  Id| EventType| Timestamp|      Id|Level|MessageTemplate|          Properties|
# +--------------------+----------+----------+--------+-----+---------------+--------------------+
# |event-c20b9c7eac0...|3735091736|2019-03-19|event-c2|    2|          Test1|{CorrId=d69b7489,...|
# |event-d20b9c7eac0...|3735091737|2019-03-18|event-d2|    2|          Test1|{CorrId=f69b7489,...|
# |event-e20b9c7eac0...|3735091738|2019-03-17|event-e2|    1|          Test1|{CorrId=g69b7489,...|
# +--------------------+----------+----------+--------+-----+---------------+--------------------+
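
For the 5th point: after json.loads() the nested Properties dictionary surfaces as a plain string such as {CorrId=d69b7489, ActionId=d0e2c3fd} rather than a map, so a second explode will not work on it directly. A rough, untested sketch is to pull the keys out with regexp_extract; the key names CorrId and ActionId are taken from the sample log and would need adjusting for your real fields:

from pyspark.sql import functions as F

# Sketch only: parse the stringified Properties map with regular expressions.
# The key names CorrId and ActionId are assumed from the sample log lines.
final_df = data_exploded \
    .withColumn('CorrId', F.regexp_extract('Properties', r'CorrId=([^,}]+)', 1)) \
    .withColumn('ActionId', F.regexp_extract('Properties', r'ActionId=([^,}]+)', 1)) \
    .drop('Properties')
final_df.show()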

4 Comments

@MangeshT. You're welcome! Please upvote if it was helpful :)
Sure. How do I extract values from the 'Properties' column, as it is showing the String data type? root |-- Id: string (nullable = true) |-- EventType: long (nullable = true) |-- Timestamp: string (nullable = true) |-- Exception: string (nullable = true) |-- Id: string (nullable = true) |-- Level: string (nullable = true) |-- LocalTimestamp: string (nullable = true) |-- MessageTemplate: string (nullable = true) |-- Properties: string (nullable = true) |-- RenderedMessage: string (nullable = true)
@MangeshT. not sure why it is reading it as Properties='{CorrId=d69b7489, ActionId=d0e2c3fd}'
Yeah, because the Data column is of map<string,string> type.

I was able to read the data with the following code.

from pyspark.sql.functions import *
DF = spark.read.json("demo_files/Test20191023.log") 

DF.select(col('Id'),col('EventType'),col('Timestamp'),col('Data.Id'),col('Data.Level'),col('Data.MessageTemplate'),
          col('Data.Properties.CorrId'),col('Data.Properties.ActionId'))\
  .show()

Result:

+--------------------+----------+----------+--------+-----+---------------+--------+--------+
|                  Id| EventType| Timestamp|      Id|Level|MessageTemplate|  CorrId|ActionId|
+--------------------+----------+----------+--------+-----+---------------+--------+--------+
|event-c20b9c7eac0...|3735091736|2019-03-19|event-c2|    2|          Test1|d69b7489|d0e2c3fd|
|event-d20b9c7eac0...|3735091737|2019-03-18|event-d2|    2|          Test1|f69b7489|d0f2c3fd|
|event-e20b9c7eac0...|3735091738|2019-03-17|event-e2|    1|          Test1|g69b7489|d0d2c3fd|
+--------------------+----------+----------+--------+-----+---------------+--------+--------+
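
If the duplicate Id column is a concern (both the top-level Id and Data.Id come out named Id), one small variation is to alias the nested fields. This is just a sketch over the same sample file; the alias names EventId and DataId are made up for illustration:

from pyspark.sql.functions import col

DF = spark.read.json("demo_files/Test20191023.log")

# Alias the nested fields so the outer Id and Data.Id no longer collide.
# EventId and DataId are arbitrary names chosen for this sketch.
flat = DF.select(col('Id').alias('EventId'),
                 col('EventType'),
                 col('Timestamp'),
                 col('Data.Id').alias('DataId'),
                 col('Data.Level').alias('Level'),
                 col('Data.MessageTemplate').alias('MessageTemplate'),
                 col('Data.Properties.CorrId').alias('CorrId'),
                 col('Data.Properties.ActionId').alias('ActionId'))
flat.show()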

