I have a series of expressions that map raw JSON data to normalized column data, and I'm trying to find an efficient way to apply them to every row, since there are multiple source schemas to consider.
Right now I have one massive CASE statement (built dynamically) that ends up as SQL like this:
```sql
SELECT
  CASE
    WHEN schema = 'A' THEN CONCAT(get_json_object(payload, '$.FirstName'), ' ', get_json_object(payload, '$.LastName'))
    WHEN schema = 'B' THEN get_json_object(payload, '$.Name')
  END AS name,
  CASE
    WHEN schema = 'A' THEN get_json_object(payload, '$.Telephone')
    WHEN schema = 'B' THEN get_json_object(payload, '$.PhoneNumber')
  END AS phone_number
```
This works; I just worry about performance as the number of schemas and columns grows. I want to see if there's another way, so here is my idea.
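For context, that CASE statement is built dynamically from roughly the same schema/column/expression mapping shown below; a minimal sketch of the generation step (using a hypothetical in-memory `mappings` list, not my actual code) looks like this:

```python
from collections import defaultdict

# Hypothetical in-memory version of the schema/column/expression mapping.
mappings = [
    ("A", "name",
     "CONCAT(get_json_object(payload, '$.FirstName'), ' ', get_json_object(payload, '$.LastName'))"),
    ("A", "phone_number", "get_json_object(payload, '$.Telephone')"),
    ("B", "name", "get_json_object(payload, '$.Name')"),
    ("B", "phone_number", "get_json_object(payload, '$.PhoneNumber')"),
]

# Build one CASE expression per target column, with one WHEN branch per schema.
by_column = defaultdict(list)
for schema, column, expression in mappings:
    by_column[column].append(f"WHEN schema = '{schema}' THEN {expression}")

case_expressions = [
    "CASE " + " ".join(whens) + f" END AS {column}"
    for column, whens in by_column.items()
]
select_list = ",\n".join(case_expressions)
```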
I have a DataFrame, expressions_df, of valid Spark SQL expressions:
| schema | column | column_expression |
|---|---|---|
| A | name | CONCAT(get_json_object(payload, '$.FirstName'), ' ', get_json_object(payload, '$.LastName')) |
| A | phone_number | get_json_object(payload, '$.Telephone') |
| B | name | get_json_object(payload, '$.Name') |
| B | phone_number | get_json_object(payload, '$.PhoneNumber') |
This DataFrame is used as a lookup table of sorts against a DataFrame raw_df:
| schema | payload |
|---|---|
| A | {"FirstName": "John", "LastName": "Doe", "Telephone": "123-456-7890"} |
| B | {"Name": "Jane Doe", "PhoneNumber": "123-567-1234"} |
I'd like to do something like the following, where column_expression is passed to F.expr so that the SQL is evaluated and the appropriate value is returned:
```python
from pyspark.sql import functions as F

(
    raw_df
    .join(expressions_df, 'schema')
    .select(
        F.expr(column_expression)
    )
    .dropDuplicates()
)
```
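To make the intent concrete, here is a rough sketch of the kind of per-schema evaluation I have in mind (it collects expressions_df to the driver, which I assume is small enough for that to be fine, and unions one select per schema); ideally, though, the expressions would be applied through the join itself rather than via driver-side branching:

```python
from functools import reduce
from pyspark.sql import functions as F

# Collect the (small) expression lookup to the driver and group it by schema.
exprs_by_schema = {}
for row in expressions_df.collect():
    exprs_by_schema.setdefault(row['schema'], []).append(
        F.expr(row['column_expression']).alias(row['column'])
    )

# One select per schema, then union the normalized results together.
per_schema = [
    raw_df.where(F.col('schema') == schema).select(*columns)
    for schema, columns in exprs_by_schema.items()
]
normalized_df = reduce(lambda left, right: left.unionByName(right), per_schema)
```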
The desired end result would be something like this, so that no matter what the original schema is, the data is transformed to the same standard using the expressions shown in the SQL / expressions_df:
| name | phone_number |
| -------- | ------------ |
| John Doe | 123-456-7890 |
| Jane Doe | 123-567-1234 |