I have two tables:

entities

id | i | sources                        | name
----------------------------------------------------
1a | 0 | {"UK/bla": 1, "FR/blu": 2} | "mae"
1a | 1 | {"UK/bla": 1, "IT/bli": 2} | "coulson"

source_mapping

source_name | source_metadata
-----------------------------------------------------------------------------------------
"UK/bla"    | {"source_name": "UK/bla", "description": "this is a description"}
"FR/blu"    | {"source_name": "FR/blu", "description": "ceci est une description"}
"IT/bli"    | {"source_name": "IT/bli", "description": "questa è una descrizione"}

What I would like to do is add a column to my entities table, like this:

id | i | sources                        | name |  metadata   
---------------------------------------------------------------
1a | 0 | [{"UK/bla": 1}, {"FR/blu": 2}] | ...  | [{"source_name": "UK/bla", "description": "this is a description"}, {"source_name": "FR/blu", "description": "ceci est une description"}]
1a | 1 | [{"UK/bla": 1}, {"IT/bli": 2}] | ...  | [{"source_name": "UK/bla", "description": "this is a description"}, {"source_name": "IT/bli", "description": "questa è una descrizione"}]

I did figure out a way to do this:

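from pyspark.sql import functions as F

# Exploding a map column yields one row per entry, with columns "key" and "value".
entities_sources_exploded = (entities.select(F.col("id"), 
                                             F.col("i"),
                                             F.explode(F.col("sources")))
                                     .withColumnRenamed("key", "source_name")
                                     .drop("value"))  # the map values are not needed

# Left join so that rows whose source is missing from the mapping are kept.
entities_sources_exploded_with_metadata = (entities_sources_exploded
                                           .join(source_mapping,
                                                 entities_sources_exploded.source_name == source_mapping.source_name,
                                                 "left"))
# Collect the metadata back into one list per (id, i).
entities_with_metadata = (entities_sources_exploded_with_metadata
                          .groupBy(F.col("id"), F.col("i"))
                          .agg(F.collect_list("source_metadata").alias("metadata")))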

It works, but I suspect there is a way to do this without exploding, using higher-order functions (HOFs) in Spark SQL wrapped in an .expr(). I'd love to see how someone more fluent in them than I am would solve this problem.

1 Answer

I think this should work:

import pandas as pd

# Setup data
data1 = pd.DataFrame({
    "id": ["1a", "1a"],
    "i": [0, 1],
    "sources": [{"UK/bla": 1, "FR/blu": 2}, {"UK/bla": 1, "IT/bli": 2}],
    "name": ["mae", "coulson"]
})
df1 = spark.createDataFrame(data1)
data2 = pd.DataFrame({
    "source_name": ["UK/bla", "FR/blu", "IT/bli"],
    "source_metadata": [
        {"source_name": "UK/bla", "description": "this is a description"},
        {"source_name": "FR/blu", "description": "ceci est une description"},
        {"source_name": "IT/bli", "description": "questa è una descrizione"}
    ]
})
df2 = spark.createDataFrame(data2)

# Create temp views and execute SQL
# (registerTempTable is deprecated; createOrReplaceTempView is its replacement)
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
query = """
    SELECT
        temp.id,
        temp.i,
        COLLECT_LIST(source) AS sources,
        temp.name,
        COLLECT_LIST(source_metadata) AS metadata
    FROM (
        SELECT
            *,
            map(key, value) AS source
        FROM (
            SELECT
                df1.id,
                df1.i,
                df1.name,
                EXPLODE(df1.sources)
            FROM df1
        ) AS df1_exploded
        JOIN df2
        ON df2.source_name = df1_exploded.key
    ) AS temp
    GROUP BY temp.id, temp.i, temp.name
"""
result = spark.sql(query)
result.show(5)

1 Comment

beautiful - thanks!
