I am running a data-ingestion ETL pipeline, orchestrated by Apache Airflow, that uses PySpark to read data from MongoDB (via the MongoDB Spark Connector) and load it into a Delta Lake table. The pipeline fails during the read phase for one specific collection.
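For reference, the read and write steps are set up roughly as follows (simplified sketch; the connection URI, database name, and Delta path are placeholders, and the real job handles multiple collections):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("mongo_to_delta").getOrCreate()

# Read one collection through the MongoDB Spark Connector (v10.x "mongodb" source).
df = (
    spark.read.format("mongodb")
    .option("connection.uri", "mongodb://...")   # placeholder
    .option("database", "mydb")                  # placeholder
    .option("collection", "serviceregisters")
    .load()
)

# After the column conversions described below, the frame is written to Delta Lake.
df.write.format("delta").mode("overwrite").save("/delta/serviceregisters")  # placeholder path
```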
Problem
The job fails with a com.mongodb.spark.sql.connector.exceptions.DataException and a SparkException indicating a schema conflict for a field named personalDocument. The error suggests that the connector is expecting a StructType but is encountering a BsonArray in the source data.
Environment Details
Orchestrator: Apache Airflow
Engine: PySpark
Source: MongoDB
Target: Delta Lake
Key Log Snippets
The logs show the schema before conversion and the final error:
Schema Identification and Conversion Step: The pipeline identifies the field as a Struct and attempts to convert it to a string format, which appears to be how complex or variant data is handled (a simplified sketch of this conversion step follows the log snippet):
```
INFO - - Column: personalDocument, Type: StructType([StructField('birthCertificate', StringType(), True), StructField('businessLicence', NullType(), True), StructField('lastChanges', NullType(), True), StructField('nationalCard', StringType(), True)])
INFO - ⚙️ Converting 'personalDocument' (StructType) → YAML StringType
INFO - ✅ Final schema after YAML conversion:
INFO - root
...
|-- personalDocument: string (nullable = true)
...
```
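For context, the StructType → YAML conversion step amounts to something like the sketch below (simplified; `to_yaml` is an illustrative name rather than the actual helper used in the pipeline):

```python
import yaml
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructType

@F.udf(returnType=StringType())
def to_yaml(row):
    # Serialize a struct value to a YAML string; nulls pass through unchanged.
    return yaml.safe_dump(row.asDict(recursive=True)) if row is not None else None

# df is the DataFrame returned by the MongoDB read above.
# Replace every StructType column with its YAML string representation.
for field in df.schema.fields:
    if isinstance(field.dataType, StructType):
        df = df.withColumn(field.name, to_yaml(F.col(field.name)))
```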
Fatal Error: The task fails because the MongoDB connector encounters data that does not match the expected StructType (even though the field would be converted to a string immediately afterwards, the initial read already fails on the mismatch):
```
WARN TaskSetManager: Lost task 0.0 in stage 47.0 (TID 434) (... executor 0): com.mongodb.spark.sql.connector.exceptions.DataException: Invalid field: 'personalDocument'.
The dataType 'struct' is invalid for 'BsonArray{values=[{"documentId": "1", "path": "..."}, {"documentId": "2", "path": "..."}, {"documentId": "3", "path": "abc"}, {"documentId": "4", "path": "abc"}]}'.
...
ERROR - ETL failed for table serviceregisters: An error occurred while calling o323.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 47.0 failed 4 times... com.mongodb.spark.sql.connector.exceptions.DataException: Invalid field: 'personalDocument'.
```
Question
It appears the personalDocument field has an inconsistent schema in MongoDB: in some documents it is a single BSON document (Struct), and in others it is a BSON array of sub-documents (BsonArray).
How can I configure the PySpark MongoDB read operation to gracefully handle this schema variation, specifically when a field expected to be a StructType is an array of sub-documents in some records?
Ideally, I need a solution that:
Allows the MongoDB connector to read this field without failing.
If possible, forces the connector to read it as a string or raw BSON/variant type initially, so that my subsequent logic can handle the structure conversion (StructType or ArrayType) before casting to the final YAML string; a rough sketch of what I have in mind is below.
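For example, assuming the connector honors an explicit schema that declares the problematic field as a plain string (I have not been able to confirm whether it then serializes both documents and arrays instead of throwing), something along these lines is what I am hoping for:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

# Declare personalDocument as a plain string so schema inference never types it
# as a struct; the other columns are omitted here for brevity.
explicit_schema = StructType([
    # ... other fields ...
    StructField("personalDocument", StringType(), True),
])

df = (
    spark.read.format("mongodb")
    .option("connection.uri", "mongodb://...")   # placeholder
    .option("database", "mydb")                  # placeholder
    .option("collection", "serviceregisters")
    .schema(explicit_schema)
    .load()
)

# Downstream logic would then parse the string, decide whether it holds a single
# document or an array of documents, and only then produce the final YAML string.
```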