KAFKA-14209 : Rewrite self joins to use single state store 2/3#12644
The current JoinNode might have other JoinNodes as siblings. We need to differentiate between the WindowedStreamProcessorNode nodes that belong to the current node and those that belong to others.
I opened a follow-up ticket for improving runtime by using a single loop: https://issues.apache.org/jira/browse/KAFKA-14251.
Force-pushed from b503a10 to f66d80a
Commit: address comments
Force-pushed from f66d80a to 59ab634
      .withValueJoiner(joiner)
    - .withNodeName(joinMergeName);
    + .withNodeName(joinMergeName)
    + .withSelfJoinProcessorParameters(selfJoinProcessorParams);
Not a strong opinion, since I know we are already mixing physical info into the logical planner quite badly, but I'm wondering if we could defer this to StreamStreamJoinNode#writeToTopology, inside the isSelfJoin condition? We still have all the pieces we need from the other params: 1) left store name, 2) both join window specs, 3) joiner, 4) time tracker. So in writeToTopology we can still generate the KStreamKStreamSelfJoin object if self-join is enabled.
My motivation is just to not spill more physical node info into logical planning phase.
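The suggestion above can be pictured with a toy model. This is only a sketch under stated assumptions: `JoinNode`, `SelfJoinProcessor`, and `setSelfJoin` are hypothetical stand-ins invented for illustration, not the Kafka Streams classes, and the time tracker is left out for brevity. The logical node holds only the pieces (store names, window size, joiner) and constructs the physical self-join processor when the topology is written.

```java
import java.util.function.BiFunction;

class DeferredSelfJoinSketch {

    // Hypothetical stand-in for the physical self-join processor.
    static final class SelfJoinProcessor<V> {
        final String storeName;
        final long windowMs;
        final BiFunction<V, V, String> joiner;

        SelfJoinProcessor(String storeName, long windowMs, BiFunction<V, V, String> joiner) {
            this.storeName = storeName;
            this.windowMs = windowMs;
            this.joiner = joiner;
        }
    }

    // Hypothetical logical node: it stores the ingredients, not the processor.
    static final class JoinNode<V> {
        private final String leftStoreName;
        private final String rightStoreName;
        private final long windowMs;
        private final BiFunction<V, V, String> joiner;
        private boolean selfJoin = false;

        JoinNode(String leftStoreName, String rightStoreName, long windowMs,
                 BiFunction<V, V, String> joiner) {
            this.leftStoreName = leftStoreName;
            this.rightStoreName = rightStoreName;
            this.windowMs = windowMs;
            this.joiner = joiner;
        }

        // Called by an optimizer pass once the node is recognized as a self-join.
        void setSelfJoin() {
            selfJoin = true;
        }

        // The physical processor is created here, only when it is needed.
        String writeToTopology() {
            if (selfJoin) {
                SelfJoinProcessor<V> p = new SelfJoinProcessor<>(leftStoreName, windowMs, joiner);
                return "self-join(" + p.storeName + ")";
            }
            return "inner-join(" + leftStoreName + ", " + rightStoreName + ")";
        }
    }

    public static void main(String[] args) {
        JoinNode<String> node = new JoinNode<>("left-store", "right-store", 1000L, (a, b) -> a + b);
        System.out.println(node.writeToTopology());
        node.setSelfJoin();
        System.out.println(node.writeToTopology());
    }
}
```

In this toy the deferral costs nothing because every ingredient is a field of the node. The sketch does not model the visibility constraints of the real code base.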
Hey @guozhangwang! Do you mean moving the code that creates the self-join node into StreamStreamJoinNode#writeToTopology?
    final KStreamKStreamSelfJoin<K, V1, V2, VOut> selfJoin = new KStreamKStreamSelfJoin<>(
        thisWindowStore.name(),
        internalWindows,
        joiner,
        sharedTimeTracker
    );
I tried it, but I don't think it makes the code clearer. All the components of the StreamStreamJoinNode are built in KStreamJoinImpl and are only visible there because they are not public. So I would have to special-case the self-join part, which avoids spilling physical information into the logical plan, but only for one small special case. I am worried this will be confusing.
Ack, thanks a lot for letting me know :)
guozhangwang left a comment
Thanks @vpapavas, I do not have further comments on the non-testing part. I made a final pass on the testing part and have just one more question. Otherwise I think it's good to go.
These test failures are unrelated:
…e#12644) Implements KIP-862: https://cwiki.apache.org/confluence/x/WSf1D Reviewers: Guozhang Wang <guozhang@apache.org>, Austin Heyne <aheyne>, John Roesler <vvcephei@apache.org>
…e#12644) (#35) Implements KIP-862: https://cwiki.apache.org/confluence/x/WSf1D Reviewers: Guozhang Wang <guozhang@apache.org>, Austin Heyne <aheyne>, John Roesler <vvcephei@apache.org> Co-authored-by: Vicky Papavasileiou <vpapavas@users.noreply.github.com>
KIP can be found here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-862%3A+Self-join
It applies only to Stream-Stream joins, not to n-way self-joins.
This is an inner-join topology (without the optimization): [figure omitted]
And this is the optimized self-join topology: [figure omitted]
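To show why one state store can be enough for a self-join, here is a toy in-memory model. It is a sketch under stated assumptions and not Kafka Streams code: `Event`, `twoStoreJoin`, and `singleStoreJoin` are hypothetical names, plain lists stand in for window stores, and retention, grace periods, and serialization are ignored. In a self-join every record reaches both sides, so the left and right stores end up holding the same records and one copy can serve both lookups.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SelfJoinSketch {

    // Hypothetical record type used only in this sketch.
    static final class Event {
        final String key;
        final String value;
        final long ts;

        Event(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    // Two events join when they share a key and lie within the window of each other.
    static boolean inWindow(Event a, Event b, long windowMs) {
        return a.key.equals(b.key) && Math.abs(a.ts - b.ts) <= windowMs;
    }

    // Unoptimized shape: the same input feeds a left and a right side, each with its own store.
    static List<String> twoStoreJoin(List<Event> input, long windowMs) {
        List<Event> leftStore = new ArrayList<>();
        List<Event> rightStore = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (Event e : input) {
            leftStore.add(e);                       // left side: store, then probe the right store
            for (Event r : rightStore) {
                if (inWindow(e, r, windowMs)) out.add(e.value + "+" + r.value);
            }
            rightStore.add(e);                      // right side: store, then probe the left store
            for (Event l : leftStore) {
                if (inWindow(l, e, windowMs)) out.add(l.value + "+" + e.value);
            }
        }
        return out;
    }

    // Optimized shape: one store serves both lookups.
    static List<String> singleStoreJoin(List<Event> input, long windowMs) {
        List<Event> store = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (Event e : input) {
            for (Event o : store) {                 // e as the left value, older records on the right
                if (inWindow(e, o, windowMs)) out.add(e.value + "+" + o.value);
            }
            store.add(e);
            for (Event o : store) {                 // e as the right value, including e joined with itself
                if (inWindow(o, e, windowMs)) out.add(o.value + "+" + e.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> input = Arrays.asList(
            new Event("k", "A", 0), new Event("k", "B", 5),
            new Event("k", "C", 100), new Event("j", "D", 3));
        System.out.println(twoStoreJoin(input, 10));
        System.out.println(singleStoreJoin(input, 10));
    }
}
```

Both methods return the same list for the sample input, while the single-store variant keeps half as many stored records. The two passes over the store per record are a simplification chosen for clarity.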
Testing: Unit tests (integration and upgrade tests in a follow-up PR)