
KAFKA-6049: Add session window support for cogroup#7782

Merged
mjsax merged 11 commits into apache:trunk from wcarlson5:KAFKA-6049_SessionWindowedCogroup
Dec 15, 2019

Conversation

@wcarlson5

Contributor

Adds SessionWindowedCogroupedKStream options to the cogroup operator.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@mjsax mjsax added the streams label Dec 6, 2019
@mjsax mjsax changed the title Init SessionWindowedCogroupedKStream KAFKA-6049: Add session window support for cogroup Dec 11, 2019
@mjsax mjsax marked this pull request as ready for review December 11, 2019 07:48
@wcarlson5 wcarlson5 marked this pull request as ready for review December 11, 2019 07:53
* windowed aggregations.
*
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
* @return an instance of {@link TimeWindowedKStream}

Member

return type is SessionWindowedCogroupedKStream

Contributor Author

I had it right the first time...

* {@link KGroupedStream} records.
* <p>
* The specified {@link SessionWindows} defines how the windows are created.
* The result is written into a local windowed {@link org.apache.kafka.streams.state.KeyValueStore} (which is basically an ever-updating

Member

local {@link SessionStore}

Contributor Author

looks like I missed one...
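For readers following the Javadoc wording about SessionWindows and the session store, here is a minimal plain-Java sketch of why a session aggregation needs a merger. It is not Kafka Streams code: the `Session` class, the `add` helper, the record-count aggregate, and the inclusive treatment of the gap boundary are all assumptions made for illustration. A late record that falls within the inactivity gap of two existing sessions bridges them, so their aggregates have to be combined.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; none of these types are Kafka Streams classes.
final class SessionMergeSketch {

    // A session covers [start, end] and carries a record count as its aggregate.
    static final class Session {
        final long start;
        final long end;
        final long count;

        Session(final long start, final long end, final long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }
    }

    // Adds one record timestamp, merging every session within gapMs of it.
    static List<Session> add(final List<Session> sessions, final long ts, final long gapMs) {
        long start = ts;
        long end = ts;
        long count = 1; // the new record starts as its own one-record session
        final List<Session> result = new ArrayList<>();
        for (final Session s : sessions) {
            if (s.end >= ts - gapMs && s.start <= ts + gapMs) {
                start = Math.min(start, s.start);
                end = Math.max(end, s.end);
                count += s.count; // plays the role of the session merger
            } else {
                result.add(s); // too far away: session is left untouched
            }
        }
        result.add(new Session(start, end, count));
        return result;
    }

    public static void main(final String[] args) {
        List<Session> sessions = new ArrayList<>();
        // Timestamp 10 arrives after 0 and 20 and bridges their two sessions.
        for (final long ts : new long[] {0L, 20L, 10L, 100L}) {
            sessions = add(sessions, ts, 10L);
        }
        for (final Session s : sessions) {
            System.out.println("[" + s.start + ", " + s.end + "] count=" + s.count);
        }
    }
}
```

With a gap of 10, the arrivals 0, 20, 10, 100 end up as two sessions: one spanning 0 to 20 with three records, and one holding only the record at 100.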

@Override
public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                        final Merger<? super K, V> sessionMerger) {
    return aggregate(initializer, sessionMerger, Materialized.with(keySerde, null));

Member

The keySerde is set to null in the constructor, so I am wondering whether we should pass in null here, too. (Similar question for TimeWindowedCogroupedKStreamImpl.)

Contributor Author

It's probably less confusing.

Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(sessionMerger, "sessionMerger can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
Objects.requireNonNull(named, "named can't be null");

Member

@bbejeck -- I was double-checking existing code, and many (all) operators actually allow named to be null. Do you remember why? I am wondering whether this is an issue with the existing code or with the new code.

Contributor Author

Since there are overloads that don't take it at all, anyone who would pass null should just leave it out.

Member

> @bbejeck -- I was double checking existing code and many (all) operator actually allow named to be null -- do you remember why? I am wondering if this is a issue with existing code or an issue with the new code?

Which operators are you referring to? I just took a quick look at KStreamImpl, and its methods don't allow a null Named. Several of the overloads don't take a Named and create a NamedInternal#empty instead.

Member

There are a couple of examples, but your comment indicates that they should not allow it. Just wanted to confirm. I will go over the code and add the corresponding checks in a follow-up PR.
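The pattern discussed in this thread (reject a null name, and offer an overload that omits the name for callers who have none) can be sketched in plain Java roughly as follows. The class `NamedOverloadSketch`, its `describe` methods, and the empty-string default are hypothetical stand-ins for illustration; they are not the Kafka Streams implementation.

```java
import java.util.Objects;

// Hypothetical stand-in for an operator with an optional name; not Kafka Streams code.
final class NamedOverloadSketch {

    // Overload without a name: callers with no name use this instead of passing null.
    static String describe(final String operation) {
        return describe(operation, ""); // empty default, loosely analogous to an "empty" name
    }

    // Overload with a name: null is rejected up front with a clear message.
    static String describe(final String operation, final String named) {
        Objects.requireNonNull(operation, "operation can't be null");
        Objects.requireNonNull(named, "named can't be null");
        return named.isEmpty() ? operation : named + ":" + operation;
    }

    public static void main(final String[] args) {
        System.out.println(describe("aggregate"));
        System.out.println(describe("aggregate", "my-agg"));
        try {
            describe("aggregate", null);
        } catch (final NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast at the public entry point keeps the null from surfacing later as a less helpful error deeper in the call chain.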

@mjsax

mjsax commented Dec 13, 2019

Member

Java 8 / 2.11 failed:

kafka.api.DelegationTokenEndToEndAuthorizationTest.testNoConsumeWithDescribeAclViaAssign
org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore

Java 11 passed.

Retest this please.

@mjsax mjsax merged commit dd8af2b into apache:trunk Dec 15, 2019
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020
@wcarlson5 wcarlson5 deleted the KAFKA-6049_SessionWindowedCogroup branch August 18, 2020 17:17

Labels

kip (Requires or implements a KIP), streams

3 participants