
I asked this question a while back for Python, but now I need to do the same thing in PySpark.

I have a dataframe (df) like so:

|cust_id|address    |store_id|email        |sales_channel|category|
-------------------------------------------------------------------
|1234567|123 Main St|10SjtT  |[email protected]|ecom         |direct  |
|4567345|345 Main St|10SjtT  |[email protected]|instore      |direct  |
|1569457|876 Main St|51FstT  |[email protected]|ecom         |direct  |

and I would like to combine the last 4 fields into one metadata field stored as JSON, like so:

|cust_id|address    |metadata                                                                                     |
-------------------------------------------------------------------------------------------------------------------
|1234567|123 Main St|{'store_id':'10SjtT', 'email':'[email protected]','sales_channel':'ecom', 'category':'direct'}   |
|4567345|345 Main St|{'store_id':'10SjtT', 'email':'[email protected]','sales_channel':'instore', 'category':'direct'}|
|1569457|876 Main St|{'store_id':'51FstT', 'email':'[email protected]','sales_channel':'ecom', 'category':'direct'}   |

Here's the code I used to do this in Python with pandas:

cols = [
    'store_id',
    'category',
    'sales_channel',
    'email'
]

df1 = df.copy()
df1['metadata'] = df1[cols].to_dict(orient='records')
df1 = df1.drop(columns=cols)

but I would like to translate this into PySpark code that works with a Spark DataFrame; I do NOT want to use pandas in Spark.

2 Answers


Use the to_json function to create a JSON object:

Example:

from pyspark.sql.functions import to_json, struct

#sample data
df = spark.createDataFrame(
    [('1234567', '123 Main St', '10SjtT', '[email protected]', 'ecom', 'direct')],
    ['cust_id', 'address', 'store_id', 'email', 'sales_channel', 'category'])

df.select(
    "cust_id", "address",
    to_json(struct("store_id", "category", "sales_channel", "email")).alias("metadata")
).show(10, False)

#result
+-------+-----------+----------------------------------------------------------------------------------------+
|cust_id|address    |metadata                                                                                |
+-------+-----------+----------------------------------------------------------------------------------------+
|1234567|123 Main St|{"store_id":"10SjtT","category":"direct","sales_channel":"ecom","email":"[email protected]"}|
+-------+-----------+----------------------------------------------------------------------------------------+

to_json by passing a list of columns:

ll = ['store_id', 'email', 'sales_channel', 'category']

df.withColumn("metadata", to_json(struct([x for x in ll]))).drop(*ll).show()

#result
+-------+-----------+----------------------------------------------------------------------------------------+
|cust_id|address    |metadata                                                                                |
+-------+-----------+----------------------------------------------------------------------------------------+
|1234567|123 Main St|{"store_id":"10SjtT","email":"[email protected]","sales_channel":"ecom","category":"direct"}|
+-------+-----------+----------------------------------------------------------------------------------------+
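
The list comprehension isn't strictly required; struct also accepts the column names unpacked from the list, so an equivalent one-liner reusing the same ll list would be:

#equivalent: unpack the list of column names directly into struct
df.withColumn("metadata", to_json(struct(*ll))).drop(*ll).show(truncate=False)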

2 Comments

is it possible to pass in lists for the columns instead of their actual column names?
Best answer ever! I'm going from Kafka -> Spark -> Kafka and this answer is exactly what I needed.

@Shu gives a good answer; here's a variant that works slightly better for my use case. I'm going from Kafka -> Spark -> Kafka, and this one-liner does exactly what I want. The struct(*) packs up all the fields in the DataFrame.

# Pack up the fields in preparation for sending to the Kafka sink
kafka_df = df.selectExpr('cast(id as string) as key', 'to_json(struct(*)) as value')
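
If you also want to push the packed rows back out to Kafka, a minimal batch write looks something like this (assuming the spark-sql-kafka integration is on the classpath; the broker address and topic name below are placeholders):

# write the key/value pairs to a Kafka sink (broker and topic are placeholders)
kafka_df.write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "output_topic") \
    .save()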

