I have a series of expressions used to map raw JSON data to normalized column data. I'm trying to find a way to apply them efficiently to every row, since there are multiple schemas to consider.

Right now, I have one massive CASE statement (built dynamically) that gets interpreted to SQL like this:

SELECT
    CASE
        WHEN schema = 'A' THEN CONCAT(get_json_object(payload, '$.FirstName'), ' ', get_json_object(payload, '$.LastName'))
        WHEN schema = 'B' THEN get_json_object(payload, '$.Name')
    END as name,
    CASE
        WHEN schema = 'A' THEN get_json_object(payload, '$.Telephone')
        WHEN schema = 'B' THEN get_json_object(payload, '$.PhoneNumber')
    END as phone_number
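
For context, the CASE SQL above is generated roughly like this (a simplified sketch; column_exprs and build_case are illustrative names, and raw_df is the raw table introduced below):

# column_exprs maps target column -> {schema: SparkSQL expression string}
column_exprs = {
    "name": {
        "A": "CONCAT(get_json_object(payload, '$.FirstName'), ' ', get_json_object(payload, '$.LastName'))",
        "B": "get_json_object(payload, '$.Name')",
    },
    "phone_number": {
        "A": "get_json_object(payload, '$.Telephone')",
        "B": "get_json_object(payload, '$.PhoneNumber')",
    },
}

def build_case(exprs_by_schema):
    # One WHEN branch per schema, wrapped in a single CASE expression
    whens = " ".join(
        f"WHEN schema = '{s}' THEN {e}" for s, e in exprs_by_schema.items()
    )
    return f"CASE {whens} END"

select_exprs = [f"{build_case(e)} AS {c}" for c, e in column_exprs.items()]
# Applied to the raw data with selectExpr (or spark.sql against a temp view)
normalized_df = raw_df.selectExpr(*select_exprs)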

This works, but I worry about performance as the number of schemas and columns increases. I want to see if there's another way, so here is my idea.

I have a DataFrame expressions_df of valid SparkSQL expressions.

| schema | column       | column_expression                                                                             |
| ------ | ------------ | --------------------------------------------------------------------------------------------- |
| A      | name         | CONCAT(get_json_object(payload, '$.FirstName'), ' ', get_json_object(payload, '$.LastName'))   |
| A      | phone_number | get_json_object(payload, '$.Telephone')                                                        |
| B      | name         | get_json_object(payload, '$.Name')                                                             |
| B      | phone_number | get_json_object(payload, '$.PhoneNumber')                                                      |

This DataFrame is used as a lookup table of sorts against a DataFrame raw_df:

| schema | payload                                                                |
| ------ | ---------------------------------------------------------------------- |
| A      | {"FirstName": "John", "LastName": "Doe", "Telephone": "123-456-7890"}   |
| B      | {"Name": "Jane Doe", "PhoneNumber": "123-567-1234"}                      |

I'd like to do something like this, where each row's column_expression is passed to F.expr so that the SQL is evaluated and the appropriate value is returned:

from pyspark.sql import functions as F

(
    raw_df
    .join(expressions_df, 'schema')
    .select(
        F.expr(column_expression)
    )
    .dropDuplicates()
)

The desired end result would be something like this, so that no matter what the original schema is, the data is transformed to the same standard using the expressions shown in the SQL or in expressions_df.

| name     | phone_number |
| -------- | ------------ |
| John Doe | 123-456-7890 |
| Jane Doe | 123-567-1234 |

1 Answer

You can't directly use a DataFrame column value as an expression with the expr function. You'll have to collect all the expressions into a Python object so that they can be passed as parameters to expr.

Here's one way to do it: the expressions are collected into a dict, then for each schema a different select expression is applied. Finally, union all the DataFrames to get the desired output:

from collections import defaultdict
from functools import reduce

import pyspark.sql.functions as F
from pyspark.sql import DataFrame

# Collect the expressions into a dict {schema: [aliased expression columns]}
exprs = defaultdict(list)
for r in expressions_df.collect():
    exprs[r.schema].append(F.expr(r.column_expression).alias(r.column))

# Distinct schemas actually present in the raw data
schemas = [r.schema for r in raw_df.select("schema").distinct().collect()]

# Apply each schema's select expressions to its rows, then union all the results
final_df = reduce(
    DataFrame.union,
    [raw_df.filter(f"schema='{s}'").select(*exprs[s]) for s in schemas],
)

final_df.show()
#+--------+------------+
#|    name|phone_number|
#+--------+------------+
#|Jane Doe|123-567-1234|
#|John Doe|123-456-7890|
#+--------+------------+
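
If filtering and unioning per schema becomes a concern as the number of schemas grows, the same collected rows can also drive a single pass over raw_df with chained when conditions per column. A rough sketch, assuming every schema defines the same set of target columns:

from collections import defaultdict

import pyspark.sql.functions as F

# Regroup the collected expressions as {column: {schema: expression string}}
by_column = defaultdict(dict)
for r in expressions_df.collect():
    by_column[r.column][r.schema] = r.column_expression

# Build one chained when(...) column per target column
select_cols = []
for col_name, schema_exprs in by_column.items():
    case_col = None
    for schema_name, expr_str in schema_exprs.items():
        cond = F.col("schema") == schema_name
        value = F.expr(expr_str)
        case_col = F.when(cond, value) if case_col is None else case_col.when(cond, value)
    select_cols.append(case_col.alias(col_name))

# Single pass over raw_df: no per-schema filter or union needed
single_pass_df = raw_df.select(*select_cols)

This is essentially a programmatic version of the CASE statement from the question, so it's worth benchmarking both approaches against real data to see which scales better.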

3 Comments

So it looks like you're building a dict mapping {schema: [aliased_column_expressions]} then iterating over groups of records for each schema present in the df to apply the select. The union removes dupes.
I'm not sure I like iterating over each schema and filtering the DataFrame; there could be hundreds of iterations in the future. How would the performance compare to a single read over the dataset with a CASE for each schema on each column?
@TomNash Yes, that's exactly what the code does. DataFrame.union is equivalent to UNION ALL; dupes aren't removed.
