
I have multiple source JSON files where each JSON file will have different schemas present in it.

For example, below are 3 JSON files. I've presented them in a tabular view for easier reading, and I've also provided each schema:

JSON File 1:

Key                                                Value  Column_Name
sample_column.pull.notify.roid.alert               aaa    Column A
sample_column.pull.notify.roid.title               bbb    Column B
sample_column.pull.notify.roid.action.pan.content  ccc    Column C

JSON File 1 Schema:

|-- sample_column: struct (nullable = true)
|    |-- pull: struct (nullable = true)
|    |    |-- notify: struct (nullable = true)
|    |    |    |-- roid: struct (nullable = true)
|    |    |    |    |-- action: struct (nullable = true)
|    |    |    |    |    |-- pan: struct (nullable = true)
|    |    |    |    |    |    |-- content: string (nullable = true)
|    |    |    |    |-- alert: string (nullable = true)
|    |    |    |    |-- title: string (nullable = true)

JSON File 2:

Key                                                     Value   Column_Name
sample_column.cone.pull.notify.roid.alert               a1a1a1  Column A
sample_column.cone.pull.notify.roid.title               b1b1b1  Column B
sample_column.cone.pull.notify.roid.action.pan.content  c1c1c1  Column C

JSON File 2 Schema:

|-- sample_column: struct (nullable = true)
|    |-- cone: struct (nullable = true)
|    |    |-- pull: struct (nullable = true)
|    |    |    |-- notify: struct (nullable = true)
|    |    |    |    |-- roid: struct (nullable = true)
|    |    |    |    |    |-- action: struct (nullable = true)
|    |    |    |    |    |    |-- pan: struct (nullable = true)
|    |    |    |    |    |    |    |-- content: string (nullable = true)
|    |    |    |    |    |-- alert: string (nullable = true)
|    |    |    |    |    |-- title: string (nullable = true)

JSON File 3:

Key                                                    Value   Column_Name
sample_column.var.pull.notify.roid.alert               a2a2a2  Column A
sample_column.var.pull.notify.roid.title               b2b2b2  Column B
sample_column.var.pull.notify.roid.action.pan.content  c2c2c2  Column C

JSON File 3 Schema:

|-- sample_column: struct (nullable = true)
|    |-- var: struct (nullable = true)
|    |    |-- pull: struct (nullable = true)
|    |    |    |-- notify: struct (nullable = true)
|    |    |    |    |-- roid: struct (nullable = true)
|    |    |    |    |    |-- action: struct (nullable = true)
|    |    |    |    |    |    |-- pan: struct (nullable = true)
|    |    |    |    |    |    |    |-- content: string (nullable = true)
|    |    |    |    |    |-- alert: string (nullable = true)
|    |    |    |    |    |-- title: string (nullable = true)

I need to get "alert", "title" and "content" from those 3 different schemas and put all the "alert" values under Column A, all the "title" values under Column B, and all the "content" values under Column C.

This is the expected output table:

Column A  Column B  Column C
aaa       bbb       ccc
a1a1a1    b1b1b1    c1c1c1
a2a2a2    b2b2b2    c2c2c2

So, per the scenario above, I tried to append the values from the different schemas (all the 'a' values under Column A, the 'b' values under Column B, and the 'c' values under Column C), but the values get overwritten instead, leaving only the last file's row:

Column A  Column B  Column C
a2a2a2    b2b2b2    c2c2c2

Below is the PySpark code I've tried; it overwrites the values instead of appending them:

from pyspark.sql.functions import when, lit

def has_column(df, col_name):
    # Returns True if the DataFrame can resolve the (possibly nested) column, else False.
    try:
        df[col_name]
        return True
    except Exception:
        return False

if has_column(df, "sample_column.pull.notify.roid.alert"):
    try:
        df = df.withColumn('Column A', when(df.sample_column.pull.notify.roid.alert.isNotNull(),
                                            df["sample_column.pull.notify.roid.alert"]))
    except Exception:
        df = df.withColumn('Column A', lit("").cast("string"))

if has_column(df, "sample_column.cone.pull.notify.roid.title"):
    try:
        df = df.withColumn('Column B', when(df.sample_column.cone.pull.notify.roid.title.isNotNull(),
                                            df["sample_column.cone.pull.notify.roid.title"]))
    except Exception:
        df = df.withColumn('Column B', lit("").cast("string"))

if has_column(df, "sample_column.var.pull.notify.roid.action.pan.content"):
    try:
        df = df.withColumn('Column C', when(df.sample_column.var.pull.notify.roid.action.pan.content.isNotNull(),
                                            df["sample_column.var.pull.notify.roid.action.pan.content"]))
    except Exception:
        df = df.withColumn('Column C', lit("").cast("string"))

Help is much appreciated!

  • could you show df.printSchema()? it is a little hard to understand whether you mean the key is the nested object or literally your key has "." in it. Commented Jul 13, 2022 at 20:02
  • It's a nested object. I just represented in that table format for a clear view... I'll update it with the schema as well Commented Jul 14, 2022 at 3:21
  • The Above 3 JSON values are read and kept under a df. Commented Jul 14, 2022 at 3:43
  • So, I need to get "alert", "title" and "content" from those 3 different schemas and put all the "Alert" values under Column A, then all the "Title" values under Column B and all the "Content" value under Column C. Commented Jul 14, 2022 at 3:47

2 Answers

    from pyspark.sql.functions import collect_list

    data = [["aaa","bbb","ccc"],["a1a1a1","b1b1b1","c1c1c1"],["a2a2a2","b2b2b2","c2c2c2"]]
    columns = ["ColumnA","ColumnB","ColumnC"]
    df_in = spark.createDataFrame(data, columns)
    (
      df_in
        .agg(collect_list("ColumnA").alias("ColumnA"),
             collect_list("ColumnB").alias("ColumnB"),
             collect_list("ColumnC").alias("ColumnC"))
        .show(10, False)
    )
    # This result may be what you wanted:
    #+---------------------+---------------------+---------------------+
    #|ColumnA              |ColumnB              |ColumnC              |
    #+---------------------+---------------------+---------------------+
    #|[aaa, a1a1a1, a2a2a2]|[bbb, b1b1b1, b2b2b2]|[ccc, c1c1c1, c2c2c2]|
    #+---------------------+---------------------+---------------------+

4 Comments

We have the resulting data as lists in a single row, but I need aaa in one row, a1a1a1 in another row and a2a2a2 in another row.
You can try explode, like this: ur_df.withColumn("ColumnA", explode(col("ColumnA"))). Schema: ColumnA: array.
pyspark.sql.functions.explode(col): Returns a new row for each element in the given array or map.

You can manipulate them with coalesce.

First, set up what you want to extract.

paths = ['sample_column.pull.notify.roid',
         'sample_column.cone.pull.notify.roid',
         'sample_column.var.pull.notify.roid']

extract_keys = ['alert', 'title', 'action.pan.content']

Then use coalesce to pick the value from whichever path exists.

from pyspark.sql.functions import coalesce, col

df = (df.select(*[
          coalesce(*[col(f'{path}.{key}') for path in paths]).alias(key)
          for key in extract_keys
        ])
      .withColumnRenamed('action.pan.content', 'content')
)

Explanation

I used a double list comprehension in the example above, but you can break it down.

This expression:

coalesce(*[col(f'{path}.title') for path in paths])

translates to

coalesce('sample_column.pull.notify.roid.title',
         'sample_column.cone.pull.notify.roid.title',
         'sample_column.var.pull.notify.roid.title')

and this returns whichever path has a (non-null) value for each row (one row = one JSON).

I do the same for every key in extract_keys; that is the outer list comprehension.

Consideration

I hardcoded the paths in this example. If you want to go further and discover the paths without hardcoding them, have a look at "Is there a way to collect the names of all fields in a nested schema in pyspark" for some ideas.

1 Comment

Thank You for your response and opinions! I'll try this.
