I need to count events within a tumbling window, but I also want to emit a result with a value of 0 when no events arrived within the window.

Something like:

  1. windowCount: 5
  2. windowCount: 0
  3. windowCount: 0
  4. windowCount: 3
  5. windowCount: 0 ...

This is my AggregateFunction:

import com.google.protobuf.Message;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.skydivin4ng3l.cepmodemon.models.events.aggregate.AggregateOuterClass;

public class BasicCounter<T extends Message> implements AggregateFunction<T, Long, AggregateOuterClass.Aggregate> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(T event, Long accumulator) {
        return accumulator + 1L;
    }

    @Override
    public AggregateOuterClass.Aggregate getResult(Long accumulator) {
        return AggregateOuterClass.Aggregate.newBuilder().setVolume(accumulator).build();
    }

    @Override
    public Long merge(Long accumulator1, Long accumulator2) {
        return accumulator1 + accumulator2;
    }
}

It is used here:

DataStream<AggregateOuterClass.Aggregate> aggregatedStream = someEntryStream
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
        .aggregate(new BasicCounter<MonitorOuterClass.Monitor>());

The time characteristic is IngestionTime.

I read about a Trigger that might detect whether the aggregated stream has received an event after x time, but I am not sure if that is the right way to do it.

I expected the aggregation to happen even if there were no events at all within the window. Maybe there is a setting I am not aware of?

Thanks for any hints.

2 Answers

I chose Option 1 as suggested by @David-Anderson:

Here is my Event Generator:

public class EmptyEventSource implements SourceFunction<MonitorOuterClass.Monitor> {

    private volatile boolean isRunning = true;

    private final long delayPerRecordMillis;

    public EmptyEventSource(long delayPerRecordMillis){
        this.delayPerRecordMillis = delayPerRecordMillis;
    }

    @Override
    public void run(SourceContext<MonitorOuterClass.Monitor> sourceContext) throws Exception {
        while (isRunning) {
            sourceContext.collect(MonitorOuterClass.Monitor.newBuilder().build());

            if (delayPerRecordMillis > 0) {
                Thread.sleep(delayPerRecordMillis);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

and my adjusted AggregateFunction:

public class BasicCounter<T extends Message> implements AggregateFunction<T, Long, AggregateOuterClass.Aggregate> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(T event, Long accumulator) {
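        // Skip the synthetic marker events (empty default messages) emitted by EmptyEventSource.
        // Comparing without a cast avoids a ClassCastException when T is not Monitor.
        if (event.equals(MonitorOuterClass.Monitor.getDefaultInstance())) {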
            return accumulator;
        }

        return accumulator + 1L;
    }

    @Override
    public AggregateOuterClass.Aggregate getResult(Long accumulator) {
        AggregateOuterClass.Aggregate newAggregate = AggregateOuterClass.Aggregate.newBuilder().setVolume(accumulator).build();
        return newAggregate;
    }

    @Override
    public Long merge(Long accumulator1, Long accumulator2) {
        return accumulator1 + accumulator2;
    }
}

I used them like this:

DataStream<MonitorOuterClass.Monitor> someEntryStream = env.addSource(currentConsumer);
DataStream<MonitorOuterClass.Monitor> triggerStream = env.addSource(new EmptyEventSource(delayPerRecordMillis));
DataStream<AggregateOuterClass.Aggregate> aggregatedStream = someEntryStream
        .union(triggerStream)
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .aggregate(new BasicCounter<MonitorOuterClass.Monitor>());

Flink's windows are created lazily, when the first event is assigned to a window. Thus empty windows do not exist, and can't produce results.
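
To make the lazy creation concrete, here is a minimal plain-Java model of tumbling-window assignment. It does not use any Flink API; the class name LazyWindowDemo and the method countPerWindow are made up for this illustration, and it assumes non-negative timestamps with no window offset. A window entry only comes into existence when an event lands in it, so the two windows without events are simply missing from the result.

```java
import java.util.Map;
import java.util.TreeMap;

public class LazyWindowDemo {

    // Groups non-negative millisecond timestamps into tumbling windows,
    // keyed by window start. An entry is created only when the first
    // event is assigned to that window (hypothetical helper, not Flink code).
    public static Map<Long, Long> countPerWindow(long[] timestampsMs, long windowSizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            long windowStart = ts - (ts % windowSizeMs);
            counts.merge(windowStart, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] events = {100, 1200, 4900, 16000, 17500};
        // Windows [5000, 10000) and [10000, 15000) received no events,
        // so they never appear in the output.
        System.out.println(countPerWindow(events, 5000)); // {0=3, 15000=2}
    }
}
```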

In general there are three ways to work around this issue:

  1. Put something in front of the window that adds events to the stream, ensuring that every window has something in it, and then modify your window processing to ignore these special events when computing their results.
  2. Use a GlobalWindow along with a custom Trigger that uses processing time timers to trigger the window (with no events flowing, the watermark won't advance, and event time timers won't fire until more events arrive).
  3. Don't use the window API, and implement your own windowing with a ProcessFunction instead. But here you'll still face the issue of needing to use processing time timers.
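
The idea behind option 3 can be sketched without Flink at all: keep a counter, and let a timer (rather than the arrival of events) decide when to emit and reset it. The class below is a hypothetical, single-threaded model; TimerDrivenCounter, onEvent, and onTimer are names invented for this sketch. In a real job, onEvent would roughly correspond to a ProcessFunction's processElement and onTimer to its processing time timer callback, but that mapping is an assumption and the state handling and timer registration are not shown here.

```java
import java.util.ArrayList;
import java.util.List;

public class TimerDrivenCounter {
    private long count = 0L;
    private final List<Long> emitted = new ArrayList<>();

    // Called once per incoming event.
    public void onEvent() {
        count++;
    }

    // Called by a timer at the end of every window, whether or not
    // any events arrived. Emits the current count and resets it.
    public void onTimer() {
        emitted.add(count);
        count = 0L;
    }

    public List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        TimerDrivenCounter counter = new TimerDrivenCounter();
        for (int i = 0; i < 5; i++) counter.onEvent();
        counter.onTimer(); // window 1: five events
        counter.onTimer(); // window 2: empty
        counter.onTimer(); // window 3: empty
        for (int i = 0; i < 3; i++) counter.onEvent();
        counter.onTimer(); // window 4: three events
        counter.onTimer(); // window 5: empty
        System.out.println(counter.emitted()); // [5, 0, 0, 3, 0]
    }
}
```

Because the timer fires independently of the data, empty periods still produce a 0. Note that in Flink a timer can only be registered while processing an element or another timer, so nothing would be emitted before the very first event arrives.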

Update:

Having now made an effort to implement an example of option 2, I cannot recommend it. The issue is that even with a custom Trigger, the ProcessAllWindowFunction will not be called if the window is empty, so it is necessary to always keep at least one element in the GlobalWindow. This appears then to require implementing a rather hacky Evictor and ProcessAllWindowFunction that collaborate to retain and ignore a special element in the window -- and you also have to somehow get that element into the window in the first place.

If you're going to do something hacky, option 1 appears to be much simpler.
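
As a rough illustration of why option 1 is simple, here is a plain-Java model (no Flink API; MarkerEventDemo, Event, and countPerWindow are hypothetical names for this sketch, and timestamps are assumed to be non-negative). Marker events make sure every window gets created, and the counting step just contributes 0 for them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MarkerEventDemo {

    // An event is a timestamp plus a flag saying whether it is a synthetic marker.
    public static final class Event {
        final long timestampMs;
        final boolean marker;

        public Event(long timestampMs, boolean marker) {
            this.timestampMs = timestampMs;
            this.marker = marker;
        }
    }

    // Every event, marker or not, creates its window; only real events are counted.
    public static Map<Long, Long> countPerWindow(List<Event> events, long windowSizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (Event e : events) {
            long windowStart = e.timestampMs - (e.timestampMs % windowSizeMs);
            counts.merge(windowStart, e.marker ? 0L : 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        for (long ts : new long[]{100, 1200, 4900, 16000, 17500}) {
            events.add(new Event(ts, false)); // real events
        }
        for (long ts = 0; ts < 20000; ts += 5000) {
            events.add(new Event(ts, true)); // one marker per 5-second window
        }
        System.out.println(countPerWindow(events, 5000)); // {0=3, 5000=0, 10000=0, 15000=2}
    }
}
```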

3 Comments

I am choosing Option 2, but I do not understand how to set up the Trigger correctly: DataStream<AggregateOuterClass.Aggregate> aggregatedStream = someEntryStream.windowAll(GlobalWindows.create() /*TumblingEventTimeWindows.of(Time.seconds(5))*/).trigger(new PurgingTrigger<>(ProcessingTimeTrigger.create())).aggregate(new BasicCounter<MonitorOuterClass.Monitor>()); I am a little confused about how to use this. Could you provide an example?
I spent an ungodly amount of time trying to write an example that I could feel good about sharing, and gave up. See my updated answer.
I am doing Option 1 now =) because I also gave up on 2. It seems to be working, but now I have a Protobuf issue where 0 somehow cannot be a value; instead the object is empty. But at least it is working =)
