7

I was reading about the current limitations of joins in Kafka Streams, such as KTable-KTable non-key joins or KTable-GlobalKTable joins.

I discovered that Flink seems to support all of these. From what I read, a dynamic table sounds like a KTable.

First of all, I wonder whether they are the same concept, and then how Flink achieves this. I could not find documentation about the underlying infrastructure. For instance, I did not find the notion of a broadcast join, as happens with a GlobalKTable. Is the underlying infrastructure that implements dynamic tables distributed?

2 Answers

6

Flink's dynamic table and Kafka's KTable are not the same.

In Flink, a dynamic table is a very generic and broad concept, namely a table that evolves over time. This includes arbitrary changes (INSERT, DELETE, UPDATE). A dynamic table does not need a primary key or unique attribute, but it might have one.

  • A KStream is a special type of dynamic table, namely a dynamic table that is only receiving INSERT changes, i.e., an ever-growing, append-only table.

  • A KTable is another type of dynamic table, i.e., a dynamic table that has a unique key and changes with INSERT, DELETE, and UPDATE changes on the key.
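To make the two bullet points above concrete, here is a minimal sketch in plain Python (no Flink or Kafka Streams API involved). It models a dynamic table as a list of changes and materializes it in two ways. The names `materialize_append_only`, `materialize_keyed`, `clicks`, and `balances` are made up for illustration and are not part of either system.

```python
from typing import Dict, List, Optional, Tuple

# Each change is (op, key, value); op is "INSERT", "UPDATE", or "DELETE".
Change = Tuple[str, str, Optional[int]]


def materialize_append_only(changes: List[Change]) -> List[Tuple[str, Optional[int]]]:
    """KStream-like view: only INSERT changes are accepted, so the table only grows."""
    rows = []
    for op, key, value in changes:
        if op != "INSERT":
            raise ValueError("append-only table accepts INSERT changes only")
        rows.append((key, value))
    return rows


def materialize_keyed(changes: List[Change]) -> Dict[str, Optional[int]]:
    """KTable-like view: one row per unique key, updated or deleted in place."""
    table: Dict[str, Optional[int]] = {}
    for op, key, value in changes:
        if op == "DELETE":
            table.pop(key, None)
        else:
            # INSERT and UPDATE both set the latest value for the key.
            table[key] = value
    return table


# Hypothetical sample data.
clicks = [("INSERT", "alice", 1), ("INSERT", "bob", 1), ("INSERT", "alice", 1)]
balances = [
    ("INSERT", "alice", 10),
    ("UPDATE", "alice", 25),
    ("INSERT", "bob", 5),
    ("DELETE", "bob", None),
]

print(materialize_append_only(clicks))  # every record is kept, duplicates included
print(materialize_keyed(balances))      # only the latest row per key survives
```

A general dynamic table in Flink is broader than both: it may receive UPDATE and DELETE changes without having a unique key at all, which neither function above models.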

Flink supports the following types of joins on dynamic tables. Note that the references to Kafka's joins might not be 100% accurate (happy to fix errors!).

  • Time-windowed joins should correspond to KSQL's KStream-KStream joins.
  • Temporal table joins are similar to KSQL's KStream-KTable joins. The temporal relation between both tables needs to be explicitly specified in the query to be able to run the same query with identical semantics on batch/offline data.
  • Regular joins are more generic than KSQL's KTable-KTable joins because they don't require the input tables to have unique keys. Moreover, Flink does not distinguish between primary- or foreign-key joins, but requires that joins are equi-joins, i.e., have at least one equality predicate. At this point, the streaming SQL planner does not support broadcast-forward joins (which I believe should roughly correspond to KTable-GlobalKTable joins).
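The difference between the three join types above is easier to see on a small, static data set. The following is a rough sketch in plain Python that evaluates each join over a batch snapshot; it is not how Flink executes these joins on streams, and the function names and sample rows (`orders`, `rates`) are assumptions made for illustration.

```python
from typing import List, Tuple

Row = Tuple[str, int, str]  # (join key, event time, payload)


def time_windowed_join(left: List[Row], right: List[Row], max_gap: int):
    """Pair rows with equal keys whose event times are at most max_gap apart."""
    return [
        (l, r)
        for l in left
        for r in right
        if l[0] == r[0] and abs(l[1] - r[1]) <= max_gap
    ]


def temporal_table_join(stream: List[Row], versions: List[Row]):
    """Pair each stream row with the table version that was valid at its event time."""
    out = []
    for row in stream:
        key, ts, _ = row
        valid = [v for v in versions if v[0] == key and v[1] <= ts]
        if valid:
            out.append((row, max(valid, key=lambda v: v[1])))
    return out


def regular_join(left: List[Row], right: List[Row]):
    """Plain equi-join: every pair with equal keys, no unique key required."""
    return [(l, r) for l in left for r in right if l[0] == r[0]]


# Hypothetical sample data: orders in a currency and versioned exchange rates.
orders = [("eur", 5, "order-1"), ("eur", 12, "order-2")]
rates = [("eur", 0, "1.10"), ("eur", 10, "1.20")]

print(time_windowed_join(orders, rates, 3))  # only order-2 is within 3 time units of a rate
print(temporal_table_join(orders, rates))    # each order gets the rate valid at its time
print(regular_join(orders, rates))           # all four key-matching combinations
```

Note how the temporal table join takes the time relation from the data itself (the event time of each order), which is what allows the same query to give identical results on batch/offline data.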

3 Comments

Thanks for the details Fabian! That is super helpful!
Thanks Matthias! Anything to correct from your point of view?
Sounds accurate to me. In my own answer, I said "both seem to be the same" because I focused on the "table" aspect and skipped the part that Flink uses dynamic tables to model streams, too. It's good to get clarification on how both concepts map to each other.
3

I am not 100% sure because I don't know all the details of Flink's "dynamic table" concept, but it seems to me it's the same as a KTable in Kafka Streams.

However, Kafka Streams distinguishes between a KTable and a GlobalKTable, and the two are not exactly the same thing. (1) A KTable is distributed/sharded, while a GlobalKTable is replicated/broadcast. (2) A KTable is event-time synchronized, while a GlobalKTable is not. For the same reason, a GlobalKTable is fully loaded/bootstrapped on startup, while a KTable is updated based on the event timestamps of its changelog records when appropriate (relative to the event timestamps of the other input streams). Furthermore, during processing, updates to a KTable are event-time synchronized, while updates to a GlobalKTable are not (i.e., they are applied immediately and can thus be considered non-deterministic).
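Point (1) can be illustrated with a small sketch in plain Python that does not use the Kafka Streams API. It assumes two application instances and uses `zlib.crc32` as a stand-in hash (Kafka's default partitioner uses murmur2 for keyed records); `partition_for`, `shard_table`, `replicate_table`, and the `users` data are hypothetical names for this example only.

```python
import zlib
from typing import Dict, List


def partition_for(key: str, num_instances: int) -> int:
    """Stable key-to-instance mapping; crc32 is only a stand-in for the real partitioner."""
    return zlib.crc32(key.encode("utf-8")) % num_instances


def shard_table(rows: Dict[str, str], num_instances: int) -> List[Dict[str, str]]:
    """KTable-like layout: each instance holds only the keys of its own partition."""
    shards: List[Dict[str, str]] = [{} for _ in range(num_instances)]
    for key, value in rows.items():
        shards[partition_for(key, num_instances)][key] = value
    return shards


def replicate_table(rows: Dict[str, str], num_instances: int) -> List[Dict[str, str]]:
    """GlobalKTable-like layout: every instance holds a full copy of the table."""
    return [dict(rows) for _ in range(num_instances)]


# Hypothetical sample data: user id -> country.
users = {"u1": "DE", "u2": "US", "u3": "FR"}
shards = shard_table(users, 2)
replicas = replicate_table(users, 2)

owner = partition_for("u1", 2)
print(shards[owner].get("u1"))      # found: the owning instance has the row
print(shards[1 - owner].get("u1"))  # None: the other instance does not
print(replicas[1 - owner].get("u1"))  # found: every replica has the row
```

This is why a join against a sharded table needs the other input to be partitioned by the same key, while a join against a replicated table can look up any key from any instance. The sketch does not cover point (2), the event-time synchronization.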

Last note: Kafka Streams adds foreign-key KTable-KTable joins in the upcoming 2.4 release. There is also a ticket to add KTable-GlobalKTable joins, but this feature has not been requested very often so far and thus has not been added yet: https://issues.apache.org/jira/browse/KAFKA-4628
