I need to count events within a tumbling window, but I also want to emit a result with a value of 0 when no events arrived within the window.
Something like:
- windowCount: 5
- windowCount: 0
- windowCount: 0
- windowCount: 3
- windowCount: 0 ...
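To make the expected output concrete, here is a minimal plain-Java sketch of the behavior I am after, with no Flink involved. The class name `ZeroFilledWindowCounts`, the method `countPerWindow`, and the sample timestamps are made up for illustration. It only shows the desired result, not how Flink would produce it.

```java
import java.util.ArrayList;
import java.util.List;

public class ZeroFilledWindowCounts {

    // Counts events per tumbling window over [startMs, endMs).
    // Windows without any event still produce an entry with count 0.
    public static List<Long> countPerWindow(long[] timestampsMs, long windowSizeMs,
                                            long startMs, long endMs) {
        // Number of windows needed to cover the range (the last one may be partial).
        int windows = (int) ((endMs - startMs + windowSizeMs - 1) / windowSizeMs);
        long[] counts = new long[windows];
        for (long ts : timestampsMs) {
            if (ts < startMs || ts >= endMs) {
                continue; // ignore events outside the observed range
            }
            counts[(int) ((ts - startMs) / windowSizeMs)]++;
        }
        List<Long> result = new ArrayList<>();
        for (long c : counts) {
            result.add(c);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical event timestamps in milliseconds, with 5-second windows.
        long[] timestamps = {100, 900, 1500, 2300, 4800, 15200, 16000, 19999};
        System.out.println(countPerWindow(timestamps, 5000, 0, 25000));
        // prints [5, 0, 0, 3, 0]
    }
}
```

The second, third, and fifth windows contain no events but still appear with a count of 0, which is the output I would like to get from the stream.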
This is my aggregate function:

import com.google.protobuf.Message;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.skydivin4ng3l.cepmodemon.models.events.aggregate.AggregateOuterClass;

public class BasicCounter<T extends Message> implements AggregateFunction<T, Long, AggregateOuterClass.Aggregate> {

    // Start every window with a count of zero.
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    // Each incoming event increments the count by one.
    @Override
    public Long add(T event, Long accumulator) {
        return accumulator + 1L;
    }

    // Wrap the final count in the Aggregate protobuf message.
    @Override
    public AggregateOuterClass.Aggregate getResult(Long accumulator) {
        return AggregateOuterClass.Aggregate.newBuilder().setVolume(accumulator).build();
    }

    // Combine two partial counts.
    @Override
    public Long merge(Long accumulator1, Long accumulator2) {
        return accumulator1 + accumulator2;
    }
}
It is used here:
DataStream<AggregateOuterClass.Aggregate> aggregatedStream = someEntryStream
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
        .aggregate(new BasicCounter<MonitorOuterClass.Monitor>());
The time characteristic is IngestionTime.
I read about a trigger function that might detect whether the aggregated stream has received an event after x time, but I am not sure if that is the right way to do it.
I expected the aggregation to happen even if there were no events at all within the window. Maybe there is a setting I am not aware of?
Thanks for any hints.