
I'm trying to do the equivalent of the following Scala code in PySpark:

val maxJsonParts = 3 // whatever that number is...
// One column expression per possible index in the JSON array
val jsonElements = (0 until maxJsonParts)
                     .map(i => get_json_object($"Payment", s"$$[$i]"))

// One row per element; drop the nulls produced for indexes
// beyond the actual array length
val newDF = dataframe
  .withColumn("Payment", explode(array(jsonElements: _*)))
  .where(!isnull($"Payment"))

For example, I am trying to make a nested column such as in the payment column below:

id  name   payment
1   James  [ {"@id": 1, "currency": "GBP"}, {"@id": 2, "currency": "USD"} ]

to become:

id  name   payment
1   James  {"@id": 1, "currency": "GBP"}
1   James  {"@id": 2, "currency": "USD"}

The table schema:

root
|-- id: integer (nullable = true)
|-- Name: string (nullable = true)   
|-- Payment: string (nullable = true)

I tried writing this in PySpark, but it just turns the nested column (payment) to null:

from pyspark.sql import functions as F

lst = [range(0, 10)]
jsonElem = [F.get_json_object(F.col("payment"), f"$[{i}]") for i in lst]
bronzeDF = bronzeDF.withColumn("payment2", F.explode(F.array(*jsonElem)))
bronzeDF.show()

Any help is highly appreciated.

1 Comment

PySpark's interface is very similar to Scala's. Try rewriting it yourself and come back if you have a specific issue with it; "write this but in Python for me" is not a question for SO.

2 Answers


Here is another approach, which parses the given JSON against the right schema in order to generate the payment array. It is based on the from_json function, which parses a JSON string into a typed column (here, an array of structs) that can then be exploded.

from pyspark.sql.types import IntegerType, StringType, ArrayType, StructType, StructField
from pyspark.sql.functions import from_json, explode

data = [
  (1, 'James', '[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]'), 
  (2, 'Tonny', '[ {"@id": 3, "currency":"EUR"},{"@id": 4, "currency": "USD"} ]'), 
]
df = spark.createDataFrame(data, ['id', 'name', 'payment'])

# DDL string describing the payment array's schema
str_schema = 'array<struct<`@id`:int,`currency`:string>>'

# st_schema = ArrayType(StructType([
#                 StructField('@id', IntegerType()),
#                 StructField('currency', StringType())]))

df = df.withColumn("payment", explode(from_json(df["payment"], str_schema)))

df.show()

# +---+-----+--------+
# | id| name| payment|
# +---+-----+--------+
# |  1|James|[1, GBP]|
# |  1|James|[2, USD]|
# |  2|Tonny|[3, EUR]|
# |  2|Tonny|[4, USD]|
# +---+-----+--------+

Note: as shown above, you can choose between the DDL string representation of the schema and the equivalent ArrayType; both produce the same result.
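
Once payment is a struct column, its fields can also be promoted to top-level columns. A minimal sketch continuing from the df above (the flat name is just illustrative; star expansion on a struct column is standard Spark):

# Expand every field of the payment struct into its own column
flat = df.select("id", "name", "payment.*")
flat.show()

# +---+-----+---+--------+
# | id| name|@id|currency|
# +---+-----+---+--------+
# |  1|James|  1|     GBP|
# |  1|James|  2|     USD|
# |  2|Tonny|  3|     EUR|
# |  2|Tonny|  4|     USD|
# +---+-----+---+--------+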


1 Comment

I realized that explode literally does the same thing :'). I remember spending hours figuring out the PySpark equivalent and succeeding, while the fact that the explode function does the same thing went hugely unnoticed.

I came to the following solution:

First, convert the column to a string type as follows:

# Serialize the nested payment column to a JSON string
bronzeDF = bronzeDF.withColumn("payment2", F.to_json("payment")).drop("payment")

Then run the following on that column to stack the n nested JSON objects as separate rows, repeating the outer key values:

from pyspark.sql import functions as F

max_json_parts = 50  # upper bound on the number of elements in the array
# One JSON-path expression per possible array index
jsonElem = [F.get_json_object(F.col("payment2"), f"$[{i}]") for i in range(max_json_parts)]
# One row per element; drop the nulls produced for indexes past the real array length
bronzeDF = bronzeDF.withColumn("payment2", F.explode(F.array(*jsonElem))).where(F.col("payment2").isNotNull())

Then convert the string back to a struct with a defined schema and explode the object keys into separate columns:

bronzeDF = bronzeDF.withColumn("temp", F.from_json("payment2", jsonSchemaPayment)).select("*", "temp.*").drop("payment2")
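
The jsonSchemaPayment variable is left undefined in the answer; here is a minimal sketch of what it could look like for the sample payment objects above (the field set is an assumption, adjust it to your actual payload):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Assumed schema matching the sample payment objects above
jsonSchemaPayment = StructType([
    StructField("@id", IntegerType()),
    StructField("currency", StringType()),
])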
