
I'm working on a Flink application that uses an aggregate function over a window. Using the State Processor API, I've been able to read the window state it keeps.

However, my use case also requires stopping the processing, retrieving and modifying the state, and then resuming processing without resetting the checkpoints. Unfortunately, the documentation doesn't provide an example for this kind of use case.

This is how the aggregator and window function are used:

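import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

eventStream
    .keyBy(_.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5))) // assuming event-time windows
    .aggregate(new Aggregator, new PostAggregatorWindowFunction)
    .uid("aggregate-every-5-minutes")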
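To make the types concrete, here is a hypothetical `Aggregator` (the real one isn't shown); `Event`, `Acc` and the count/sum logic are assumptions for illustration only. Judging by the signature of the reader-side `aggregate(uid, aggregateFunction, readerFunction, keyType, accType, outputType)`, the `WindowReaderFunction` is handed what `getResult` returns rather than the raw accumulator. Having `getResult` return the accumulator itself means the reader sees exactly the state stored in each window, which is what would later have to be written back.

```scala
import org.apache.flink.api.common.functions.AggregateFunction

// Hypothetical event type; the real job's events only need a `key` field.
case class Event(key: String, value: Long)

// Hypothetical accumulator: running count and sum for one key and window.
case class Acc(count: Long, sum: Long)

// Hypothetical Aggregator: getResult returns the accumulator unchanged, so the
// value a WindowReaderFunction receives is the same as the stored window state.
class Aggregator extends AggregateFunction[Event, Acc, Acc] {
  override def createAccumulator(): Acc = Acc(0L, 0L)

  // Count the event and add its value to the running sum.
  override def add(event: Event, acc: Acc): Acc =
    Acc(acc.count + 1, acc.sum + event.value)

  override def getResult(acc: Acc): Acc = acc

  // Needed for merging windows; combines two partial accumulators.
  override def merge(a: Acc, b: Acc): Acc =
    Acc(a.count + b.count, a.sum + b.sum)
}
```

If the real accumulator differs from what `getResult` returns, the reader only sees the result, and rebuilding the accumulator from it may not be possible.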

Then, as a separate job, I run:

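val readerStream = SavepointReader.read(env, checkpointFile, backend)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(
      "aggregate-every-5-minutes",
      new Aggregator,
      new WindowStateReader, // Extends WindowReaderFunction and outputs (KEY, CurrentAggregate)
      keyType,   // placeholder: TypeInformation of KEY
      accType,   // placeholder: TypeInformation of the Aggregator's accumulator
      outputType // placeholder: TypeInformation of (KEY, CurrentAggregate)
    )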

val transformation = OperatorTransformation
    .bootstrapWith(readerStream)
    .keyBy(_._1) // Keying by first entry of tuple (KEY, CurrentAggregate)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(???, ???) // How do we define these? And how do we set their state?

SavepointWriter.fromExistingSavepoint(env, checkpointFile, backend)
    .removeOperator(OperatorIdentifier.forUid("aggregate-every-5-minutes"))
    .withOperator(OperatorIdentifier.forUid("aggregate-every-5-minutes"), transformation)
    .write(outputFile)

env.execute() // write() only builds the pipeline; the savepoint is written on execute()
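One possible way to fill in the `???` is sketched below. It assumes (hypothetically) that the running job's accumulator is a count/sum case class `Acc`, that `KEY` is a `String`, and that `WindowStateReader` emits the accumulator unchanged as `CurrentAggregate`; `RestoreAccumulator` is an illustrative name, not a Flink class. The bootstrap aggregate takes the reader's `(KEY, CurrentAggregate)` tuples as input and produces the same accumulator type as `Aggregator`; presumably the rewritten state can only be restored if that type and its serializer match the running job. Since the window function is not part of the stored state, the one-argument `.aggregate(new RestoreAccumulator)` form is likely enough. With event-time windows, each bootstrapped record is assigned to a window by its timestamp, so the reader may also need to emit the window start (from `context.window()`) and the transformation may need `assignTimestamps` before `keyBy`, as in the window-state example of the State Processor API docs.

```scala
import org.apache.flink.api.common.functions.AggregateFunction

// Hypothetical accumulator; must be the same ACC type (and serializer) that the
// running job's Aggregator keeps in its window state.
case class Acc(count: Long, sum: Long)

// Bootstrap-side aggregate: input is one (KEY, Acc) record per key and window,
// as emitted by the reader (possibly after being modified).
class RestoreAccumulator extends AggregateFunction[(String, Acc), Acc, Acc] {
  override def createAccumulator(): Acc = Acc(0L, 0L)

  // Folds the read accumulator into the window's (initially empty) state;
  // with exactly one record per key and window this is effectively a copy.
  override def add(in: (String, Acc), acc: Acc): Acc =
    Acc(acc.count + in._2.count, acc.sum + in._2.sum)

  override def getResult(acc: Acc): Acc = acc

  override def merge(a: Acc, b: Acc): Acc =
    Acc(a.count + b.count, a.sum + b.sum)
}
```

The modification step itself could then be an ordinary transformation on `readerStream` before it is passed to `bootstrapWith`.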

Could someone explain how to achieve this in Flink 1.17? Any code snippets or pointers to relevant resources would be greatly appreciated. Thank you!

  • Can you provide context for why you think you need to interrupt processing? Commented Sep 22, 2023 at 17:34
  • @kkrugler I added some code snippets and reworded parts of the question, which I think better explains what I have in mind. Commented Sep 28, 2023 at 9:19
  • The State Processor API is normally used to solve a one-time problem, but it sounds like you plan on making this part of your regular workflow...why? Commented Oct 1, 2023 at 14:36
