
I am a newbie to Flink. Sometimes I want to do an aggregation on a DataStream without needing to do a keyBy first. Why doesn't Flink support aggregation (sum, min, max, etc.) on a DataStream?

Thank you, Ahmed.

  • You can comment on, upvote, or accept the answer if you find it useful; otherwise the question may not be useful to future viewers. Commented Mar 19, 2021 at 10:36

2 Answers

Flink supports aggregation on a non-keyed stream, but you have to apply a windowAll operation first and then apply the aggregation. windowAll reduces the parallelism of that operator to 1, meaning all the data flows through a single task slot. This is by design: with more than one task slot, each slot can aggregate only the data it receives, not the data across slots.

If windowAll with parallelism 1 doesn't fit your use case (i.e., when the source produces a large number of records), you can apply keyBy and a windowed aggregation first, which yields an aggregated result per key, and then apply windowAll and a final aggregation. This way the per-key aggregation runs in parallel across task slots, and only the already reduced data is aggregated in a single task slot.

The following is an example of windowAll without a keyBy operation:

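// Event-time windows need timestamps and watermarks; assign them upstream (e.g. with assignTimestampsAndWatermarks).
environment.fromCollection(list)
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.max(1)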

The following is an example of windowAll after a keyBy operation:

environment.fromCollection(list)
.keyBy(1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.maxBy(1)
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.max(1)
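
To make the two-stage idea concrete without a Flink cluster, here is a minimal plain-Java sketch. It does not use the Flink API: the nested lists are a hypothetical stand-in for the records that end up in each task slot, and the class and method names (TwoStageMax, partialMaxPerSlot, globalMax) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TwoStageMax {

    // Stage 1: each "slot" (stand-in for a parallel subtask) computes the
    // max of only the records it received, so its result is partial.
    public static List<Integer> partialMaxPerSlot(List<List<Integer>> slots) {
        List<Integer> partials = new ArrayList<>();
        for (List<Integer> slot : slots) {
            if (slot.isEmpty()) {
                continue; // an empty slot contributes nothing
            }
            int max = slot.get(0);
            for (int value : slot) {
                max = Math.max(max, value);
            }
            partials.add(max);
        }
        return partials;
    }

    // Stage 2: a single task merges the (much smaller) list of partial results.
    public static int globalMax(List<Integer> partials) {
        if (partials.isEmpty()) {
            throw new IllegalArgumentException("no partial results to merge");
        }
        int max = partials.get(0);
        for (int value : partials) {
            max = Math.max(max, value);
        }
        return max;
    }

    public static void main(String[] args) {
        // Hypothetical distribution of six records over three slots.
        List<List<Integer>> slots = Arrays.asList(
                Arrays.asList(0, 1, 1),
                Arrays.asList(0, 3),
                Arrays.asList(2));

        List<Integer> partials = partialMaxPerSlot(slots);
        System.out.println(partials);            // [1, 3, 2]
        System.out.println(globalMax(partials)); // 3
    }
}
```

No single slot sees the true maximum unless it happens to hold that record, which is why the final step has to run in one place. The single task only has to merge one value per slot (or per key) instead of every record.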

Reference for the documentation - here


1 Comment

I think the question poster may want to know whether there is any way to pre-aggregate the events before keyBy, to avoid the shuffling that keyBy triggers and improve performance. This scenario is quite common but is not well supported in Flink: there is no out-of-the-box solution, though you can write your own based on the low-level APIs.

With FLIP-134 the Flink community has decided to deprecate all of these relational methods from the DataStream API:

  • DataStream#project
  • Windowed/KeyedStream#sum,min,max,minBy,maxBy
  • DataStream#keyBy where the key is specified by field name or index (including ConnectedStreams#keyBy)

The rationale behind this decision is that Table/SQL is a more complete and more performant relational API, and it already supports both batch and streaming. With these APIs you can easily perform global aggregations, without having to first do a keyBy or GROUP BY.

An example:

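import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

SingleOutputStreamOperator<Integer> numbers = env.fromElements(0, 1, 1, 0, 3, 2);

// Expose the stream as a table with a single column "n"
Table data = tableEnv.fromDataStream(numbers, $("n"));

// Global max over the whole stream; no keyBy or GROUP BY needed
Table results = data.select($("n").max());

// The result is continuously updated, so it is emitted as a retract stream
tableEnv
        .toRetractStream(results, Row.class)
        .print();

env.execute();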
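
Because the maximum changes as records arrive, the result comes out as a retract stream: each message carries a flag saying whether it adds a value or retracts the previous one. The plain-Java sketch below only simulates that idea and does not use the Flink API. It assumes an update is emitted only when the maximum actually changes and that records arrive in the listed order; the class name, method name, and message formatting are hypothetical and may not match what Flink prints.

```java
import java.util.ArrayList;
import java.util.List;

class RetractMaxSketch {

    // Simulates add/retract messages for a running global max.
    // "(true,x)" means "x is the current result"; "(false,x)" retracts it.
    public static List<String> retractMessages(int... input) {
        List<String> out = new ArrayList<>();
        Integer currentMax = null; // no result emitted yet
        for (int n : input) {
            if (currentMax == null) {
                currentMax = n;
                out.add("(true," + n + ")");
            } else if (n > currentMax) {
                // Retract the old result first, then emit the new one.
                out.add("(false," + currentMax + ")");
                currentMax = n;
                out.add("(true," + n + ")");
            }
            // Assumption: an unchanged max produces no message.
        }
        return out;
    }

    public static void main(String[] args) {
        for (String message : retractMessages(0, 1, 1, 0, 3, 2)) {
            System.out.println(message);
        }
        // Prints, one per line: (true,0) (false,0) (true,1) (false,1) (true,3)
    }
}
```

A downstream consumer that only cares about the latest value can ignore the retractions and keep the most recent message whose flag is true.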

2 Comments

How does this work internally in terms of task slot utilisation? Is the parallelism 1 throughout, or is it more than 1 and then finally reduced to 1?
I don't believe the optimizer is smart enough to do a parallel pre-aggregation first. You can examine the execution plan and check, but if you want the optimized version I suspect you'll have to do it yourself.
