I have a PySpark JSON schema in which the `quantity` field is declared as `IntegerType`. However, when the JSON data contains a number encoded as a string (e.g., `'30'`), `from_json` treats the whole record as corrupt and its raw text ends up in the corrupt-record column (`corrupt_json`).
from pyspark.sql.functions import from_json, col, when
from pyspark.sql.types import StringType, StructType, StructField, IntegerType
# Sample input: one JSON document per row, held in a single string column.
# NOTE: the first two documents use bare integers for `quantity`, while the
# last three quote the value ('30', '40', '50'); that mix is what reproduces
# the corrupt-record behaviour in the parsed output below.
data = [
    (doc,)
    for doc in (
        "{'fruit':'Apple', 'quantity':10}",
        "{'fruit':'Banana', 'quantity':20}",
        "{'fruit':'Cherry', 'quantity':'30'}",
        "{'fruit':'Date', 'quantity':'40'}",
        "{'fruit':'Elderberry', 'quantity':'50'}",
    )
]

# Input schema: a single nullable string column carrying the raw JSON text.
schema = StructType([StructField("json_column", StringType(), True)])

# Build and display the raw DataFrame (assumes an active SparkSession named `spark`).
input_df = spark.createDataFrame(data, schema=schema)
input_df.show(truncate=False)
+---------------------------------------+
|json_column                            |
+---------------------------------------+
|{'fruit':'Apple', 'quantity':10}       |
|{'fruit':'Banana', 'quantity':20}      |
|{'fruit':'Cherry', 'quantity':'30'}    |
|{'fruit':'Date', 'quantity':'40'}      |
|{'fruit':'Elderberry', 'quantity':'50'}|
+---------------------------------------+
# from_json options: no `mode` is set, and any document that cannot be parsed
# against json_schema keeps its raw text in the `corrupt_json` column
# (visible in the output below).
json_options = dict(columnNameOfCorruptRecord="corrupt_json")

# Target schema for the parsed documents. The corrupt-record column is listed
# here as a nullable string so the parser has somewhere to put the raw text.
json_schema = StructType(
    [
        StructField("fruit", StringType(), True),
        # NOTE(review): Spark's JSON parser does not coerce a quoted value such
        # as '30' into IntegerType, so those rows land in `corrupt_json` (see the
        # output below). A common workaround is to declare this field as
        # StringType and cast afterwards, e.g. col("quantity").cast("int");
        # confirm the cast behaviour under your Spark version / ANSI setting.
        StructField("quantity", IntegerType(), True),
        StructField("corrupt_json", StringType(), True),
    ]
)

# Parse the raw JSON string into a single struct column named `data`.
json_df = input_df.select(
    from_json("json_column", schema=json_schema, options=json_options).alias("data")
)

# Flatten the struct so each parsed field becomes a top-level column.
df = json_df.select("data.*")
df.show(truncate=False)
+----------+--------+---------------------------------------+
|fruit     |quantity|corrupt_json                           |
+----------+--------+---------------------------------------+
|Apple     |10      |NULL                                   |
|Banana    |20      |NULL                                   |
|Cherry    |NULL    |{'fruit':'Cherry', 'quantity':'30'}    |
|Date      |NULL    |{'fruit':'Date', 'quantity':'40'}      |
|Elderberry|NULL    |{'fruit':'Elderberry', 'quantity':'50'}|
+----------+--------+---------------------------------------+
Is there a way to make `from_json` implicitly cast numeric strings to integers, so that records containing them are not moved to the corrupt-record column (`corrupt_json`)?
Could you iterate over `json_schema`, find all `IntegerType()` fields, and run `pandas.to_numeric` with the `errors='coerce'` flag on those fields in `input_df`? That way you can automatically transform all values like `quantity` to integers.