
How can I convert multiple columns into maps after grouping and pivoting in PySpark? The code below does not work as expected.

Input dataframe:

ID TYPE Order Address Phone
1 A Primary abc 111
1 B Primary def 222
1 A Secondary ghi 333
1 B Secondary jkl 444
2 A Primary mno 555
2 A Secondary pqr 666
2 B Primary stu 777
2 B Secondary vwy 888

Expected Output dataframe:

ID Primary_A_attributes Primary_B_attributes Secondary_A_attributes Secondary_B_attributes
1 {"Address" : "abc", "phone" : "111"} {"Address" : "def", "phone" : "222"} {"Address" : "ghi", "phone" : "333"} {"Address" : "jkl", "phone" : "444"}
2 {"Address" : "mno", "phone" : "555"} {"Address" : "stu", "phone" : "777"} {"Address" : "pqr", "phone" : "666"} {"Address" : "vwy", "phone" : "888"}

Code used:

from pyspark.sql import functions as F

df.withColumn("collection",F.upper(F.concat_ws('_attributes_','order','type')))\
.groupBy('id').pivot("collection").agg(F.create_map(F.lit("Address"),F.col("Address"),\
F.lit("phone"),F.col("phone"))).display()

2 Answers


Desired solution:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

data =[("1","A","Primary","abc","111"),
("1","B","Primary","def","222"),
("1","A","Secondary","ghi","333"),
("1","B","Secondary","jkl","444"),
("2","A","Primary","mno","555"),
("2","A","Secondary","pqr","666"),
("2","B","Primary","stu","777"),
("2","B","Secondary","vwy","888")]
schema=["ID","TYPE","Order","Address","Phone"]
df_source = spark.createDataFrame(data,schema)

# Fixed map keys; the values come from the collected Address and Phone lists.
# This assumes one row per (id, collection) cell, so the value array has exactly two elements.
expr_array=F.array(F.lit("Address"),F.lit("Phone"))
df_fnl = df_source.withColumn("collection",F.concat_ws('_','order','type'))\
.groupBy(["id"]).pivot("collection").agg(F.map_from_arrays(expr_array,F.concat(F.collect_list("Address"),F.collect_list("Phone"))))

# Append "_attributes" to every pivoted column name
df_fnl.select([F.col(col).alias(col+"_attributes") if col.lower() !='id' else col for col in df_fnl.columns ]).show(10,truncate=False)

Output:
+---+------------------------------+------------------------------+------------------------------+------------------------------+
|id |Primary_A_attributes          |Primary_B_attributes          |Secondary_A_attributes        |Secondary_B_attributes        |
+---+------------------------------+------------------------------+------------------------------+------------------------------+
|1  |{Address -> abc, Phone -> 111}|{Address -> def, Phone -> 222}|{Address -> ghi, Phone -> 333}|{Address -> jkl, Phone -> 444}|
|2  |{Address -> mno, Phone -> 555}|{Address -> stu, Phone -> 777}|{Address -> pqr, Phone -> 666}|{Address -> vwy, Phone -> 888}|
+---+------------------------------+------------------------------+------------------------------+------------------------------+



2 Comments

Thanks for your response. Can you please help me update this for the case where a single key has multiple values, like this: {address -> abc,def, phone -> 111,222}
@pradeepnadarajan Kindly share the data and expected output so that I can try it myself.

You can create the maps first and then use first() as the aggregation to fill in the pivoted values.

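from pyspark.sql import functions as func

# data_sdf is the input DataFrame from the question
(data_sdf
    # e.g. 'primary_attributes_a'
    .withColumn('collection', func.lower(func.concat_ws('_attributes_', 'order', 'type')))
    # array of {key, val} structs, one per attribute column
    .withColumn('adr_ph_struct',
                func.array(*[func.struct(func.lit(k).alias('key'), func.col(k).alias('val')) for k in ['address', 'phone']]))
    # turn the struct array into a map: {address -> ..., phone -> ...}
    .withColumn('adr_ph_map', func.map_from_entries('adr_ph_struct'))
    .groupBy('id')
    .pivot('collection')
    # each (id, collection) cell has one row, so first() just picks that map
    .agg(func.first('adr_ph_map'))
    .show(truncate=False))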

# +---+------------------------------+------------------------------+------------------------------+------------------------------+
# |id |primary_attributes_a          |primary_attributes_b          |secondary_attributes_a        |secondary_attributes_b        |
# +---+------------------------------+------------------------------+------------------------------+------------------------------+
# |1  |{address -> abc, phone -> 111}|{address -> def, phone -> 222}|{address -> ghi, phone -> 333}|{address -> jkl, phone -> 444}|
# |2  |{address -> mno, phone -> 555}|{address -> stu, phone -> 777}|{address -> pqr, phone -> 666}|{address -> vwy, phone -> 888}|
# +---+------------------------------+------------------------------+------------------------------+------------------------------+

Before the groupBy().pivot(), the data looks like this:

# +---+----+---------+-------+-----+----------------------+------------------------------+------------------------------+
# |id |type|order    |address|phone|collection            |adr_ph_struct                 |adr_ph_map                    |
# +---+----+---------+-------+-----+----------------------+------------------------------+------------------------------+
# |1  |A   |Primary  |abc    |111  |primary_attributes_a  |[{address, abc}, {phone, 111}]|{address -> abc, phone -> 111}|
# |1  |B   |Primary  |def    |222  |primary_attributes_b  |[{address, def}, {phone, 222}]|{address -> def, phone -> 222}|
# |1  |A   |Secondary|ghi    |333  |secondary_attributes_a|[{address, ghi}, {phone, 333}]|{address -> ghi, phone -> 333}|
# |1  |B   |Secondary|jkl    |444  |secondary_attributes_b|[{address, jkl}, {phone, 444}]|{address -> jkl, phone -> 444}|
# |2  |A   |Primary  |mno    |555  |primary_attributes_a  |[{address, mno}, {phone, 555}]|{address -> mno, phone -> 555}|
# |2  |A   |Secondary|pqr    |666  |secondary_attributes_a|[{address, pqr}, {phone, 666}]|{address -> pqr, phone -> 666}|
# |2  |B   |Primary  |stu    |777  |primary_attributes_b  |[{address, stu}, {phone, 777}]|{address -> stu, phone -> 777}|
# |2  |B   |Secondary|vwy    |888  |secondary_attributes_b|[{address, vwy}, {phone, 888}]|{address -> vwy, phone -> 888}|
# +---+----+---------+-------+-----+----------------------+------------------------------+------------------------------+

2 Comments

Thanks for your response. Can you please help me update this for the case where a single key has multiple values, like this: {address -> abc,def, phone -> 111,222}
@pradeepnadarajan You can first group by the key columns and use concat_ws(',', collect_list()). It will be similar to one of your previous questions that I answered.
