Skip to content

KAFKA-6049: Add time window support for cogroup#7774

Merged
mjsax merged 7 commits into
apache:trunkfrom
wcarlson5:KAFKA-6049_TimeWindowedCogroup
Dec 12, 2019
Merged

KAFKA-6049: Add time window support for cogroup#7774
mjsax merged 7 commits into
apache:trunkfrom
wcarlson5:KAFKA-6049_TimeWindowedCogroup

Conversation

@wcarlson5

@wcarlson5 wcarlson5 commented Dec 3, 2019

Copy link
Copy Markdown
Contributor

adding TimeWindowedCogroupedKStream options to the cogroup operator

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@wcarlson5

Copy link
Copy Markdown
Contributor Author

@mjsax The second PR for cogroup SessionWindows to follow shortly

@wcarlson5

Copy link
Copy Markdown
Contributor Author

retest this please

@mjsax mjsax added the streams label Dec 6, 2019

@mjsax mjsax left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did an initial pass -- I would assume that many of the comments also apply to #7782 -- I won't repeat them on the other PR -- please update both -- some comment are even repeats from the first PR that was already merged).

* <p>
* The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
* {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
* The result is written into a local windowed {@link org.apache.kafka.streams.state.KeyValueStore} (which is basically an ever-updating

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... I guess this is c&p, but I am wondering why we link to KeyValueStore, but not to WindowStore ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have been saying windowed KeyValueStore but in hindsight that doesn't make sense. am changing to WindowStore . I will change all of them and resolve the comments


@Before
public void setup() {
final KStream<String, String> stream = builder.stream(TOPIC, Consumed

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we "duplicate" this test and use default serdes from StreamsConfig? We recently discovered some bugs in other parts of the code with this regard and it would be good to verify that the code work correct upfront.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did that and I ran into some problems. It might be what you were referring to. I fixed it for now by specifying the serdes always

@Test
public void timeWindowAggregateTest() {
final KTable<Windowed<String>, String> customers = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER).windowedBy(TimeWindows.of(ofMillis(500L))).aggregate(MockInitializer.STRING_INIT);
customers.toStream().process(processorSupplier);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use to() and use a TestOutputTopic instead of the processor

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done but had to specify the serdes

@mjsax

mjsax commented Dec 9, 2019

Copy link
Copy Markdown
Member

Checkstyle error:

Task :streams:checkstyleMain
16:26:42 [ant:checkstyle] [ERROR] /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java:22:8: Unused import - org.apache.kafka.streams.StreamsConfig. [UnusedImports]

@wcarlson5

Copy link
Copy Markdown
Contributor Author

retest this please

@mjsax mjsax marked this pull request as ready for review December 11, 2019 07:42
@mjsax mjsax changed the title Draft! Kafka 6049 Created TimeWindowedCogroupedKStream KAFKA-6049: Add time window support for cogroup Dec 11, 2019

@mjsax mjsax left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM. We should close the test gaps for missing tests, avoid redundant tests, and cleanup the redundent code.


@SuppressWarnings("deprecation")
// continuing to support Windows#maintainMs/segmentInterval in fallback mode
private StoreBuilder<TimestampedWindowStore<K, V>> materialize(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method seems to be the exact same as TimeWindowedKStreamImpl#materialize() -- we should share the code.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It almost is. But there are a few Type differences that make it so I can't use it directly and if I were to adapt TimeWindowedKStreamImpl#materialize() it would become less type safe

@mjsax mjsax left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Will merge after Jenkins passed.

@mjsax

mjsax commented Dec 12, 2019

Copy link
Copy Markdown
Member

Java 11 / 2.13 timed out. Other two builds passed.

Retest this please.

@mjsax

mjsax commented Dec 12, 2019

Copy link
Copy Markdown
Member

Java 8 failed with flaky test KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable (created ticket for tracking).

Other tests passed. Merging this.

@mjsax mjsax merged commit d1161bf into apache:trunk Dec 12, 2019
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020
@wcarlson5 wcarlson5 deleted the KAFKA-6049_TimeWindowedCogroup branch August 18, 2020 17:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kip Requires or implements a KIP streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants