KAFKA-6049: Add time window support for cogroup#7774
Conversation
|
@mjsax The second PR for cogroup SessionWindows to follow shortly |
|
retest this please |
| * <p> | ||
| * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f. | ||
| * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}). | ||
| * The result is written into a local windowed {@link org.apache.kafka.streams.state.KeyValueStore} (which is basically an ever-updating |
There was a problem hiding this comment.
Hmmm... I guess this is c&p, but I am wondering why we link to KeyValueStore, but not to WindowStore ?
There was a problem hiding this comment.
I have been saying windowed KeyValueStore but in hindsight that doesn't make sense. am changing to WindowStore . I will change all of them and resolve the comments
|
|
||
| @Before | ||
| public void setup() { | ||
| final KStream<String, String> stream = builder.stream(TOPIC, Consumed |
There was a problem hiding this comment.
Can we "duplicate" this test and use default serdes from StreamsConfig? We recently discovered some bugs in other parts of the code with this regard and it would be good to verify that the code work correct upfront.
There was a problem hiding this comment.
I did that and I ran into some problems. It might be what you were referring to. I fixed it for now by specifying the serdes always
| @Test | ||
| public void timeWindowAggregateTest() { | ||
| final KTable<Windowed<String>, String> customers = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER).windowedBy(TimeWindows.of(ofMillis(500L))).aggregate(MockInitializer.STRING_INIT); | ||
| customers.toStream().process(processorSupplier); |
There was a problem hiding this comment.
We should use to() and use a TestOutputTopic instead of the processor
There was a problem hiding this comment.
done but had to specify the serdes
|
Checkstyle error: |
|
retest this please |
mjsax
left a comment
There was a problem hiding this comment.
Overall LGTM. We should close the test gaps for missing tests, avoid redundant tests, and cleanup the redundent code.
|
|
||
| @SuppressWarnings("deprecation") | ||
| // continuing to support Windows#maintainMs/segmentInterval in fallback mode | ||
| private StoreBuilder<TimestampedWindowStore<K, V>> materialize( |
There was a problem hiding this comment.
This method seems to be the exact same as TimeWindowedKStreamImpl#materialize() -- we should share the code.
There was a problem hiding this comment.
It almost is. But there are a few Type differences that make it so I can't use it directly and if I were to adapt TimeWindowedKStreamImpl#materialize() it would become less type safe
mjsax
left a comment
There was a problem hiding this comment.
LGTM. Will merge after Jenkins passed.
|
Java 11 / 2.13 timed out. Other two builds passed. Retest this please. |
|
Java 8 failed with flaky test Other tests passed. Merging this. |
adding TimeWindowedCogroupedKStream options to the cogroup operator
Committer Checklist (excluded from commit message)