
I am new to Pyspark and not yet familiar with all the functions and capabilities it has to offer.

I have a PySpark DataFrame with a column that contains nested JSON values, for example:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

rows = [['Alice', """{
                      "level1":{
                         "tag1":{
                            "key1":"value1",
                            "key2":"value2",
                            "key3":"value3"
                         }
                      },
                      "level2":{
                         "tag1":{
                            "key1":"value1"
                         }
                      },
                      "level3":{
                         "tag1":{
                            "key1":"value1",
                            "key2":"value2",
                            "key3":"value3"
                         },
                         "tag2":{
                            "key1":"value1"
                         }
                      }}"""
         ]]
columns = ['name', 'Levels']
df = spark.createDataFrame(rows, columns)

The number of levels, tags, and key:value pairs in each tag is not under my control and may change.

My goal is to create a new DataFrame from the original, with one row for each (level, tag, key, value) tuple and the corresponding columns. From the row in the example, this yields 8 new rows of the form:
(name, level, tag, key, value)
Alice, level1, tag1, key1, value1
Alice, level1, tag1, key2, value2
Alice, level1, tag1, key3, value3

Alice, level2, tag1, key1, value1

Alice, level3, tag1, key1, value1
Alice, level3, tag1, key2, value2
Alice, level3, tag1, key3, value3
Alice, level3, tag2, key1, value1
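The desired flattening is plain nested-dictionary traversal. As a sanity check, here is a minimal pure-Python sketch (outside Spark, using only the standard-library json module; the helper name flatten is mine, not from the question) that produces exactly those 8 tuples:

```python
import json

# The example row's JSON, written with valid JSON syntax
# (double quotes, no trailing commas)
levels_json = """{
  "level1": {"tag1": {"key1": "value1", "key2": "value2", "key3": "value3"}},
  "level2": {"tag1": {"key1": "value1"}},
  "level3": {"tag1": {"key1": "value1", "key2": "value2", "key3": "value3"},
             "tag2": {"key1": "value1"}}
}"""

def flatten(name, levels_str):
    """Yield one (name, level, tag, key, value) tuple per innermost entry."""
    for level, tags in json.loads(levels_str).items():
        for tag, keys in tags.items():
            for key, value in keys.items():
                yield (name, level, tag, key, value)

for row in flatten("Alice", levels_json):
    print(row)
```

Whatever Spark mechanism is used, it ultimately has to perform this traversal for each row.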

1 Answer

As a first step, the JSON is transformed into an array of (level, tag, key, value) tuples using a UDF. The second step explodes the array to get the individual rows:

from pyspark.sql import functions as F
from pyspark.sql import types as T


df = ...

import json

def to_array(lvl):
    # The question's Levels column is a JSON string, so parse it with
    # json.loads; if it has already been parsed into a struct (e.g. via
    # from_json), convert the Row to a plain nested dict instead.
    def to_tuple(lvl):
        levels = json.loads(lvl) if isinstance(lvl, str) else lvl.asDict(recursive=True)
        for l, tags in levels.items():
            for t, keys in tags.items():
                for k, v in keys.items():
                    yield (l, t, k, v)
    return list(to_tuple(lvl))

outputschema=T.ArrayType(T.StructType([
    T.StructField("level", T.StringType(), True),
    T.StructField("tag", T.StringType(), True),
    T.StructField("key", T.StringType(), True),
    T.StructField("value", T.StringType(), True)
]))

to_array_udf = F.udf(to_array, outputschema)

df.withColumn("tmp", to_array_udf("Levels")) \
    .withColumn("tmp", F.explode("tmp")) \
    .select("Levels", "tmp.*") \
    .show()

Output:

+--------------------+------+----+----+------+
|              Levels| level| tag| key| value|
+--------------------+------+----+----+------+
|{{{value1, value2...|level1|tag1|key1|value1|
|{{{value1, value2...|level1|tag1|key2|value2|
|{{{value1, value2...|level1|tag1|key3|value3|
|{{{value1, value2...|level2|tag1|key1|value1|
|{{{value1, value2...|level3|tag1|key1|value1|
|{{{value1, value2...|level3|tag1|key2|value2|
|{{{value1, value2...|level3|tag1|key3|value3|
|{{{value1, value2...|level3|tag2|key1|value1|
+--------------------+------+----+----+------+

2 Comments

Thank you for your response, it really helped me. I have a few questions regarding what happens behind the scenes: 1) I see you defined the udf with an output schema, but used withColumn to create a new single column "tmp", so how does that work? 2) What does select on "tmp.*" do?
1) In Spark a single column can contain a complex data structure, and that is what happens here. The UDF returns one array of structs per input row, and this array is stored in a single column called tmp with the structure defined in outputschema. 2) select("tmp.*") selects all fields within the struct tmp, i.e. tmp.level, tmp.tag, tmp.key and tmp.value. - To get a better grip on these concepts, try printing df.printSchema() after each step in the pipeline, e.g. after each withColumn.
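Concretely, the schema evolves roughly like this after each step (an abbreviated sketch of what printSchema() shows, not verbatim output; nullability flags omitted):

```text
# after withColumn("tmp", to_array_udf("Levels"))
 |-- tmp: array
 |    |-- element: struct
 |    |    |-- level: string
 |    |    |-- tag: string
 |    |    |-- key: string
 |    |    |-- value: string

# after withColumn("tmp", F.explode("tmp"))  -- one row per array element
 |-- tmp: struct
 |    |-- level: string
 |    |-- tag: string
 |    |-- key: string
 |    |-- value: string

# after select("Levels", "tmp.*")  -- struct fields become top-level columns
 |-- Levels, level, tag, key, value
```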
