
We are using Apache Flink with Kafka and the AT_LEAST_ONCE strategy. The following problem occurs: when stopping a job, Flink commits offsets for messages that have not yet been processed, resulting in data loss. The problem reproduces only under high load on the service. If we stop the job and restart it from the last successful checkpoint, the offsets are used correctly and no data loss occurs.

The problem is observed on Flink versions 1.16.2, 1.16.3, 1.17.1, and 1.18.1, but we could not reproduce the bug on version 1.19.1. Checkpoints are aligned (unaligned checkpoints give the same result). All checkpoints terminate with COMPLETED status, and there is no difference between stopping the job and canceling it.

How can this be fixed on Flink 1.16?

P.S. An additional question: why does the same thing happen with the EXACTLY_ONCE strategy, even if the statements are artificially slowed down? The checkpoint completes before the transaction is closed. Most checkpoints are marked as COMPLETED, although Flink does not commit to Kafka (Flink tries to commit every 3 or 4 checkpoints). But when the checkpoint is COMPLETED, shouldn't it commit the offset? It feels like the checkpoint finishes before the barrier reaches the end of the pipeline.

  • > Flink commits offsets for messages that have not yet been processed, resulting in data loss. How have you determined that this is data loss, given that Flink doesn't rely on committed offsets for its fault tolerance, but stores offsets in its checkpoints and savepoints? Commented Feb 12 at 15:00
  • We are talking about restarting without a savepoint, when Flink takes the committed offset for the consumer group. And those offsets are further along than the actually processed messages. If you start the service after that, the number of skipped messages equals the offset error. Commented Feb 12 at 15:10
  • For the tests, we embedded the offsets of the input messages in the output messages. Commented Feb 12 at 15:17

1 Answer


Flink relies on its checkpointing and savepointing features for resilience and recovery. It does not rely on Kafka's committed offsets or consumer groups for resilience. The offset commits during checkpoints are only for observability purposes, as documented at https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#consumer-offset-committing
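For reference, this is where that behavior is configured in the `KafkaSource` builder. A minimal sketch (broker address, topic, and group id are placeholders; this fragment assumes the `flink-connector-kafka` dependency and is not runnable on its own):

```java
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("broker:9092")          // placeholder
    .setTopics("input-topic")                    // placeholder
    .setGroupId("my-group")                      // placeholder
    // The committed group offset is only consulted as a *starting point*
    // when there is no checkpoint/savepoint to restore from; recovery
    // always prefers the offsets stored in Flink's own state.
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    // This property controls the observability-only commit performed on
    // checkpoint completion; disabling it does not affect fault tolerance.
    .setProperty("commit.offsets.on.checkpoint", "true")
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();
```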

If you don't recover from a checkpoint or savepoint, you will get nondeterministic results with regard to data recovery.
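The difference between the two restart paths can be illustrated with a toy model (the class, method names, and numbers below are purely illustrative, not Flink or Kafka APIs): the group offset committed by the source can be ahead of what downstream operators have processed, while the checkpointed offset is consistent with processed state by construction.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: restarting from the consumer-group offset can skip records,
// while restarting from a checkpointed offset cannot.
public class OffsetRecoveryDemo {

    // Kafka-style commit: the source may commit the *fetched* position,
    // which under load runs ahead of what has been processed end-to-end.
    static long committedGroupOffset(long fetched) { return fetched; }

    // A checkpoint stores the offset consistent with processed state.
    static long checkpointedOffset(long processed) { return processed; }

    // Replay records in [start, end) and return what gets processed.
    static List<Long> replayFrom(long start, long end) {
        List<Long> out = new ArrayList<>();
        for (long i = start; i < end; i++) out.add(i);
        return out;
    }

    public static void main(String[] args) {
        long fetched = 100;    // records pulled from Kafka before the stop
        long processed = 70;   // records actually processed end-to-end

        // Restart from the committed group offset: records 70..99 are lost.
        List<Long> fromGroup = replayFrom(committedGroupOffset(fetched), 120);
        // Restart from the checkpoint: records 70..99 are re-read, no loss.
        List<Long> fromCheckpoint = replayFrom(checkpointedOffset(processed), 120);

        System.out.println("records lost restarting from group offset: "
                + (fromGroup.get(0) - processed));
        System.out.println("records lost restarting from checkpoint:   "
                + (fromCheckpoint.get(0) - processed));
    }
}
```

Running this prints a loss of 30 records for the group-offset restart and 0 for the checkpoint restart, which matches what the question observed under load.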


3 Comments

So there is no way to restart the application with a predictable result when the versions are not state-compatible (i.e. no way to restore from a savepoint)?
Correct. You would be ignoring Flink's previous internal state, so how could the results be correct?
Got it, thanks. I just assumed that when stopping the job, the service would commit only the fully processed changes, so it would be possible to restart without data loss, albeit with duplicates.
