I am implementing an AggregateFunction to measure the duration between two events after .window(EventTimeSessionWindows.withGap(gap)). After the second event is processed, the window is closed.
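For illustration, a minimal sketch of the kind of aggregation described above. The names Event, Span and DurationAgg are hypothetical, and the small AggregateFunction interface is a local stand-in that mirrors the four methods of Flink's org.apache.flink.api.common.functions.AggregateFunction so that the snippet compiles without Flink on the classpath. It assumes each event carries an event-time timestamp in milliseconds.

```java
import java.io.Serializable;

// Local stand-in (assumption) with the same four methods as Flink's AggregateFunction.
interface AggregateFunction<IN, ACC, OUT> extends Serializable {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical event type: only the timestamp matters for this sketch.
class Event {
    final long timestampMillis;
    Event(long timestampMillis) { this.timestampMillis = timestampMillis; }
}

// Hypothetical accumulator: earliest and latest timestamps seen so far.
class Span {
    long min = Long.MAX_VALUE;
    long max = Long.MIN_VALUE;
}

class DurationAgg implements AggregateFunction<Event, Span, Long> {
    @Override
    public Span createAccumulator() {
        return new Span();
    }

    @Override
    public Span add(Event value, Span acc) {
        // Widen the span to include the new event's timestamp.
        acc.min = Math.min(acc.min, value.timestampMillis);
        acc.max = Math.max(acc.max, value.timestampMillis);
        return acc;
    }

    @Override
    public Long getResult(Span acc) {
        // An empty accumulator has max < min; report a duration of 0 in that case.
        return acc.max >= acc.min ? acc.max - acc.min : 0L;
    }

    @Override
    public Span merge(Span a, Span b) {
        // Session windows can be merged, so two partial spans must combine into one.
        Span merged = new Span();
        merged.min = Math.min(a.min, b.min);
        merged.max = Math.max(a.max, b.max);
        return merged;
    }
}
```

In an actual job the class would implement Flink's own interface instead of the stand-in and be passed to .aggregate(new DurationAgg()) after the .window(...) call.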
Will Flink automatically checkpoint the state of the AggregateFunction so that existing data in the accumulator is not lost on a restart?
Since I am not sure about that, I tried to use AggregatingState in a RichAggregateFunction:
class MyAgg extends RichAggregateFunction<IN, ACC, OUT>
AggregatingState requires AggregatingStateDescriptor. Its constructor has this signature:
public AggregatingStateDescriptor(
        String name,
        AggregateFunction<IN, ACC, OUT> aggFunction,
        Class<ACC> stateType)
I am very confused by the aggFunction parameter. What should be passed here? Isn't it the MyAgg that I am trying to define in the first place?