1

I am implementing a AggregateFunction to measure the duration between two events after .window(EventTimeSessionWindows.withGap(gap)) . After the second event is processed, the window is closed.

Will flink automatically checkpoint the state of the AggregateFunction so that existing data in the accumulator is not lost from restarting?

Since I am not sure about that. I tried to implement AggregatingState in a RichAggregateFunction: class MyAgg extends RichAggregateFunction<IN, ACC, OUT>

AggregatingState requires AggregatingStateDescriptor. Its constructor has this signature:

            String name,
            AggregateFunction<IN, ACC, OUT> aggFunction,
            Class<ACC> stateType) {

I am very confused by the aggFunction. What should be put here? Isn't it the MyAgg that I am trying to define in the first place?

2
  • In what context are you using this AggregateFunction? Is this on a streaming window, or somewhere else? Commented Jul 30, 2020 at 21:14
  • @davidanderson it is a streaming window. Commented Jul 30, 2020 at 21:43

1 Answer 1

2

An AggregateFunction doesn't have any state. But the aggregating state used in a streaming window (and manipulated by an AggregateFunction) is checkpointed as part of the window's state.

A RichAggregateFunction cannot be used in a window context, and an AggregateFunction cannot have its own state. It's designed this way because if an AggregateFunction were allowed to use a state descriptor to define ValueState, for example, then that state wouldn't be mergeable -- and to keep the Window API reasonably clean, all window state needs to be mergeable (for the sake of session windows).

AggregatingState is something you might use in a KeyedProcessFunction, for example. In that context, you need to define how elements are to be aggregated into the accumulator (i.e., the AggregatingState), which you do with an AggregateFunction.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.