KAFKA-6049: Add non-windowed Cogroup operator (KIP-150)#7538
Conversation
There was a problem hiding this comment.
it is necessary to to do an unchecked cast for the input value type. This is because cogrouped can have any type value for the group streams it intakes
There was a problem hiding this comment.
A new case will be added for windowed streams
mjsax
left a comment
There was a problem hiding this comment.
Did an initial pass. Mostly nits.
We need to have a test that trigger repartitioning though -- I think, atm repartitioning would not work correctly. This might require an integration test.
|
retest this please |
1 similar comment
|
retest this please |
mjsax
left a comment
There was a problem hiding this comment.
Sorry for the long wait... I'll stay in this one to hopefully merge by Wednesday.
Btw: Can you rebase your PR to trunk -- a PR was merged that changes how the gradle build works, and you should pick up this change.
There was a problem hiding this comment.
Where do you validate that this name is picked up? -- Also, we we need to actually pipe input via TDD? Seems a builder.build().describe() would be sufficient to verify the name without the need to process any data?
There was a problem hiding this comment.
With regard to naming: we should also check that the store in only queryable is a name is specified via Materialized.
There was a problem hiding this comment.
I was using this test to print the topology and it shows two sub topologies while it should be one (seems the reason is that you use the same StreamsBuilder as in setup() method.
Also, the naming of the operators seems to be incorrect. Also wondering if KGroupedStream#cogroup() needs on overload that takes a Named parameter? Maybe not, but the specified Named from aggregate() would need to be used for other processors, too. Atm there is this weird COGROUPKSTREAM-AGGREGATE-KSTREAM-SOURCE-0000000001test
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [topic])
--> none
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000001 (topics: [one])
--> COGROUPKSTREAM-AGGREGATE-KSTREAM-SOURCE-0000000001test
Processor: COGROUPKSTREAM-AGGREGATE-KSTREAM-SOURCE-0000000001test (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000002])
--> test
<-- KSTREAM-SOURCE-0000000001
Processor: test (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000002])
--> KTABLE-TOSTREAM-0000000005
<-- COGROUPKSTREAM-AGGREGATE-KSTREAM-SOURCE-0000000001test
Processor: KTABLE-TOSTREAM-0000000005 (stores: [])
--> KSTREAM-SINK-0000000006
<-- test
Sink: KSTREAM-SINK-0000000006 (topic: output)
<-- KTABLE-TOSTREAM-0000000005
There was a problem hiding this comment.
Alright, the I changed the tests so that they only use the correct number of builders. And I discovered that orElseGenerateWithPrefix exists. so that should fix these problems
There was a problem hiding this comment.
I still don't see that the test verifies the names? Again the question, why do we need to process data to verify if there right names are assigned?
There was a problem hiding this comment.
I added a test for this
mjsax
left a comment
There was a problem hiding this comment.
If you want, we can also split the PR into two, and add auto-repartitioning in a follow up PR. We also need tests for this case.
There was a problem hiding this comment.
I still don't see that the test verifies the names? Again the question, why do we need to process data to verify if there right names are assigned?
Improved JavaDocs Code reformatting Added some more tests Fixed naming and updated naming-test
| */ | ||
| KTable<K, VOut> aggregate(final Initializer<VOut> initializer, | ||
| final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized); | ||
| KTable<K, VOut> aggregate(final Initializer<VOut> initializer); |
There was a problem hiding this comment.
I reordered the method from "few parameter" to "more parameters" to make it easier to navigate within the file.
| * streams of this {@code CogroupedKStream}. | ||
| * If this is not the case, you would need to call {@link KStream#through(String)} before | ||
| * {@link KStream#groupByKey() grouping} the {@link KStream}, using a pre-created topic with the "correct" number of | ||
| * partitions. |
| * {@link KeyValue} pairs. | ||
| * It is an intermediate representation of one or more {@link KStream}s | ||
| * in order to apply one or more aggregation operations on the original {@link KStream} | ||
| * records. |
There was a problem hiding this comment.
Removed this, as it describes CogroupedKStream what is not appropritate here.
| * StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and {@link | ||
| * StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}. | ||
| * To compute the aggregation the corresponding {@link Aggregator} as specified in | ||
| * {@link #cogroup(KGroupedStream, Aggregator) cogroup(...)} is used per input stream. |
| * To compute the aggregation the corresponding {@link Aggregator} as specified in | ||
| * {@link #cogroup(KGroupedStream, Aggregator) cogroup(...)} is used per input stream. | ||
| * The specified {@link Initializer} is applied once per key, directly before the first input record per key is | ||
| * processed to provide an initial intermediate aggregation result that is used to process the first record. |
| * The specified {@link Aggregator} is applied in the actual {@link CogroupedKStream#aggregate(Initializer) | ||
| * aggregation} step for each input record and computes a new aggregate using the current aggregate (or for the very | ||
| * first record per key using the initial intermediate aggregation result provided via the {@link Initializer} that | ||
| * is passed into {@link CogroupedKStream#aggregate(Initializer)}) and the record's value. |
| public class CogroupedKStreamImpl<K, VOut> extends AbstractStream<K, VOut> implements CogroupedKStream<K, VOut> { | ||
|
|
||
| static final String AGGREGATE_NAME = "COGROUPKSTREAM-AGGREGATE-"; | ||
| static final String MERGE_NAME = "COGROUPKSTREAM-MERGE-"; |
There was a problem hiding this comment.
New default name for the merge-node
| this.groupPatterns = new LinkedHashMap<>(); | ||
| this.aggregateBuilder = new CogroupedStreamAggregateBuilder<>(builder); | ||
| groupPatterns = new LinkedHashMap<>(); | ||
| aggregateBuilder = new CogroupedStreamAggregateBuilder<>(builder); |
| Objects.requireNonNull(materialized, "materialized can't be null"); | ||
| final NamedInternal named = NamedInternal.empty(); | ||
| return aggregate(initializer, named, materialized); | ||
| return aggregate(initializer, NamedInternal.empty(), materialized); |
There was a problem hiding this comment.
Unified the non-null check into a single place.
| sessionMerger); | ||
| kGroupedStream.getValue(), | ||
| initializer, | ||
| named.suffixWithOrElseGet( |
There was a problem hiding this comment.
No need to pass in a Named -- we can just pass in the actual name as String directly -- otherwise we call suffixWithOrElseGet twice for no reason
| } | ||
| final String mergeProcessorName = named.orElseGenerateWithPrefix(builder, CogroupedKStreamImpl.AGGREGATE_NAME); | ||
| final String mergeProcessorName = named.suffixWithOrElseGet( | ||
| "-cogroup-merge", |
There was a problem hiding this comment.
Changed this to generate a name <userName>-cogroup-merge to align to <userName>-cogroup-agg-<counter> instead of just <userName> for the merge node.
| final SessionWindows sessionWindows, | ||
| final Merger<? super K, VOut> sessionMerger) { | ||
|
|
||
| final String processorName = named.orElseGenerateWithPrefix(builder, CogroupedKStreamImpl.AGGREGATE_NAME); |
There was a problem hiding this comment.
Removed this and pass in the processorName as String parameter directly.
|
|
||
| @Test(expected = NullPointerException.class) | ||
| public void shouldNotHaveNullInitializerOnAggregate() { | ||
| cogroupedStream.aggregate(null); |
There was a problem hiding this comment.
Added couple of more permutations for NPE tests.
| final KTable<String, String> customers = groupedOne | ||
| .cogroup(STRING_AGGREGATOR) | ||
| .cogroup(groupedTwo, STRING_AGGREGATOR) | ||
| .aggregate(STRING_INITIALIZER, Named.as("test"), Materialized.as("store")); |
There was a problem hiding this comment.
Also set the store name in this test.
|
@wcarlson5 I pushed a commit to fix jenkins checkstyle error -- will merge this after Jenkins is green. We will need 4 follow up PRs:
The first three can be done in parallel IMHO. The last one only at the very end. |
Follow up to PR #7538 (KIP-150) Reviewer: Matthias J. Sax <matthias@confluent.io>
Follow up to PR #7538 (KIP-150) Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Follow up to PR #7538 (KIP-150) Reviewer: Matthias J. Sax <matthias@confluent.io>
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)