I have two tables:
entities:

    id | i | sources                    | name
    -----------------------------------------------
    1a | 0 | {"UK/bla": 1, "FR/blu": 2} | "mae"
    1a | 1 | {"UK/bla": 1, "IT/bli": 2} | "coulson"
source_mapping:

    source_name | source_metadata
    -----------------------------------------------------------------------------------------
    "UK/bla"    | {"source_name": "UK/bla", "description": "this is a description"}
    "FR/blu"    | {"source_name": "FR/blu", "description": "ceci est une description"}
    "IT/bli"    | {"source_name": "IT/bli", "description": "questa è una descrizione"}
What I would like to do is add a column to my entities table, like this:
id | i | sources | name | metadata
---------------------------------------------------------------
1a | 0 | [{"UK/bla": 1}, {"FR/blu": 2}] | ... | [{"source_name": "UK/bla", "description": "this is a description"}, {"source_name": "FR/blu", "description": "ceci est une description"}]
1a | 1 | [{"UK/bla": 1}, {"IT/bli": 2}] | ... | [{"source_name": "UK/bla", "description": "this is a description"}, {"source_name": "IT/bli", "description": "questa è una descrizione"}]
I did figure out a way to do this:
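    from pyspark.sql import functions as F

    # One row per (id, i, source); exploding a map column yields "key" and "value" columns.
    entities_sources_exploded = (
        entities
        .select(F.col("id"), F.col("i"), F.explode(F.col("sources")))
        .withColumnRenamed("key", "source_name")
        .drop("value")  # get rid of it
    )

    # Attach each source's metadata; joining on the column name avoids a duplicate source_name column.
    entities_sources_exploded_with_metadata = entities_sources_exploded.join(
        sources_mapping, on="source_name", how="left"
    )

    # Collapse back to one row per (id, i), gathering the metadata into a list.
    entities_with_metadata = (
        entities_sources_exploded_with_metadata
        .groupBy(F.col("id"), F.col("i"))
        .agg(F.collect_list("source_metadata").alias("metadata"))
    )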
And it works, but I have a sneaking suspicion that there are ways to do this without exploding, using higher-order functions (HOFs) in Spark SQL wrapped in F.expr(). I'd love to see how someone more fluent in that than me would solve this problem.
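One possible shape for such an explode-free version, as a rough sketch rather than a tested solution: collapse the mapping table into a single map column, cross join that one row onto the entities, and let `transform` look up each key of `sources`. The helper names `metadata_lookup_expr` and `with_metadata` and the temporary column `mapping` are made up for illustration. The sketch assumes `sources` is a map column, that `source_name` is unique in the mapping table, and that the mapping is small enough to broadcast.

```python
def metadata_lookup_expr(sources_col: str = "sources", mapping_col: str = "mapping") -> str:
    """Build a Spark SQL higher-order-function expression (hypothetical helper).

    For every key of the map column `sources_col`, look up the matching
    entry in the map column `mapping_col`.
    """
    return f"transform(map_keys({sources_col}), k -> {mapping_col}[k])"


def with_metadata(entities, sources_mapping):
    """Add a `metadata` array column to `entities` without exploding (hypothetical helper)."""
    # Local import so the expression builder above can be used without Spark installed.
    from pyspark.sql import functions as F

    # Collapse the mapping table into one row holding a single map column:
    # source_name -> source_metadata. Assumes source_name values are unique.
    mapping = sources_mapping.agg(
        F.map_from_entries(
            F.collect_list(F.struct("source_name", "source_metadata"))
        ).alias("mapping")
    )

    return (
        entities
        .crossJoin(F.broadcast(mapping))  # one-row table, so no row multiplication
        .withColumn("metadata", F.expr(metadata_lookup_expr()))
        .drop("mapping")  # temporary helper column
    )
```

It would be called as `entities_with_metadata = with_metadata(entities, sources_mapping)`. Since there is no `groupBy`, the other columns of `entities` (such as `sources` and `name`) should stay in place. How a key missing from the mapping is handled may depend on the Spark version and ANSI settings.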