
This is a follow-up question to: Trigger when State expires

I'm storing the state of each incoming element in the stream, and after the timer goes off I remove the state. This is so that I can prevent duplicates from being processed until the element has timed out, after which I can process the same element again.

I've written the following code to test timers, but it seems that the timer is only triggered after all 3 elements have gone through the first ProcessFunction.

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    streamEnv.setParallelism(12);

    List<Tuple2<String, String>> inputList = new ArrayList<>();
    inputList.add(new Tuple2<>("Test", "test"));
    inputList.add(new Tuple2<>("Test", "test"));
    inputList.add(new Tuple2<>("Test", "test"));

    streamEnv.fromCollection(inputList).keyBy(0)
            .process(new ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                ValueState<Integer> occur;

                @Override
                public void open(Configuration parameters) throws Exception {
                    occur = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("occurs", Integer.class, 0));
                }

                @Override
                public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
                    if (occur.value() < 2) {
                        occur.update(occur.value() + 1);
                        out.collect(value);
                        LOGGER.info("[TEST] Outputting Tuple {}", value);
                    }
                    else {
                        Thread.sleep(10000);
                        LOGGER.info("[TEST] Outputting Tuple {}", value);
                        out.collect(value);
                    }
                }
            })
            .keyBy(0)
            .process(new ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                ValueState<Tuple2<String, String>> storedTuple;

                @Override
                public void open(Configuration parameters) throws Exception {
                    storedTuple = getRuntimeContext().getState(new ValueStateDescriptor<>("storedTuple",
                            TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})));
                }

                @Override
                public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
                    Tuple2<String, String> stored = storedTuple.value();
                    if (stored == null) {
                        LOGGER.info("[TEST] Storing Tuple {}", value);
                        storedTuple.update(value);
                        out.collect(value);
                        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 6000);
                    }
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
                    LOGGER.info("[TEST] Removing Tuple {}", storedTuple.value());
                    storedTuple.clear();
                }
            })
            .addSink(new CollectSink());

    streamEnv.execute("Testing");
    for (Tuple2<String, String> tup: CollectSink.values) {
        System.out.println(tup);
    }

}

private static class CollectSink implements SinkFunction<Tuple2<String, String>> {

    static final List<Tuple2<String, String>> values = new ArrayList<>();

    @Override
    public synchronized void invoke(Tuple2<String, String> value) throws Exception {
        values.add(value);
    }
}

I have an input list with 3 duplicate elements. In the first ProcessFunction I send the first two elements as is but delay the 3rd element by 10 seconds.

The second ProcessFunction filters elements based on whether state is already stored for them. As expected, the first element gets stored and sent onwards, and the second element isn't, as the state already exists. For the first element, apart from sending it on, I also set a timer for 6 seconds so that the state is cleared when the timer fires.

Now the third element is sent on after 10 seconds, which means the 6-second timer should have already cleared the state. However, the third element is also processed before the timer is triggered, so it gets filtered out as well. I can also see that the output contains only 1 copy of the Tuple even though I'm expecting 2 copies.

I've added some logging to give a better idea of the execution times.

[2019-02-19 14:11:48,891] [Process (1/12)] INFO  FlinkTest - [TEST] Outputting Tuple (Test,test)
[2019-02-19 14:11:48,891] [Process (1/12)] INFO  FlinkTest - [TEST] Outputting Tuple (Test,test)
[2019-02-19 14:11:48,943] [Process -> Sink: Unnamed (1/12)] INFO  FlinkTest - [TEST] Storing Tuple (Test,test)
[2019-02-19 14:11:58,891] [Process (1/12)] INFO  FlinkTest - [TEST] Outputting Tuple (Test,test)
[2019-02-19 14:11:58,896] [Process -> Sink: Unnamed (1/12)] INFO  FlinkTest - [TEST] Removing Tuple (Test,test)

You can see that the first two tuples are emitted together as expected, followed by a 10-second delay after which the 3rd tuple is emitted. The "Removing Tuple" log line only appears after those 10 seconds, even though the timer was set to fire 6 seconds after the first tuple came in.

2 Answers


The event-time timer won't fire until a Watermark greater than the time specified in the timer is processed. Such a watermark can't occur until after the third event has been processed. Furthermore, with ingestion time, watermarks are generated using a periodic watermark generator, and by default are inserted into the stream every 200 msec.
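Since processing-time timers fire from the wall clock and don't depend on watermarks at all, one alternative (which the asker confirms works, in the comments below) is to register a processing-time timer in the second ProcessFunction. A minimal sketch of that variant; the class name is illustrative and the rest of the job is assumed unchanged:

// Illustrative sketch: the deduplicating ProcessFunction with a processing-time timer.
public static class DedupWithTimeout extends ProcessFunction<Tuple2<String, String>, Tuple2<String, String>> {

    private transient ValueState<Tuple2<String, String>> storedTuple;

    @Override
    public void open(Configuration parameters) throws Exception {
        storedTuple = getRuntimeContext().getState(new ValueStateDescriptor<>("storedTuple",
                TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})));
    }

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx,
                               Collector<Tuple2<String, String>> out) throws Exception {
        if (storedTuple.value() == null) {
            storedTuple.update(value);
            out.collect(value);
            // Processing-time timers fire from the wall clock, so no watermark is needed.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 6000);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, String>> out) throws Exception {
        // Clears the stored element 6 seconds of wall-clock time after it was first seen.
        storedTuple.clear();
    }
}

With this, the state is cleared 6 seconds of wall-clock time after the first tuple is stored, regardless of watermark progress.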


9 Comments

You are using an event time timer. That requires watermarks. Because you are using ingestion time, the watermarking is being done for you, but there are still watermarks.
Ok so in the case where it never receives a watermark, it triggers it by itself when the stream is closed?
When a finite source reaches its end, a watermark whose value is MAX_WATERMARK is created and sent through the job graph, which fires all event time timers.
You could use event time timers, but might need to either write your own watermark generator, or implement a custom source and have it produce watermarks that meet your needs. Or use processing time timers.
I just tried ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 6000); and now the job behaves as expected. I'm not entirely sure what's going on with the event time timer and watermarks.
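For the other suggestion in the comments above (writing your own watermark generator), here is a rough sketch using the AssignerWithPeriodicWatermarks API available in this Flink version; the class name and the choice of wall-clock timestamps are illustrative assumptions, not something from the original post:

// Illustrative sketch only: a periodic assigner that stamps each element with the
// wall clock and emits watermarks trailing the latest timestamp by 1 ms.
// Attach it with stream.assignTimestampsAndWatermarks(new WallClockWatermarks())
// under TimeCharacteristic.EventTime.
public static class WallClockWatermarks implements AssignerWithPeriodicWatermarks<Tuple2<String, String>> {

    private long currentMaxTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(Tuple2<String, String> element, long previousElementTimestamp) {
        currentMaxTimestamp = System.currentTimeMillis();
        return currentMaxTimestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Called on the auto-watermark interval (200 ms by default).
        return new Watermark(currentMaxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentMaxTimestamp - 1);
    }
}

Bear in mind that watermarks travel through the stream together with the records, so an operator that blocks (like the Thread.sleep in the first ProcessFunction) also delays the watermarks queued behind it at that operator.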

NOTE: Before Flink 1.4.0, when called from a processing-time timer, the ProcessFunction.onTimer() method sets the current processing time as the event-time timestamp. This behavior is very subtle and might not be noticed by users. Well, it's harmful because processing-time timestamps are indeterministic and not aligned with watermarks. Besides, user-implemented logic that depends on this wrong timestamp is highly likely to be unintentionally faulty. So we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html

