I have a file where each row is a stringified JSON object. I want to read it into a Spark DataFrame, along with schema validation.
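For example, a purely hypothetical data.json could look like this (one JSON document per line):

{"id": 1, "name": "foo"}
{"id": 2, "name": "bar"}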
The naive approach would be:
val schema: StructType = getSchemaFromSomewhere()

val df: DataFrame = spark.read
  .option("mode", "DROPMALFORMED")
  .format("json")
  .schema(schema)
  .load("path/to/data.json")
However, this approach performs only very basic schema validation:
- If a row is not parsable as JSON, it is dropped.
- If a row contains a property whose value can't be cast to the type defined by the schema, it is dropped.
- BUT this loading method ignores non-nullable fields (making them nullable in the resulting DF) and does not allow filling in default values (see the sketch after this list).
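To illustrate the nullability point, here is a minimal sketch (assuming a hypothetical two-field schema where id is declared non-nullable); the loaded DataFrame still reports every field as nullable:

import org.apache.spark.sql.types._

// Hypothetical strict schema: "id" is declared non-nullable
val strictSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true)
))

val loaded = spark.read
  .option("mode", "DROPMALFORMED")
  .schema(strictSchema)
  .json("path/to/data.json")

// Prints nullable = true for both fields - the reader does not enforce the declared nullability
loaded.schema.fields.foreach(f => println(s"${f.name}: nullable = ${f.nullable}"))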
Approach 2 - use JsonSchema
In order to do that, I can't use spark.read.json() anymore, because I need the data in JsonNode form.
So instead I read it as a text file and parse it using the JsonSchema library:
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.github.fge.jsonschema.main.{JsonSchema, JsonSchemaFactory}
import scala.util.Try

def getJsonSchemaFactory: JsonSchemaFactory = JsonSchemaFactory.byDefault()

def stringToJsonSchema(str: String): Try[JsonSchema] =
  stringToJson(str).map(getJsonSchemaFactory.getJsonSchema(_))

def stringToJson(str: String): Try[JsonNode] = {
  val mapper = new ObjectMapper
  Try(mapper.readTree(str))
}

// A row is valid only if the schema itself was parsed successfully and the report has no errors
def validateJson(data: JsonNode): Boolean =
  jsonSchema.exists { js =>
    val report = js.validateUnchecked(data, true)
    report.isSuccess
  }

// schemaSource holds the raw JSON Schema text (loaded elsewhere)
lazy val jsonSchema: Option[JsonSchema] = stringToJsonSchema(schemaSource).toOption
import org.apache.spark.sql.functions.from_json
import spark.implicits._

val schema: StructType = getSchemaFromSomewhere()

val df = spark.read
  .textFile("path/to/data.json")
  .filter { str =>
    stringToJson(str)
      .map(validateJson)
      .getOrElse(false)
  }
  .select(from_json($"value", schema) as "jsonized")
  .select("jsonized.*")
The problem now is that I am parsing each string line into JSON twice: once inside the filter, and again in the select(from_json ...).
What I am looking for
Some way to read JSON data from a file into a DataFrame while also applying JsonSchema validation to all the data - invalid rows should be dropped (and maybe also logged somewhere).
- Is there a way to convert a Dataset[JsonNode] to a DataFrame without parsing it more than once?
- Is there a way to convert a DataFrame Row into a JsonNode object? That way I could flip the order: first read the DF using spark.read.json(), then filter it by converting each Row to a JsonNode and applying the JsonSchema validation.
- Is there some other way that I am missing here?
Thanks