
I am running a data ingestion ETL pipeline orchestrated by Airflow using PySpark to read data from MongoDB (using the MongoDB Spark Connector) and load it into a Delta Lake table. The pipeline is failing during the read phase for a specific collection.
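For reference, the read and write steps follow the standard connector pattern below (the connection URI, database name, and Delta path are placeholders, and the package/session configuration is omitted):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("mongo_to_delta").getOrCreate()

# Read the source collection with the MongoDB Spark Connector (v10.x options);
# the schema is inferred at this point, which is where the conflict arises
df = (spark.read
      .format("mongodb")
      .option("connection.uri", "mongodb://...")   # placeholder
      .option("database", "mydb")                  # placeholder
      .option("collection", "serviceregisters")
      .load())

# Write the result to the Delta Lake table
(df.write
   .format("delta")
   .mode("overwrite")
   .save("/path/to/delta/serviceregisters"))       # placeholder path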

Problem

The job fails with a com.mongodb.spark.sql.connector.exceptions.DataException and a SparkException indicating a schema conflict for a field named personalDocument. The error suggests that the connector expects a StructType but encounters a BsonArray in the source data.

Environment Details

Orchestrator: Apache Airflow

Engine: PySpark

Source: MongoDB

Target: Delta Lake

Key Log Snippets

The logs show the schema before conversion and the final error:

Schema Identification and Conversion Step: The pipeline identifies the field as a StructType and converts it to a YAML string, which is how it currently handles complex or variant data:

INFO - - Column: personalDocument, Type: StructType([StructField('birthCertificate', StringType(), True), StructField('businessLicence', NullType(), True), StructField('lastChanges', NullType(), True), StructField('nationalCard', StringType(), True)])
INFO - ⚙️ Converting 'personalDocument' (StructType) → YAML StringType
INFO - ✅ Final schema after YAML conversion:
INFO - root
...
|-- personalDocument: string (nullable = true)
...
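The conversion step is essentially a UDF along these lines (a simplified sketch of what the pipeline does, not the exact code; struct values arrive in the UDF as Row objects):

import yaml
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructType

# Serialize any struct column to a YAML string
@F.udf(returnType=StringType())
def struct_to_yaml(row):
    return yaml.safe_dump(row.asDict(recursive=True)) if row is not None else None

for field in df.schema.fields:
    if isinstance(field.dataType, StructType):
        df = df.withColumn(field.name, struct_to_yaml(F.col(field.name)))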

Fatal Error: The task fails because the MongoDB connector encounters data that does not match the inferred StructType (even though the field is converted to a string immediately afterwards, the initial read itself fails on the mismatch):

WARN TaskSetManager: Lost task 0.0 in stage 47.0 (TID 434) (... executor 0): com.mongodb.spark.sql.connector.exceptions.DataException: Invalid field: 'personalDocument'.
The dataType 'struct' is invalid for 'BsonArray{values=[{"documentId": "1", "path": "..."}, {"documentId": "2", "path": "..."}, {"documentId": "3", "path": "abc"}, {"documentId": "4", "path": "abc"}]}'.
...
ERROR - ETL failed for table serviceregisters: An error occurred while calling o323.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 47.0 failed 4 times... com.mongodb.spark.sql.connector.exceptions.DataException: Invalid field: 'personalDocument'.

Question

It appears the personalDocument field has an inconsistent schema in MongoDB, sometimes being a single BSON Document (Struct) and other times being a BSON Array of Documents (BsonArray).

How can I configure the PySpark MongoDB read operation to gracefully handle this schema variation, specifically when a field expected to be a StructType is an array of sub-documents in some records?

Ideally, I need a solution that:

Allows the MongoDB connector to read this field without failing.

If possible, forces the connector to read it as a string or raw BSON/variant type initially, so my subsequent logic can handle the structure conversion (StructType or ArrayType) before casting to the final YAML string. The sketch after this list shows the kind of configuration I have in mind.
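For illustration only, this is roughly what I imagine: passing an explicit schema so that inference never maps personalDocument to a struct. I do not know whether the connector will actually coerce a BSON document or array into a string this way, and the other field names are placeholders:

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical explicit schema: declare personalDocument as a plain string
explicit_schema = StructType([
    StructField("_id", StringType(), True),
    # ... other fields ...
    StructField("personalDocument", StringType(), True),
])

df = (spark.read
      .format("mongodb")
      .option("connection.uri", "mongodb://...")   # placeholder
      .option("database", "mydb")                  # placeholder
      .option("collection", "serviceregisters")
      .schema(explicit_schema)   # unclear whether the connector coerces document/array values to a string here
      .load())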
