
I need to transform a list of JSON objects into PySpark DataFrames. The JSON objects all share the same schema. The problem is that the value entries of the dicts in the JSON have different data types.

Example: the field complex is an array of dicts; each dict has four keys, but of different types (integer, string, float, and a nested dict). See below for an example JSON.

If I use df = spark.createDataFrame(json_list) to create my DataFrame from the JSON, PySpark "deletes" some of the data because it cannot infer the schema correctly. PySpark decides that the schema of the complex field should be StructField("complex", ArrayType(MapType(StringType(), LongType()))), which leads to the non-LongType values being nulled.
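The root of the inference problem is visible before Spark is even involved: the values inside each dict of complex do not share a single Python type, so no uniform MapType value type can hold them all. A minimal sketch (plain Python, using one abbreviated entry from the sample JSON):

```python
import json

# One entry from the "complex" array, abbreviated.
sample = '{"key1": 1, "key2": "(1)", "key3": 0.5, "key4": {"innerkey1": "random"}}'
entry = json.loads(sample)

# The four values parse to four different Python types: int, str, float, dict.
# A MapType(StringType(), <one type>) can represent only one of them losslessly.
print({k: type(v).__name__ for k, v in entry.items()})
# → {'key1': 'int', 'key2': 'str', 'key3': 'float', 'key4': 'dict'}
```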

I tried to supply a schema, but a MapType requires one specific DataType for all its values, and my value types vary:

from pyspark.sql.types import (ArrayType, LongType, MapType, StringType,
                               StructField, StructType)

myschema = StructType([
    StructField("Id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("sentTimestamp", LongType(), True),
    StructField("complex", ArrayType(MapType(StringType(), StringType())), True),
])

With MapType(StringType(), StringType()), the value fields that are not strings are nulled because they cannot be mapped to the declared value type.

It seems that PySpark can only handle dicts as maps if all the values share one data type.

How can I convert the JSON to a PySpark DataFrame without losing data?

[{
    "Id": "2345123",
    "name": "something",        
    "sentTimestamp": 1646732402,
    "complex":
    [
        {
            "key1": 1,
            "key2": "(1)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "random",
                "innerkey2": 5.4,
                "innerkey3": 1
            }
        },
        {
            "key1": 2,
            "key2": "(2)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "left",
                "innerkey2": 7.8,
                "innerkey3": 1
            }
        }
    ]
}]

3 Answers


You can specify the schema of the complex column as an array of structs, so each key keeps its own type.

from pyspark.sql.types import (ArrayType, DoubleType, IntegerType, LongType,
                               StringType, StructField, StructType)

myschema = StructType(
    [
        StructField("Id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("sentTimestamp", LongType(), True),
        StructField(
            "complex",
            ArrayType(StructType(
                [
                    StructField("key1", LongType(), True),
                    StructField("key2", StringType(), True),
                    StructField("key3", DoubleType(), True),
                    StructField(
                        "key4",
                        StructType(
                            [
                                StructField("innerkey1", StringType(), True),
                                StructField("innerkey2", DoubleType(), True),
                                StructField("innerkey3", IntegerType(), True),
                            ]
                        )
                    )
                ]
            ))
        )
    ]
)

1 Comment

So simple, so elegant. Amazing. Thank you!

If you do not want to pass a schema and would rather let Spark (3.0+) detect it, you can write the JSON into a table:

%sql

CREATE TABLE newtable AS SELECT
'{
    "Id": "2345123",
    "name": "something",        
    "sentTimestamp": 1646732402,
    "complex":
    [
        {
            "key1": 1,
            "key2": "(1)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "random",
                "innerkey2": 5.4,
                "innerkey3": 1
            }
        },
        {
            "key1": 2,
            "key2": "(2)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "left",
                "innerkey2": 7.8,
                "innerkey3": 1
            }
        }
    ]
}' AS original

Convert the table into a dataframe

df1 = spark.sql('select * from newtable')

Convert the single column of the table into an RDD of JSON strings:

from pyspark.sql.functions import col, from_json

rdd = df1.select(col("original").alias("jsoncol")).rdd.map(lambda x: x.jsoncol)

Leverage spark.read.json to infer the schema from the RDD and store it in a variable:

newschema = spark.read.json(rdd).schema

Apply the inferred schema to the column with from_json in a select:

df3 = df1.select("*", from_json("original", newschema).alias("transrequest"))

df3.select('transrequest.*').show(truncate=False)

+-------+----------------------------------------------------------------+---------+-------------+
|Id     |complex                                                         |name     |sentTimestamp|
+-------+----------------------------------------------------------------+---------+-------------+
|2345123|[{1, (1), 0.5, {random, 5.4, 1}}, {2, (2), 0.5, {left, 7.8, 1}}]|something|1646732402   |
+-------+----------------------------------------------------------------+---------+-------------+

Inferred schema:

root
 |-- Id: string (nullable = true)
 |-- complex: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key1: long (nullable = true)
 |    |    |-- key2: string (nullable = true)
 |    |    |-- key3: double (nullable = true)
 |    |    |-- key4: struct (nullable = true)
 |    |    |    |-- innerkey1: string (nullable = true)
 |    |    |    |-- innerkey2: double (nullable = true)
 |    |    |    |-- innerkey3: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sentTimestamp: long (nullable = true)



Adding to @过过招's answer, below is the approach I would personally use, since it involves less code when defining the DataFrame schema.

Input JSON

jsonstr = """[{
    "Id": "2345123",
    "name": "something",        
    "sentTimestamp": 1646732402,
    "complex":
    [
        {
            "key1": 1,
            "key2": "(1)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "random",
                "innerkey2": 5.4,
                "innerkey3": 1
            }
        },
        {
            "key1": 2,
            "key2": "(2)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "left",
                "innerkey2": 7.8,
                "innerkey3": 1
            }
        }
    ]
}]"""

Converting this to an RDD -

import json

rdd = sc.parallelize(json.loads(jsonstr))  # sc is the SparkContext, i.e. spark.sparkContext

Creating the DataFrame, with the schema given as a DDL string -

df = spark.createDataFrame(
    rdd,
    'Id string, name string, sentTimestamp long, '
    'complex array<struct<key1:int, key2:string, key3:float, '
    'key4:struct<innerkey1:string,innerkey2:float,innerkey3:int>>>'
)
df.show(truncate=False)

#Output Data
+-------+---------+-------------+----------------------------------------------------------------+
|Id     |name     |sentTimestamp|complex                                                         |
+-------+---------+-------------+----------------------------------------------------------------+
|2345123|something|1646732402   |[{1, (1), 0.5, {random, 5.4, 1}}, {2, (2), 0.5, {left, 7.8, 1}}]|
+-------+---------+-------------+----------------------------------------------------------------+

#Output Schema
root
 |-- Id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- sentTimestamp: long (nullable = true)
 |-- complex: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key1: integer (nullable = true)
 |    |    |-- key2: string (nullable = true)
 |    |    |-- key3: float (nullable = true)
 |    |    |-- key4: struct (nullable = true)
 |    |    |    |-- innerkey1: string (nullable = true)
 |    |    |    |-- innerkey2: float (nullable = true)
 |    |    |    |-- innerkey3: integer (nullable = true)

2 Comments

You will have to define the schema in a string, though, which is more error-prone (and harder to debug) than defining the schema outright with the StructType/StructField classes.
@Cribber Depends on personal preference and ease of understanding.
