
I am writing some code for a processElement function in Apache Flink 1.4:

public class ProcessFunctionClass extends ProcessFunction<Tuple2<String, String>, Tuple2<String, String>> {

    private ListState<String> listState;

    public void processElement(Tuple2<String, String> tuple2, Context context, Collector<Tuple2<String, String>> collector) {

        // if the state is empty, start a timer
        if (!listState.get().iterator().hasNext())
            context.timerService().registerEventTimeTimer(10000);

        listState.add("someStringToBeStored");

        // ...
    }

}

I have this function for when the timer expires:

public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
    Iterable<String> strings = listState.get();
    int cnt = 0;
    int totalLength = 0;
    Iterator<String> it = strings.iterator();
    while (it.hasNext()) {
        cnt++;
        totalLength += it.next().length();
    }
    LOGGER.info("cnt is:" + cnt);
    LOGGER.info("totalLength is:" + totalLength);

    // clearing the state
    listState.clear();
}

However, every time I run the application the value of cnt is always 1, and totalLength is the length of the particular string processed at that time. It looks like the state is not being kept between elements. From this code, is it clear what I am doing wrong?

3 Answers


Process functions use key-partitioned state, meaning there is a separate list for every key. My guess is that no key receives multiple events within a 10-second period.
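To get a separate list per key, the stream has to be keyed before the ProcessFunction is applied. A minimal job sketch (the source, sink, and job name are placeholders; it assumes the ProcessFunctionClass from the question is on the classpath):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyedStateJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; replace with a real one (Kafka, socket, ...).
        // Note that "k1" appears twice, so its ListState will accumulate two entries.
        DataStream<Tuple2<String, String>> input =
                env.fromElements(Tuple2.of("k1", "a"), Tuple2.of("k1", "bb"), Tuple2.of("k2", "c"));

        input
            .keyBy(0)   // partition by the first tuple field: one ListState per key
            .process(new ProcessFunctionClass())
            .print();

        env.execute("keyed-process-function-sketch");
    }
}
```

Without the keyBy, a ProcessFunction cannot use keyed state at all; with it, calls to listState in processElement and onTimer are scoped to the current key.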


Your ProcessFunctionClass needs to extend the Flink ProcessFunction.

1 Comment

It does, I forgot to put it here. Edited.

The ProcessFunction is not meant to hold state on its own. You can use a ProcessWindowFunction to hold the state of the elements present in a window.

6 Comments

ProcessFunction can store state, but it requires keying the stream first.
@emilio do you have any working example where we can keep a global state in a process function, e.g. a CoProcessFunction? Our goal is to access the current element when the next element comes in.
Can you explain what you mean by global state? To my understanding, Flink does not maintain a "global" state. Have a look at nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/… If you want to make it global, you can key your entire stream by a static key, e.g. stream.keyBy(..."some static key").process(...your process function).
@emilio Let's assume we have a stream from Kafka whose DTOs we need to store in a MapState. Currently this MapState only persists one entry. Meaning, if the next entry comes in with key "xyz", we cannot check whether the MapState already has a key named "xyz", because the previous entry is gone by then. That is what I mean by global state, i.e., one MapState that keeps all the DTOs of the stream.
As far as I understand, you can use two keyBy operations. In the first, use a static key, keyBy("1"), so every incoming record lands on the same operator instance. After that you can key your stream again. But be aware that this can cause a bottleneck in your job and introduce additional de/serialization.
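The "static key" pattern from the comments above can be sketched as follows. This is an assumption-laden illustration, not the commenter's actual code: GlobalStateJob and GlobalStateFunction are hypothetical names, and GlobalStateFunction stands in for a ProcessFunction that tracks previously seen keys in a MapState. Because every record maps to the same key, all state lives in one operator instance, which is exactly the bottleneck the last comment warns about:

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GlobalStateJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; in the scenario above this would be a Kafka source.
        DataStream<Tuple2<String, String>> input =
                env.fromElements(Tuple2.of("xyz", "first"), Tuple2.of("xyz", "second"));

        input
            // Static key: every record maps to "1", so all keyed state is
            // held by a single operator instance (a deliberate bottleneck).
            .keyBy(new KeySelector<Tuple2<String, String>, String>() {
                @Override
                public String getKey(Tuple2<String, String> value) {
                    return "1";
                }
            })
            // Hypothetical function that keeps a MapState<String, String> of
            // all DTOs seen so far, so duplicates like "xyz" can be detected.
            .process(new GlobalStateFunction())
            .print();

        env.execute("global-state-sketch");
    }
}
```

An explicit KeySelector is used rather than a lambda so the key type is unambiguous to Flink's type extraction.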
