I want to replicate a collection and sync it in real time.
The CDC events are streamed to Kafka; I'll be consuming them and, based on `operationType` (insert/update/delete), processing each document and loading it into the delta table. My target table already has all the possible columns, to handle schema changes in `fullDocument`.
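For reference, this is roughly how I dispatch on `operationType` (a simplified, non-Spark stand-in for what happens inside my batch processing; the event shape mirrors Mongo change-stream fields, and the dict-based "table" is illustrative only):

```python
def apply_cdc_event(table: dict, event: dict) -> None:
    """Apply one Mongo CDC event to an in-memory stand-in for the target table.

    table: maps _id -> fullDocument
    event: expects 'operationType', 'documentKey', and (for insert/update)
           'fullDocument', mirroring change-stream event fields.
    """
    op = event["operationType"]
    _id = event["documentKey"]["_id"]
    if op in ("insert", "update", "replace"):
        # Upsert the full document; schema drift is absorbed because the
        # target table carries a superset of all possible columns.
        table[_id] = event["fullDocument"]
    elif op == "delete":
        # Delete events carry no fullDocument, only the key.
        table.pop(_id, None)
    else:
        raise ValueError(f"unhandled operationType: {op}")
```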
I am working with PySpark in Databricks and I have tried a couple of different approaches:

1. Using `foreachBatch` with `clusterTime` for ordering the events. But this requires me to do a `collect()` and then process the events one by one, which is obviously very slow.
2. Using an SCD-style approach where, instead of deleting a record or updating an existing document, I mark it inactive. This does not give proper history tracking, because for each `_id` I take only the latest change and process it. The issue is that the source team has told me I can get an insert event for an `_id` *after* a delete event for the same `_id`. So if a batch contains events for a document in the order "update → delete → insert", then based on the latest change I'll pick the insert, and this results in a duplicate: both records end up with active status.
3. Another approach I found was to insert every event as I receive it and, at query time, filter down to the latest record per `_id` based on timestamp. But I'm not sure this is the best that can be done.
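To make the third approach concrete, here is a plain-Python stand-in for the query-time filter over the append-only event log (in PySpark I would express this as a `row_number()` window partitioned by `_id` and ordered by `clusterTime` descending; the field names here just mirror the change-stream events). Note it handles the "update → delete → insert" case without producing a duplicate, because only the single latest event per `_id` survives:

```python
def current_view(events: list) -> dict:
    """Query-time view over an append-only CDC log.

    For each _id, keep only the event with the greatest clusterTime;
    drop the _id entirely if that latest event is a delete.
    Returns {_id: fullDocument} for all live documents.
    """
    latest = {}
    for ev in events:
        cur = latest.get(ev["_id"])
        if cur is None or ev["clusterTime"] > cur["clusterTime"]:
            latest[ev["_id"]] = ev
    return {
        _id: ev["fullDocument"]
        for _id, ev in latest.items()
        if ev["operationType"] != "delete"
    }
```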
What will be the best way to solve this?