We are using Apache Flink with Kafka and the AT_LEAST_ONCE strategy. The following problem occurs: when stopping a job, Flink commits offsets for messages that have not yet been processed, resulting in data loss. The problem reproduces only under high load on the service. If the job is stopped and restarted from the last successful checkpoint, the offsets are used correctly and there is no data loss.
The problem is observed on Flink versions 1.16.2, 1.16.3, 1.17.1, and 1.18.1, but could not be reproduced on version 1.19.1. Checkpoints are aligned (though unaligned checkpoints give the same result). All checkpoints finish with COMPLETED status, and there is no difference between stopping the job and canceling it.
How can this be fixed on Flink 1.16?
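For context, here is a minimal sketch of the kind of job configuration involved. This is an assumed setup, not our actual code: broker address, topic, and group id are placeholders, and the pipeline body is elided. It shows the two settings that govern offset commits in this scenario: the checkpointing mode/interval (offsets are only committed to Kafka as part of checkpoint completion) and the `commit.offsets.on.checkpoint` source property.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AtLeastOnceKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Offsets are committed back to Kafka only when a checkpoint
        // completes, so the mode and interval here determine what the
        // consumer group sees on shutdown.
        env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")      // placeholder address
                .setTopics("input-topic")                // placeholder topic
                .setGroupId("my-consumer-group")         // placeholder group id
                // On a restart without a checkpoint/savepoint, fall back
                // to the offsets committed for the consumer group.
                .setStartingOffsets(OffsetsInitializer.committedOffsets())
                // Committing on checkpoint is the default; shown here
                // explicitly because it is the behavior in question.
                .setProperty("commit.offsets.on.checkpoint", "true")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .map(v -> v)  // actual processing pipeline elided
                .print();

        env.execute("at-least-once-kafka-job");
    }
}
```

Note that in this setup the committed group offsets only matter when the job is restarted without a checkpoint or savepoint; restoring from a checkpoint uses the offsets stored in the checkpoint itself, which matches the observation above that restarting from the last checkpoint loses no data.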
P.S. An additional question: why does the same thing happen with the EXACTLY_ONCE strategy, even when processing is artificially slowed down? The checkpoint completes before the transaction is closed. Most checkpoints are marked COMPLETED, although Flink does not commit to Kafka (Flink tries to commit every 3 or 4 checkpoints). But when a checkpoint is COMPLETED, shouldn't the offset be committed? It feels like the checkpoint finishes before the barrier reaches the end of the pipeline.