
Summary

We are using Flink 1.15.1 and have long-running stateful Flink jobs ingesting data from Kafka topics. They are configured to write checkpoints to S3, using the RocksDB state backend.
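For context, the relevant configuration looks roughly like this (the bucket name and intervals are illustrative placeholders, not our exact values):

```yaml
# flink-conf.yaml (Flink 1.15) – illustrative values
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: s3://my-bucket/flink-checkpoints   # bucket name is a placeholder
execution.checkpointing.interval: 1 min
execution.checkpointing.timeout: 10 min
```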

We have noticed that sometimes the S3 "folders" for some checkpoints were getting very large and increasing continuously. They are also larger than what Flink reports in the UI. When cancelling the associated Flink jobs, some files are deleted but not all of them.

This seems to happen especially when Flink struggles to complete the checkpoints in time.

Experiment

I tried to reproduce the problem by submitting a job with a very small checkpointing timeout and continuous input data:

  • Initially, the checkpoints were successful, new chk-<checkpoint id> folders were created, and regularly replaced the previous one. So, at one point in time, I had one chk-<id> folder with a _metadata file and two other files. Those two other files had sizes that kept increasing as the job consumed more and more data.
  • At some point, the checkpoints started to time out, as expected. As I kept the job running like this for a while, I noticed that some older chk-<id> folders were not removed, even though more recent chk-<id> folders existed. Those folders were not empty. However, the only _metadata file I found did not reference them, while it did reference other checkpoint files from the same folder. So they looked like uncollected garbage.
  • I stopped the data source. The job started writing checkpoints successfully again, but the old uncollected files were still there.
  • I removed some of the files that I thought were garbage, and the job kept running without complaining.
  • I cancelled the job. The chk-<id> folder that seemed valid was deleted, but the remaining chk-<id> folders that seemed problematic were indeed still present.
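For reference, the only change for the experiment was a deliberately tight checkpoint timeout (values illustrative, not the exact ones I used):

```yaml
# Deliberately tight timeout so checkpoints start failing once input volume grows
execution.checkpointing.timeout: 10 s
# Keep the job running despite the resulting checkpoint failures
execution.checkpointing.tolerable-failed-checkpoints: 1000
```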

Any idea what could be the problem here? How can I force Flink to remove files that are no longer part of its checkpoints?
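In the meantime, the manual cleanup I did by hand can at least be semi-automated. The sketch below is my own heuristic, not a Flink API: it flags chk-<id> directories other than the newest completed checkpoint whose names do not appear in that checkpoint's _metadata. Since _metadata is an internal Flink format, it simply searches the raw bytes; it is shown against a local path, but the same idea applies to an S3 listing.

```python
import re
from pathlib import Path


def find_orphan_checkpoints(checkpoint_root: str) -> list[Path]:
    """Flag chk-<id> directories that are not the newest completed checkpoint
    and are not referenced (by name) in that checkpoint's _metadata file.

    Heuristic only: _metadata is an internal Flink format, so we just search
    its raw bytes for each directory name instead of parsing it properly.
    """
    root = Path(checkpoint_root)
    chk_dirs = sorted(
        (d for d in root.iterdir() if d.is_dir() and re.fullmatch(r"chk-\d+", d.name)),
        key=lambda d: int(d.name.split("-")[1]),
    )
    # "Completed" here means the directory contains a _metadata file.
    complete = [d for d in chk_dirs if (d / "_metadata").exists()]
    if not complete:
        return []
    latest = complete[-1]
    metadata = (latest / "_metadata").read_bytes()
    return [
        d for d in chk_dirs
        if d != latest and d.name.encode() not in metadata
    ]
```

Anything this returns is a candidate for deletion, but I would double-check before removing it, since incremental RocksDB checkpoints can legitimately reference files outside the latest chk-<id> folder.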

  • When the checkpoints timed out, did the job fail, or was it configured to tolerate the timeouts and continue running? Commented Aug 8, 2023 at 0:26
  • I think the job restarted. Commented Aug 8, 2023 at 6:48
  • Old pending checkpoints will be cancelled if a checkpoint is not completed within the configured timeout period. The job should keep running if a checkpoint times out. Commented Aug 8, 2023 at 10:16
  • Checkpoint files (for completed and aborted checkpoints) are not guaranteed to be deleted by the Flink job. You can check the logs from the Flink master by searching for keywords like: Executing discard procedure for OR Could not properly dispose the private states in the pending checkpoint Commented Aug 8, 2023 at 10:20
  • The Flink community is aware of that issue and there's FLIP-270 covering it. The discussion on the problem hasn't proceeded yet, because we still have to run some experiments before coming up with a proper solution here. Commented Aug 14, 2023 at 10:23
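Following the log-keyword suggestion in the comments, scanning the JobManager log can be sketched like this (the helper and its name are my own; only the keywords come from the comment above):

```python
def scan_disposal_lines(log_path: str) -> list[str]:
    """Return JobManager log lines that mention checkpoint disposal,
    using the keywords suggested in the comments."""
    keywords = (
        "Executing discard procedure",
        "Could not properly dispose the private states",
    )
    with open(log_path) as f:
        return [line.rstrip("\n") for line in f if any(k in line for k in keywords)]
```

Lines matching the second keyword would indicate that Flink tried and failed to clean up checkpoint files, which would explain leftover chk-<id> folders.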
