
As shown in the code below, I am reading a JSON file into a dataframe and then selecting some fields from that dataframe into another one.

from pyspark.sql.functions import col

df_record = spark.read.json("path/to/file.JSON", multiLine=True)

df_basicInfo = df_record.select(col("key1").alias("ID"), \
                                col("key2").alias("Status"), \
                                col("key3.ResponseType").alias("ResponseType"), \
                                col("key3.someIndicator").alias("SomeIndicator") \
                                )

The issue is that sometimes the JSON file does not have some of the keys that I try to fetch, like ResponseType. So it ends up throwing errors like:

org.apache.spark.sql.AnalysisException: No such struct field ResponseType

How can I get around this issue without forcing a schema at the time of read? Is it possible to make it return a NULL under that column when it is not available?

The question "how do I detect if a spark dataframe has a column" does mention how to detect whether a column is available in a dataframe. This question, however, is about how to use that check.


6 Answers


Using the has_column function defined here by zero323, and general guidelines about adding empty columns, you can either do this:

from pyspark.sql.functions import lit, col, when
from pyspark.sql.types import *

if has_column(df_record, "key3.ResponseType"):
    df_basicInfo = df_record.withColumn("ResponseType", col("key3.ResponseType"))
else:
    # Adjust types according to your needs
    df_basicInfo = df_record.withColumn("ResponseType", lit(None).cast("string")) 

Adjust the types according to your requirements, and repeat the process for the remaining columns.
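For reference, zero323's has_column is essentially a try/except around column resolution (the same helper appears in a later answer below); a minimal sketch:

from pyspark.sql.utils import AnalysisException

def has_column(df, path):
    try:
        df[path]  # resolving a missing nested field raises AnalysisException
        return True
    except AnalysisException:
        return False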

Alternatively, define a schema that covers all desired fields:

schema = StructType([
    StructField("key1", StringType()),
    StructField("key2", StringType()),
    StructField("key2", StructType([
        StructField("ResponseType", StringType()),
        StructField("someIndicator", StringType()),
    ]))
])

df_record = spark.read.schema(schema).json("path/to/file.JSON", multiLine=True)

(once again adjust the types), and use your current code.
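With the schema supplied at read time, the original select from the question works unchanged: any field missing from the JSON simply comes back as NULL. A minimal sketch:

df_record = spark.read.schema(schema).json("path/to/file.JSON", multiLine=True)

# key3.ResponseType resolves against the schema even if the file never contains it;
# rows without that key just get a null value in the ResponseType column
df_basicInfo = df_record.select(col("key1").alias("ID"),
                                col("key2").alias("Status"),
                                col("key3.ResponseType").alias("ResponseType"),
                                col("key3.someIndicator").alias("SomeIndicator"))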


4 Comments

Is there any SparkSQL version?
case when ... otherwise fails if the column does not exist.
I tried this but it always evaluates col("key3.ResponseType"), and if the column doesn't exist, it will throw an error.
The second option requires the column to exist in order to evaluate when. So, as @Hello.World said, this throws an error if the column does not exist.

Spark is missing a simple function: struct_has(STRUCT, PATH) or struct_get(STRUCT, PATH, DEFAULT), where PATH uses dot notation.

So I wrote a very simple UDF:

From https://gist.github.com/ebuildy/3c9b2663d47f7b65fbc12cfb469ae19c:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.Row

spark.udf.register("struct_def", (root:GenericRowWithSchema, path: String, defaultValue: String) => {

    var fields = path.split("\\.")
    var buffer:Row = root
    val lastItem = fields.last

    fields = fields.dropRight(1)

    fields.foreach( (field:String) => {
        if (buffer != null) {
            if (buffer.schema.fieldNames.contains(field)) {
                buffer = buffer.getStruct(buffer.fieldIndex(field))
            } else {
                buffer = null
            }
        }
    })

    if (buffer == null) {
        defaultValue
    } else {
        buffer.getString(buffer.fieldIndex(lastItem))
    }
})

This lets you query like this:

SELECT struct_get(MY_COL, "foo.bar", "no") FROM DATA



I had the same issue; I used a similar approach to Thomas's. My user-defined function code:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.Row

spark.udf.register("tryGet", (root:GenericRowWithSchema, fieldName: String) => {
    var buffer:Row = root

    if (buffer != null) {
      if (buffer.schema.fieldNames.contains(fieldName)) {
         buffer.getString(buffer.fieldIndex(fieldName))
      } else {
        null
      }
    }
    else {
      null
    }
})

and then my query:

%sql

SELECT
  Id,
  Created,
  Payload.Type,
  tryGet(Payload, "Error") as Error
FROM dataWithJson
WHERE Payload.Type = 'Action'

1 Comment

I tried this and I'm getting org.apache.spark.SparkException: Failed to execute user defined function(DataFrameConverter$$$Lambda$2744/0x000000080192ef48: (string, string) => string)

So I tried using the accepted answer; however, I found that if the column key3.ResponseType doesn't exist, it will fail.

You can do something like this:

import scala.util.Try
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

def hasColumn(df: DataFrame, path: String) =
  if (Try(df(path)).isSuccess) {
    df(path)
  } else {
    lit(null)
  }

Here the function evaluates whether the column exists, and if it doesn't, it just returns a NULL column.

You can now use it like this:

val df_basicInfo = df_record.withColumn("ResponseType", hasColumn(df_record, "key3.ResponseType"))



I saw many confusing answers, so I hope this helps in PySpark; here is how you do it. Create a function that checks whether a column exists, then check each column: if it does not exist, replace it with None (or a value of the relevant datatype).

from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import lit, col, when


def has_column(df, col_name):
    try:
        df[col_name]
        return True
    except AnalysisException:
        return False

Now, as mentioned in the question

df_record = spark.read.json("path/to/file.JSON",multiLine=True)
df_new = df_record

if has_column(df_new, "data.col1"):
    df_new = df_new.withColumn("col_1", col("data.col1"))
else:
    df_new = df_new.withColumn("col_1", lit(None).cast("string")) 


if has_column(df_new, "log.id"):
    df_new = df_new.withColumn("log_id", col("log.id").cast("bigint"))
else:
    df_new = df_new.withColumn("log_id", lit(None).cast("bigint")) 

.....

and so on; you make the relevant changes to the dataframe until you have populated all the fields you want in df_new. Hope this helps!
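If there are many such fields, the same if/else can be driven by a small mapping instead of being written out by hand. A sketch, where the paths, output names, and types are purely illustrative:

# (path in the JSON, output column name, target type) - illustrative values only
fields = [("data.col1", "col_1", "string"),
          ("log.id", "log_id", "bigint")]

df_new = df_record
for path, out_name, dtype in fields:
    if has_column(df_new, path):
        df_new = df_new.withColumn(out_name, col(path).cast(dtype))
    else:
        df_new = df_new.withColumn(out_name, lit(None).cast(dtype))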



Another way to do this is to cast your struct-type column to string and use the get_json_object function, which can handle unknown fields. For example:

from pyspark.sql.functions import get_json_object
df_record.select(get_json_object(df_record.key3.cast("string"), '$.ResponseType').alias('ResponseType'))

The output will always be a string. The column value will be 'null' where the required field is not found.
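If casting the struct column straight to a string does not give you valid JSON on your Spark version, a hedged alternative is to serialize it with to_json first and then apply get_json_object:

from pyspark.sql.functions import to_json, get_json_object

# to_json always renders the struct as well-formed JSON, so the dotted path lookup works
df_record.select(get_json_object(to_json(df_record.key3), '$.ResponseType').alias('ResponseType'))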

