
I have a pyspark dataframe consisting of one column, called json, where each row is a unicode string of json. I'd like to parse each row and return a new dataframe where each row is the parsed json.

# Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)])

I've tried mapping over each row with json.loads:

(df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
  .toDF()
).show()

But this returns a TypeError: expected string or buffer

I suspect that part of the problem is that when converting from a dataframe to an rdd, the schema information is lost, so I've also tried manually entering in the schema info:

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

But I get the same TypeError.

Looking at this answer, it looks like flattening out the rows with flatMap might be useful here, but I'm not having success with that either:

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .flatMap(lambda x: x)
  .flatMap(lambda x: json.loads(x))
  .map(lambda x: x.get('body'))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

I get this error: AttributeError: 'unicode' object has no attribute 'get'.

8 Answers


For Spark 2.1+, you can use from_json, which allows the other non-JSON columns within the dataframe to be preserved, as follows:

from pyspark.sql.functions import from_json, col
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
df.withColumn('json', from_json(col('json'), json_schema))

You let Spark derive the schema of the JSON string column. The df.json column is then no longer a StringType but the correctly decoded JSON structure, i.e., a nested StructType, and all the other columns of df are preserved as-is.

You can access the json content as follows:

df.select(col('json.header').alias('header'))
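
For reference, here's a minimal end-to-end sketch against the question's sample data (assuming a SparkSession named spark and the jstr strings defined in the question):

from pyspark.sql import Row
from pyspark.sql.functions import from_json, col

# Rebuild the question's sample frame
df = spark.createDataFrame([Row(json=jstr1), Row(json=jstr2), Row(json=jstr3)])

# Let Spark infer the schema from the strings, then decode the column in place
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
parsed = df.withColumn('json', from_json(col('json'), json_schema))

# Nested fields are now addressable with dotted paths
parsed.select('json.header.id', 'json.body.name').show()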

10 Comments

When I try it with a streaming dataframe (Structured Streaming), I get the error Queries with streaming sources must be executed with writeStream.start();;\nkafka. Can you please help me with how to use the JSON data from Kafka streaming?
Just use a regular dataframe/RDD to extract the JSON schema from a batch/sample of data. Then use the extracted schema in your streaming app.
Hi, can you tell me what col is in your code? Is it the 'json' column object?
It's a Spark function which you can import; see spark.apache.org/docs/2.4.0/api/python/…
Just a 'gotcha' that gave me some headache: in the part row.json, the 'json' refers to the column named 'json', so if your column is 'my_json' then it's going to be row.my_json (unhappy choice of column names).

Converting a dataframe with JSON strings to a structured dataframe is actually quite simple in Spark if you convert the dataframe to an RDD of strings first (see: http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets)

For example:

>>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
>>> new_df.printSchema()
root
 |-- body: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- sub_json: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- sub_sub_json: struct (nullable = true)
 |    |    |    |-- col1: long (nullable = true)
 |    |    |    |-- col2: string (nullable = true)
 |-- header: struct (nullable = true)
 |    |-- foo: string (nullable = true)
 |    |-- id: long (nullable = true)
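
Once parsed this way, nested fields can be pulled out with dotted paths; for example, with the sample data (row order may vary):

>>> new_df.select('header.id', 'body.name').show()
+-----+------+
|   id|  name|
+-----+------+
|12345|foobar|
|12346|barfoo|
|43256|bazbar|
+-----+------+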

5 Comments

This is great - thanks! Is there a way to convert the StructTypes to MapTypes? Later in my code, I'm parsing out each MapType by exploding the columns.
Ah, I think I've figured it out: I can avoid using MapTypes by doing something like this: body = new_df.select('body').rdd.map(lambda r: r.body).toDF()
Actually it's much simpler: just type new_df.select('body') and you will have a dataframe with body objects only.
Cool! Is there a way to join the new dataframe with the original (which has other fields besides the JSON string)?
@OphirYoktan Unfortunately not. For this I recommend from_json, described in Martin's answer here.

Existing answers do not work if your JSON is anything but perfectly/traditionally formatted. For example, the RDD-based schema inference expects JSON in curly braces {} and will produce an incorrect schema (resulting in null values) if, for example, your data looks like:

[
  {
    "a": 1.0,
    "b": 1
  },
  {
    "a": 0.0,
    "b": 2
  }
]

I wrote a function to work around this issue by sanitizing JSON such that it lives in another JSON object:

import pyspark.sql.functions as psf

def parseJSONCols(df, *cols, sanitize=True):
    """Auto infer the schema of a json column and parse into a struct.

    rdd-based schema inference works if you have well-formatted JSON,
    like ``{"key": "value", ...}``, but breaks if your 'JSON' is just a
    string (``"data"``) or is an array (``[1, 2, 3]``). In those cases you
    can fix everything by wrapping the data in another JSON object
    (``{"key": [1, 2, 3]}``). The ``sanitize`` option (default True)
    automatically performs the wrapping and unwrapping.

    The schema inference is based on this
    `SO Post <https://stackoverflow.com/a/45880574/>`_.

    Parameters
    ----------
    df : pyspark dataframe
        Dataframe containing the JSON cols.
    *cols : string(s)
        Names of the columns containing JSON.
    sanitize : boolean
        Flag indicating whether you'd like to sanitize your records
        by wrapping and unwrapping them in another JSON object layer.

    Returns
    -------
    pyspark dataframe
        A dataframe with the decoded columns.
    """
    res = df
    for i in cols:

        # sanitize if requested.
        if sanitize:
            res = (
                res.withColumn(
                    i,
                    psf.concat(psf.lit('{"data": '), i, psf.lit('}'))
                )
            )
        # infer schema and apply it
        schema = spark.read.json(res.rdd.map(lambda x: x[i])).schema
        res = res.withColumn(i, psf.from_json(psf.col(i), schema))

        # unpack the wrapped object if needed
        if sanitize:
            res = res.withColumn(i, psf.col(i).data)
    return res

Note: the function also assumes an active SparkSession named spark.
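
A usage sketch, assuming a SparkSession named spark and the array-shaped sample above stored in a (hypothetical) column called raw:

from pyspark.sql.types import StringType

data = ['[{"a": 1.0, "b": 1}, {"a": 0.0, "b": 2}]']
df = spark.createDataFrame(data, StringType()).toDF('raw')

parsed = parseJSONCols(df, 'raw', sanitize=True)
parsed.printSchema()  # raw is now an array<struct<a:double,b:long>>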

2 Comments

> For example, the RDD-based schema inference expects JSON in curly-braces — where did you read this? Awesome find!
"Where did you read this?" I can't say I read it anywhere; I simply found that pyspark did not parse my JSON unless this was true.

Here's a concise (Spark SQL) version of @nolan-conaway's parseJSONCols function.

SELECT 
explode(
    from_json(
        concat('{"data":', 
               '[{"a": 1.0,"b": 1},{"a": 0.0,"b": 2}]', 
               '}'), 
        'data array<struct<a:DOUBLE, b:INT>>'
    ).data) as data;

PS. I've added the explode function as well :P

You'll need to know some Hive SQL types.
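
The same statement can be run from PySpark (a small sketch, assuming a SparkSession named spark):

query = """
SELECT explode(
    from_json(
        concat('{"data":',
               '[{"a": 1.0,"b": 1},{"a": 0.0,"b": 2}]',
               '}'),
        'data array<struct<a:DOUBLE, b:INT>>'
    ).data) AS data
"""
spark.sql(query).show()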


from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
import json

spark = SparkSession.builder.getOrCreate()

# Register a UDF that serializes a map/dict value to a JSON string
spark.udf.register("map2json", lambda m: json.dumps(m), StringType())

spark.sql("select map2json(map('a', '1'))").show()



If you don't know the schema of each JSON (and they can differ), you can use:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
 
# ... here you get your DF

# Assuming the first column of your DF is the JSON to parse
my_df = spark.read.json(my_df.rdd.map(lambda x: x[0]))

Note that it won't keep any other column present in your dataset. From: https://github.com/apache/spark/pull/22775
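
If you do need the other columns, a workaround along the lines of the from_json answer above (a sketch, assuming the JSON strings live in a column named payload) is to infer the schema first and then decode the column in place:

from pyspark.sql import functions as F

# Infer the schema from the JSON strings, then decode in place;
# every other column of my_df is kept intact
json_schema = spark.read.json(my_df.rdd.map(lambda x: x['payload'])).schema
my_df = my_df.withColumn('payload', F.from_json('payload', json_schema))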



This answer adds context for the case where your JSON strings are JSON arrays instead of objects (I can't comment since I don't have the rep). If you use Martin Tapp's solid answer, it will return null values for your columns.

tl;dr

If your JSON strings are array objects like so:

[{"a":1, "b":1.0}]

spark.read.json will return a dataframe that contains the schema of the elements in those arrays and not include the array itself. from_json isn't happy with this, so to be as specific as it wants, you can wrap the schema inferred by spark.read.json in an ArrayType and it will properly parse (instead of returning null values for everything).

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType

array_item_schema = \
  spark.read.json(df.rdd.map(lambda row: row['json_string_column'])).schema

json_array_schema = ArrayType(array_item_schema, True)

arrays_df = df.select(F.from_json('json_string_column', json_array_schema).alias('json_arrays'))

objects_df = arrays_df.select(F.explode('json_arrays').alias('objects'))

Intro

As an addendum to Nolan Conaway's answer, it seems that when your JSON is of the form

[
  {
    "a": 1.0,
    "b": 1
  },
  {
    "a": 0.0,
    "b": 2
  }
]

where the top level object is an array (and not an object), pyspark's spark.read.json() treats the array as a collection of objects to be converted into rows instead of a single row.

See example run in PySpark 3.3.0 shell:

>>> myjson        = """[{"a": 1.0,"b": 1},{"a": 2.0,"b": 2}]"""
>>> myotherjson   = """[{"a": 3.0,"b": 3}]"""
>>> rawobjectjson = """{"a": 4.0,"b": 4}"""
>>> spark_read_df = spark.read.json(sc.parallelize([myjson,myotherjson,rawobjectjson]))
>>> spark_read_df.show()
+---+---+
|  a|  b|
+---+---+
|1.0|  1|
|2.0|  2|
|3.0|  3|
|4.0|  4|
+---+---+

>>> spark_read_df.printSchema()
root
 |-- a: double (nullable = true)
 |-- b: long (nullable = true)

We can see that myjson and myotherjson, which were JSON arrays of JSON objects, got expanded to have a row for each object they contained. It also smoothly handled the case where one of the JSON strings, rawobjectjson, was just a raw object. I think the documentation falls a little short here, as I couldn't find a mention of this handling of array objects.

Now let's create a dataframe with a column of JSON strings. We'll drop rawobjectjson because, as we'll see, from_json requires each string to have the same schema (and this includes the top-level array, if present).

>>> from pyspark.sql.types import StructType, StructField, StringType, ArrayType
>>> json_string_data = [
...     (myjson,),
...     (myotherjson,),
... ]
>>> json_df_schema = StructType([
...     StructField('json_strings', StringType(), True),
... ])
>>> raw_json_df = spark.createDataFrame(data=json_string_data, schema=json_df_schema)
>>> raw_json_df.show()
+--------------------+
|        json_strings|
+--------------------+
|[{"a": 1.0,"b": 1...|
| [{"a": 3.0,"b": 3}]|
+--------------------+

Here's where I tried to use the schema inferred by spark.read.json with from_json to read the JSON column into objects, but it kept returning columns that were fully null. As Nolan Conaway mentioned, this happens when the schema passed to from_json can't be applied to the given strings.

The issue is that in these strings it sees the top level as an array, but as spark_read_df.printSchema() shows, the schema inferred by spark.read.json() ignores the array level.

The Solution

So the solution I ended up going with was just accounting for the top level array in the schema when doing the read.

from pyspark.sql import functions as F

# This one won't work for directly passing to from_json as it ignores top-level arrays in json strings
# (if any)!
# json_object_schema = spark_read_df.schema()

# from_json is a bit more "simple", it directly applies the schema to the string. In this case
# the top level type is actually an array, so a simple fix is to just wrap the schema that
# spark.read.json returned in an ArrayType to match the true JSON string
json_array_schema = ArrayType(spark_read_df.schema, True)

json_extracted_df = raw_json_df.select(
    F.from_json('json_strings', json_array_schema)
        .alias('json_arrays')
)
>>> json_extracted_df.show()
+--------------------+
|         json_arrays|
+--------------------+
|[{1.0, 1}, {2.0, 2}]|
|          [{3.0, 3}]|
+--------------------+

>>> json_extracted_df.printSchema()
root
 |-- json_arrays: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: double (nullable = true)
 |    |    |-- b: long (nullable = true)

From there the objects can be pulled out of the array using pyspark.sql.functions.explode:

>>> exploded_df = json_extracted_df.select(F.explode('json_arrays').alias('objects'))
>>> exploded_df.show()
+--------+
| objects|
+--------+
|{1.0, 1}|
|{2.0, 2}|
|{3.0, 3}|
+--------+

>>> exploded_df.printSchema()
root
 |-- objects: struct (nullable = true)
 |    |-- a: double (nullable = true)
 |    |-- b: long (nullable = true)

1 Comment

I have a use case in which I don't know if the column is an array or not. Any solution for that?

To dynamically parse a JSON string column into a JSON object, inferring the schema from all the records of that column in your dataframe, you can use the generic function below, which I developed.

My use case was to parse these JSON strings into objects to load into a Redshift SUPER column, so they could be queried further.

The function takes a list of columns holding valid JSON string values and a null-safe parameter. It loops through those columns; for each one it creates an RDD of all the records, reads it into a dataframe to infer the schema, and then uses from_json to convert the JSON string into a JSON object.

from typing import List
from pyspark.sql import DataFrame
from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType

from utils import get_spark_session

def json_string_column_to_json_object(df: DataFrame, **kwargs) -> DataFrame:
    """
    Converts columns holding valid JSON strings into JSON objects, to be
    loaded into Redshift as SUPER columns.

    Args:
        df (DataFrame): Input DataFrame.
        json_columns (List[str]): Names of the columns to convert from JSON
            string to JSON object by dynamically inferring their schemas.
        null_safe (bool): If True, skip rows with null or empty JSON values
            while inferring the schema, to improve performance.

    Returns:
        DataFrame: DataFrame with the JSON columns parsed.
    """
    input_dict = kwargs.get("func_args", {})
    json_columns = input_dict.get("json_columns", [])
    null_safe = input_dict.get("null_safe", True)

    spark = get_spark_session()  # create or fetch your SparkSession here

    for json_col in json_columns:
        # Extract the JSON strings as an RDD, optionally filtering out
        # null/empty placeholder values before inferring the schema
        if null_safe:
            json_rdd = df.select(json_col).filter(
                f.col(json_col).isNotNull()
                & (f.col(json_col) != "[]")
                & (f.col(json_col) != "NULL")
                & (f.col(json_col) != "null")
                & (f.col(json_col) != "")
            ).rdd.map(lambda row: row[0])
        else:
            json_rdd = df.select(json_col).filter(
                f.col(json_col).isNotNull()
            ).rdd.map(lambda row: row[0])

        # Read the JSON strings as a new DataFrame to infer the full schema
        inferred_schema = spark.read.json(json_rdd).schema
        # The column is assumed to hold JSON arrays, hence the ArrayType wrapper
        inferred_array_schema = ArrayType(inferred_schema)

        # Parse the original column using the inferred schema
        df = df.withColumn(json_col, f.from_json(f.col(json_col), inferred_array_schema))

    return df
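
A usage sketch with hypothetical column names (note that the columns are assumed to hold JSON arrays, since the function wraps the inferred schema in an ArrayType):

df = json_string_column_to_json_object(
    df,
    func_args={"json_columns": ["events_json"], "null_safe": True},
)
df.printSchema()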

