KAFKA-6049: Add session window support for cogroup#7782
Conversation
| * windowed aggregations. | ||
| * | ||
| * @param sessionWindows the specification of the aggregation {@link SessionWindows} | ||
| * @return an instance of {@link TimeWindowedKStream} |
There was a problem hiding this comment.
return type is SessionWindowedCogroupedKStream
There was a problem hiding this comment.
I had it right the first time...
| * {@link KGroupedStream} records. | ||
| * <p> | ||
| * The specified {@link SessionWindows} defines how the windows are created. | ||
| * The result is written into a local windowed {@link org.apache.kafka.streams.state.KeyValueStore} (which is basically an ever-updating |
There was a problem hiding this comment.
looks like I missed one...
| @Override | ||
| public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer, | ||
| final Merger<? super K, V> sessionMerger) { | ||
| return aggregate(initializer, sessionMerger, Materialized.with(keySerde, null)); |
There was a problem hiding this comment.
the keySerde is set to null in the constructor, hence, I am wondering if we should pass in null, here, too? (similar question for TimeWindowedCogroupedKStreamImpl)
There was a problem hiding this comment.
its probably less confusing
| Objects.requireNonNull(initializer, "initializer can't be null"); | ||
| Objects.requireNonNull(sessionMerger, "sessionMerger can't be null"); | ||
| Objects.requireNonNull(materialized, "materialized can't be null"); | ||
| Objects.requireNonNull(named, "named can't be null"); |
There was a problem hiding this comment.
@bbejeck -- I was double checking existing code and many (all) operator actually allow named to be null -- do you remember why? I am wondering if this is a issue with existing code or an issue with the new code?
There was a problem hiding this comment.
Since there are options to just not pass it in anyone who would put null should just not include it
There was a problem hiding this comment.
@bbejeck -- I was double checking existing code and many (all) operator actually allow named to be null -- do you remember why? I am wondering if this is a issue with existing code or an issue with the new code?
Which operators are you referring to? I just took a quick look at KStreamImpl and they don't allow null Named. Several of the overloads don't take a Named and create a NamedInternal#empty
There was a problem hiding this comment.
There are a couple of examples, but your comment indicates, that they should allow it. Just wanted to confirm. Will go over the code and add the corresponding checks in a follow up PR to fix it.
|
Java 8 / 2.11 failed: Java 11 passed. Retest this please. |
adding SessionWindowedCogroupedKStream options to the cogroup operator
Committer Checklist (excluded from commit message)