I have a PySpark DataFrame schema where the quantity field is specified as IntegerType. However, when the JSON data contains a string representation of a number (e.g., "30"), the record is moved to corrupt_records.

from pyspark.sql.functions import from_json, col, when
from pyspark.sql.types import StringType, StructType, StructField, IntegerType

data = [
    ("{'fruit':'Apple', 'quantity':10}",),
    ("{'fruit':'Banana', 'quantity':20}",),
    ("{'fruit':'Cherry', 'quantity':'30'}",),
    ("{'fruit':'Date', 'quantity':'40'}",),
    ("{'fruit':'Elderberry', 'quantity':'50'}",)
]

# Define schema
schema = StructType([
    StructField("json_column", StringType(), True)
])

# Create DataFrame
input_df = spark.createDataFrame(data, schema)
input_df.show(truncate=False)

+---------------------------------------+
|json_column                            |
+---------------------------------------+
|{'fruit':'Apple', 'quantity':10}       |
|{'fruit':'Banana', 'quantity':20}      |
|{'fruit':'Cherry', 'quantity':'30'}    |
|{'fruit':'Date', 'quantity':'40'}      |
|{'fruit':'Elderberry', 'quantity':'50'}|
+---------------------------------------+

json_options = {"columnNameOfCorruptRecord": "corrupt_json"}
json_schema = StructType([
    StructField("fruit", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("corrupt_json", StringType(), True)
])

json_df = input_df.select(from_json("json_column", schema = json_schema, options=json_options).alias("data"))
df = json_df.select("data.*")
df.show(truncate = False)

+----------+--------+---------------------------------------+
|fruit     |quantity|corrupt_json                           |
+----------+--------+---------------------------------------+
|Apple     |10      |NULL                                   |
|Banana    |20      |NULL                                   |
|Cherry    |NULL    |{'fruit':'Cherry', 'quantity':'30'}    |
|Date      |NULL    |{'fruit':'Date', 'quantity':'40'}      |
|Elderberry|NULL    |{'fruit':'Elderberry', 'quantity':'50'}|
+----------+--------+---------------------------------------+

Is there a way to make the from_json function implicitly cast string numbers to integers, so that records with string numbers are not moved to corrupt_records?

  • Could you potentially go through json_schema and find all IntegerType() fields and run pandas.to_numeric with the errors='coerce' flag on those fields in input_df? That way you can automatically transform all values like quantity to integers. Commented Dec 11, 2024 at 14:21

1 Answer

A field in the schema can have only one data type, so one way to achieve your goal is to parse those fields as strings first and cast them to integers afterwards:

json_options = {"columnNameOfCorruptRecord": "corrupt_json"}
json_schema = StructType([
    StructField("fruit", StringType(), True),
    StructField("quantity", StringType(), True),
    StructField("corrupt_json", StringType(), True)
])

json_df = input_df.select(from_json("json_column", schema = json_schema, options=json_options).alias("data"))
df = json_df.select(
    "data.*"
)
df = df.withColumn("quantity", col("quantity").cast(IntegerType()))
df.show(truncate = False)

+----------+--------+------------+
|fruit     |quantity|corrupt_json|
+----------+--------+------------+
|Apple     |10      |NULL        |
|Banana    |20      |NULL        |
|Cherry    |30      |NULL        |
|Date      |40      |NULL        |
|Elderberry|50      |NULL        |
+----------+--------+------------+
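If many columns need the same treatment, the parse-as-string-then-cast approach can be driven by a single mapping instead of one withColumn call per field. The following is only a sketch under assumptions: `target_types`, `string_schema_ddl`, `cast_expressions`, and `parse_and_cast` are hypothetical names, not part of PySpark, and the schema is passed to from_json as a DDL string rather than a StructType. All casts land in one selectExpr, so they are expressed as a single projection.

```python
def string_schema_ddl(target_types, corrupt_col="corrupt_json"):
    """Build a DDL schema string in which every field is parsed as STRING."""
    names = list(target_types) + [corrupt_col]
    return ", ".join(f"`{name}` STRING" for name in names)


def cast_expressions(target_types, struct_col="data"):
    """Build one SQL CAST expression per field, for use with DataFrame.selectExpr."""
    return [
        f"CAST(`{struct_col}`.`{name}` AS {sql_type}) AS `{name}`"
        for name, sql_type in target_types.items()
    ]


def parse_and_cast(input_df, target_types,
                   json_col="json_column", corrupt_col="corrupt_json"):
    """Parse json_col with an all-string schema, then cast every field once."""
    # Imported lazily so the two string helpers above can be used without Spark.
    from pyspark.sql.functions import from_json

    options = {"columnNameOfCorruptRecord": corrupt_col}
    parsed = input_df.select(
        from_json(json_col, string_schema_ddl(target_types, corrupt_col), options)
        .alias("data")
    )
    return parsed.selectExpr(
        *cast_expressions(target_types),
        f"`data`.`{corrupt_col}` AS `{corrupt_col}`",
    )


# Hypothetical mapping: field name -> Spark SQL type it should end up with.
target_types = {"fruit": "STRING", "quantity": "INT"}
print(string_schema_ddl(target_types))
print(cast_expressions(target_types))
```

With the question's DataFrame this would be called as `df = parse_and_cast(input_df, target_types)`. Note that what happens to a string that is not a valid number (NULL versus an error) depends on the ANSI mode setting of your Spark version, so check that before relying on it.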

4 Comments

Thank you, @Jonathan, for your response. However, I would like to implement this check during the from_json process. In the future, many columns will require similar checks, and I’m concerned that performing casts later on for numerous columns might increase the code’s execution time.
@DumbCoder Could you explain what kind of check you want to apply to those columns? The last 3 rows are treated as corrupt JSON because their data type differs from the one declared in the JSON schema. If you want to apply this check to several fields, why do you want the records with string numbers not to be moved to corrupt_records? Could you explain more about your objective?
There is indeed a type mismatch. However, what I was looking for is implicit type casting, similar to how databases handle it. For example, in SQL, if I create a column as an Integer and pass a number in string format, the value is accepted.
@DumbCoder AFAIK from_json has no implicit type casting (i.e., there is no argument you can pass to enable it) when parsing a JSON string. You can only 1) update the JSON string before calling from_json, 2) modify the from_json source code yourself, or 3) do the type casting explicitly.
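
Option 1 (updating the JSON string before from_json) could be sketched with a regular expression that strips the quotes around integer-looking values of chosen fields. The example below is plain Python using the standard `re` module so the pattern can be tried without a cluster; `unquote_integers` is a hypothetical helper, not a PySpark function. A similar pattern could presumably be handed to `pyspark.sql.functions.regexp_replace`, which uses Java regex syntax and `$1$2` style group references, but that variant is an untested assumption.

```python
import re


def unquote_integers(json_text, fields):
    """Strip the quotes around integer-looking values of the given fields."""
    quote = "['\"]"  # the sample data uses single quotes; accept both kinds
    names = "|".join(re.escape(name) for name in fields)
    pattern = re.compile(
        f"({quote}(?:{names}){quote}\\s*:\\s*){quote}(-?\\d+){quote}"
    )
    # Keep the key and the colon (group 1) and the bare digits (group 2).
    return pattern.sub(r"\1\2", json_text)


rows = [
    "{'fruit':'Apple', 'quantity':10}",
    "{'fruit':'Cherry', 'quantity':'30'}",
]
for row in rows:
    print(unquote_integers(row, ["quantity"]))
```

A text-level rewrite like this is fragile: it can also match a key-value lookalike inside a string value, so it only suits payloads whose shape you control. Explicit casting (option 3) avoids that risk.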
