3

I want to use a ProcessWindowFunction in my Apache Flink project. But I am getting some error when using process function, see below code snippet

The error is:

The method process(ProcessWindowFunction,R,Tuple,TimeWindow>) in the type WindowedStream,Tuple,TimeWindow> is not applicable for the arguments (JDBCExample.MyProcessWindows)

My program:

DataStream<Tuple2<String, JSONObject>> inputStream;

inputStream = env.addSource(new JsonArraySource());

inputStream.keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  .process(new MyProcessWindows());

My ProcessWindowFunction:

private class MyProcessWindows 
  extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, String, Window>
{

  public void process(
      String key, 
      Context context, 
      Iterable<Tuple2<String, JSONObject>> input, 
      Collector<Tuple2<String, String>> out) throws Exception 
  {
    ...
  }

}
1
  • can you double check the error message? It seem as if there is something missing in the signature of the process() method. Commented Mar 20, 2018 at 9:34

2 Answers 2

5

The problem are probably the generic types of the ProcessWindowFunction.

You are referencing the key by position (keyBy(0)). Therefore, the compiler cannot infer its type (String) and you need to change the ProcessWindowFunction to:

private class MyProcessWindows 
    extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, Tuple, Window>

By replacing String by Tuple you have now a generic placeholder for keys that you can cast to Tuple1<String> when you need to access the key in the processElement() method:

public void process(
    Tuple key, 
    Context context, 
    Iterable<Tuple2<String, JSONObject>> input, 
    Collector<Tuple2<String, String>> out) throws Exception {

  String sKey = (String)((Tuple1)key).f0;
  ...
}

You can avoid the cast and use the proper type if you define a KeySelector<IN, KEY> function to extract the key, because the return type KEY of the KeySelector is known to the compiler.

Sign up to request clarification or add additional context in comments.

1 Comment

Thanks for the help @Fabian, I replaced String by Tuple but still the same error persists. Do I need to define the KeySelector function too. Is it mandatory?? Also simple window operation like apply is also throwing the same error whereas reduce is working properly.
3

What Fabian said :) Using Tuple should work, but does involve some ugly type casts in your ProcessWindowFunction. Using a KeySelector is easy and results in cleaner code. E.g.

.keyBy(new KeySelector<Tuple2<String,JsonObject>, String>() {

    @Override
    public String getKey(Tuple2<String, JsonObject> in) throws Exception {
        return in.f0;
    }
})

The above then lets you define a ProcessWindowFunction like:

public class MyProcessWindows extends ProcessWindowFunction<Tuple2<String, JsonObject>, Tuple2<String, String>, String, TimeWindow> {

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.