
I am using PySpark to transform JSON into a DataFrame, and the transformation itself works. The problem I am facing is that there is a key which is present in some JSON files but not in others. When I flatten the JSON with the PySpark SQL context and the key is not present in a JSON file, creating my PySpark DataFrame fails with an AnalysisException.

For example, my sample JSON:

{
    "_id" : ObjectId("5eba227a0bce34b401e7899a"),
    "origin" : "inbound",
    "converse" : "72412952",
    "Start" : "2020-04-20T06:12:20.89Z",
    "End" : "2020-04-20T06:12:53.919Z",
    "ConversationMos" : 4.88228940963745,
    "ConversationRFactor" : 92.4383773803711,
    "participantId" : "bbe4de4c-7b3e-49f1-8",
}

The participantId key above is present in some JSON files and absent from others.

My PySpark code snippet:

fetchFile = spark.read.format(file_type)\
                .option("inferSchema", "true")\
                .option("header", "true")\
                .load(generated_FileLocation)

fetchFile.registerTempTable("CreateDataFrame")
tempData = sqlContext.sql("select origin,converse,start,end,participantId from CreateDataFrame")

When participantId is not present in a JSON file, an exception is raised. How can I handle this so that, if the key is missing, the column contains null? Or is there another way to handle it?

  • Why not programmatically check if the schema contains the column and add it if it's not present? Commented May 14, 2020 at 5:10
  • May be stackoverflow.com/questions/32166812/… can help. Commented May 14, 2020 at 5:19

2 Answers


You can simply check whether the column is missing and, if it is, add it with empty values. The code for that goes like:

from pyspark.sql import functions as f

df = spark.read.format(file_type)\
        .option("inferSchema", "true")\
        .option("header", "true")\
        .load(generated_FileLocation)

if 'participantId' not in df.columns:
    df = df.withColumn('participantId', f.lit(''))

df.registerTempTable("CreateDataFrame")
tempData = sqlContext.sql("select origin,converse,start,end,participantId from CreateDataFrame")
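The same check generalizes to any list of required columns. Below is a minimal sketch (the helper name missing_columns and the column list are illustrative, not from the answer) that computes which expected columns a DataFrame lacks; it is plain Python, so it works on any df.columns list:

```python
def missing_columns(expected, actual):
    """Return the expected column names that are absent from `actual`."""
    present = set(actual)
    return [c for c in expected if c not in present]

# Columns the downstream SQL query relies on.
expected = ["origin", "converse", "Start", "End", "participantId"]

# What df.columns might look like for a file missing participantId.
actual = ["origin", "converse", "Start", "End"]

print(missing_columns(expected, actual))  # → ['participantId']
```

Each returned name could then be added with df.withColumn(name, f.lit(None).cast('string')), which gives a true null column rather than an empty string.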

2 Comments

Thanks, Shubham Jain, let me try this. This looks clean.
Accept as answer if it helps you...:)

I think you're calling Spark to read one file at a time and inferring the schema at the same time.

What Spark is telling you with the AnalysisException is that your file, and therefore your inferred schema, doesn't have the key you're looking for. What you have to do is build a good schema and apply it to all of the files you want to process, ideally processing all of your files at once.

There are three strategies:

  1. Infer your schema from lots of files. You should get the aggregate of all of the keys, but Spark will run two passes over the data.

df = spark.read.json('/path/to/your/directory/full/of/json/files')
schema = df.schema
print(schema)

  2. Create a schema object. I find this tedious to do, but it will speed up your code. Here is a reference: https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.types.StructType

  3. Read the schema from a well-formed file, then use that to read your whole directory. Also, by printing the schema object, you can copy and paste it back into your code for option #2.

schema = spark.read.json('path/to/well/formed/file.json').schema
print(schema)
my_df = spark.read.schema(schema).json('path/to/entire/folder/full/of/json')
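For option #2, a shortcut worth knowing: since Spark 2.3, DataFrameReader.schema also accepts a DDL-formatted string, which avoids constructing a StructType by hand. A minimal sketch, with field names taken from the sample document in the question (the types are guesses; adjust them to your data):

```python
# DDL-formatted schema derived from the sample document in the question.
# Pass it directly: df = spark.read.schema(schema_ddl).json('/path/to/files')
# Files missing participantId then load with that column as null.
schema_ddl = (
    "_id STRING, origin STRING, converse STRING, "
    "Start TIMESTAMP, End TIMESTAMP, "
    "ConversationMos DOUBLE, ConversationRFactor DOUBLE, "
    "participantId STRING"
)

# Field names can be recovered from the DDL string for a quick sanity check.
field_names = [col.strip().split()[0] for col in schema_ddl.split(",")]
print(field_names)  # → ['_id', 'origin', 'converse', 'Start', 'End', 'ConversationMos', 'ConversationRFactor', 'participantId']
```

The printed StructType from option #1 or #3 remains the more precise route when types matter; the DDL string is just less typing for a flat document like this one.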

2 Comments

Thanks for the reply, Douglas, but I am calling the files one by one because the scenario demands it: consider that we are doing this for different users, and after completing one user we move on to the next. Some users have the participant key and some don't, because of the use case, and querying participantId in files that don't have the key is exactly what I want to handle.
That's option #2: create a schema object schema, and then run my_df = spark.read.schema(schema).json('path/to/entire/folder/specific_file.json')
