
I'm loading many versions of JSON files into a Spark DataFrame. Some of the files hold columns A, B, and some hold A, B, C or A, C.

If I run this command:

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# load the JSON files and register them as a temporary table first
# ("path/to/json" is a placeholder; the question omits this step)
sqlContext.read.json("path/to/json").registerTempTable("table")

df = sqlContext.sql("SELECT A,B,C FROM table")

then after loading several files I can get the error "column not exist" when I've loaded only files that don't hold column C. How can I set this value to null instead of getting an error?


1 Answer


The DataFrameReader.json method provides an optional schema argument you can use here. If your schema is complex, the simplest solution is to reuse the one inferred from a file that contains all the fields:

df_complete = spark.read.json("complete_file")
schema = df_complete.schema

df_with_missing = spark.read.json("df_with_missing", schema)
# or equivalently:
# df_with_missing = spark.read.schema(schema).json("df_with_missing")

If you know the schema but for some reason cannot use the approach above, you have to create it from scratch:

from pyspark.sql.types import StructType, StructField, LongType

schema = StructType([
    StructField("A", LongType(), True), ..., StructField("C", LongType(), True)])

As always, be sure to perform some quality checks after loading your data (see the null-count sketch after the example below).

Example (note that all fields are nullable):

from pyspark.sql.types import *

schema = StructType([
    StructField("x1", FloatType()),
    StructField("x2", StructType([
        StructField("y1", DoubleType()),
        StructField("y2", StructType([
            StructField("z1", StringType()),
            StructField("z2", StringType())
        ]))
    ])),
    StructField("x3", StringType()),
    StructField("x4", IntegerType())
])

spark.read.json(sc.parallelize(["""{"x4": 1}"""]), schema).printSchema()
## root
##  |-- x1: float (nullable = true)
##  |-- x2: struct (nullable = true)
##  |    |-- y1: double (nullable = true)
##  |    |-- y2: struct (nullable = true)
##  |    |    |-- z1: string (nullable = true)
##  |    |    |-- z2: string (nullable = true)
##  |-- x3: string (nullable = true)
##  |-- x4: integer (nullable = true)

spark.read.json(sc.parallelize(["""{"x4": 1}"""]), schema).first()
## Row(x1=None, x2=None, x3=None, x4=1)

spark.read.json(sc.parallelize(["""{"x3": "foo", "x1": 1.0}"""]), schema).first()
## Row(x1=1.0, x2=None, x3='foo', x4=None)

spark.read.json(sc.parallelize(["""{"x2": {"y2": {"z2": "bar"}}}"""]), schema).first()
## Row(x1=None, x2=Row(y1=None, y2=Row(z1=None, z2='bar')), x3=None, x4=None)
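
As a quick quality check (a minimal sketch, not part of the original answer, reusing the schema and input above), you can count the nulls per top-level column to spot files that were missing fields:

from pyspark.sql.functions import col, count, when

df = spark.read.json(sc.parallelize(["""{"x4": 1}"""]), schema)

# count null values per top-level column; unexpectedly high counts
# usually mean the source files were missing those fields
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).first()
## Row(x1=1, x2=1, x3=1, x4=0)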

Important:

This method is applicable only to the JSON source and depends on implementation details. Don't use it for sources like Parquet.
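
For Parquet and similar sources, one common alternative (a sketch, not from this answer; the path and column types are assumptions) is to load with whatever schema the files actually have and then add any missing columns as typed nulls:

from pyspark.sql.functions import lit
from pyspark.sql.types import LongType

# "some_parquet_dir" and the A/B/C column list are hypothetical
df = spark.read.parquet("some_parquet_dir")
for name, dtype in [("A", LongType()), ("B", LongType()), ("C", LongType())]:
    if name not in df.columns:
        df = df.withColumn(name, lit(None).cast(dtype))

df.select("A", "B", "C").show()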


6 Comments

read.json() seems to take only one argument; this worked for me: df_with_missing = sqlContext.read.schema(schema).json("df_with_missing")
I am not sure this solution works. You can pretty much print the schema or show it without a problem, but when you try to do anything with those columns (like check if they are not null), then it fails complaining that the [column] is not in the schema (tested with Spark 2.0.2).
@marios How do you check this? Do you use JSON input?
@zero323 good question, I didn't realize this answer was JSON specific. I am using parquet. I wrote another question on this if you are kind enough to take a look :): stackoverflow.com/questions/46107245/…
@zero323 the trick I used so far is spark.createDataFrame(myNewDf.rdd, schema); this works but looks pretty ugly (and the .rdd doesn't seem to come for free).