Skip to content

KAFKA-6049: Add non-windowed Cogroup operator (KIP-150)#7538

Merged
mjsax merged 25 commits into
apache:trunkfrom
wcarlson5:KAFKA-6049_key_value_cogroup
Dec 1, 2019
Merged

KAFKA-6049: Add non-windowed Cogroup operator (KIP-150)#7538
mjsax merged 25 commits into
apache:trunkfrom
wcarlson5:KAFKA-6049_key_value_cogroup

Conversation

@wcarlson5

Copy link
Copy Markdown
Contributor

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is necessary to to do an unchecked cast for the input value type. This is because cogrouped can have any type value for the group streams it intakes

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A new case will be added for windowed streams

@wcarlson5 wcarlson5 marked this pull request as ready for review October 18, 2019 16:43
@mjsax mjsax changed the title Kafka 6049 key value cogroup KAFKA-6049: Add non-windowed CoGroup operator (KIP-150) Oct 22, 2019
@mjsax mjsax added the streams label Oct 22, 2019

@mjsax mjsax left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did an initial pass. Mostly nits.

We need to have a test that trigger repartitioning though -- I think, atm repartitioning would not work correctly. This might require an integration test.

Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java Outdated
@wcarlson5

Copy link
Copy Markdown
Contributor Author

retest this please

1 similar comment
@wcarlson5

Copy link
Copy Markdown
Contributor Author

retest this please

@mjsax mjsax left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the long wait... I'll stay in this one to hopefully merge by Wednesday.

Btw: Can you rebase your PR to trunk -- a PR was merged that changes how the gradle build works, and you should pick up this change.

Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java Outdated

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do you validate that this name is picked up? -- Also, we we need to actually pipe input via TDD? Seems a builder.build().describe() would be sufficient to verify the name without the need to process any data?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With regard to naming: we should also check that the store in only queryable is a name is specified via Materialized.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was using this test to print the topology and it shows two sub topologies while it should be one (seems the reason is that you use the same StreamsBuilder as in setup() method.

Also, the naming of the operators seems to be incorrect. Also wondering if KGroupedStream#cogroup() needs on overload that takes a Named parameter? Maybe not, but the specified Named from aggregate() would need to be used for other processors, too. Atm there is this weird COGROUPKSTREAM-AGGREGATE-KSTREAM-SOURCE-0000000001test

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])
      --> none

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000001 (topics: [one])
      --> COGROUPKSTREAM-AGGREGATE-KSTREAM-SOURCE-0000000001test
    Processor: COGROUPKSTREAM-AGGREGATE-KSTREAM-SOURCE-0000000001test (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000002])
      --> test
      <-- KSTREAM-SOURCE-0000000001
    Processor: test (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000002])
      --> KTABLE-TOSTREAM-0000000005
      <-- COGROUPKSTREAM-AGGREGATE-KSTREAM-SOURCE-0000000001test
    Processor: KTABLE-TOSTREAM-0000000005 (stores: [])
      --> KSTREAM-SINK-0000000006
      <-- test
    Sink: KSTREAM-SINK-0000000006 (topic: output)
      <-- KTABLE-TOSTREAM-0000000005

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, the I changed the tests so that they only use the correct number of builders. And I discovered that orElseGenerateWithPrefix exists. so that should fix these problems

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't see that the test verifies the names? Again the question, why do we need to process data to verify if there right names are assigned?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test for this

@mjsax mjsax changed the title KAFKA-6049: Add non-windowed CoGroup operator (KIP-150) KAFKA-6049: Add non-windowed Cogroup operator (KIP-150) Nov 27, 2019

@mjsax mjsax left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want, we can also split the PR into two, and add auto-repartitioning in a follow up PR. We also need tests for this case.

Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java Outdated

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't see that the test verifies the names? Again the question, why do we need to process data to verify if there right names are assigned?

wcarlson5 and others added 4 commits November 27, 2019 14:40
Improved JavaDocs
Code reformatting
Added some more tests
Fixed naming and updated naming-test
*/
KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);
KTable<K, VOut> aggregate(final Initializer<VOut> initializer);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reordered the method from "few parameter" to "more parameters" to make it easier to navigate within the file.

* streams of this {@code CogroupedKStream}.
* If this is not the case, you would need to call {@link KStream#through(String)} before
* {@link KStream#groupByKey() grouping} the {@link KStream}, using a pre-created topic with the "correct" number of
* partitions.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New paragraph.

* {@link KeyValue} pairs.
* It is an intermediate representation of one or more {@link KStream}s
* in order to apply one or more aggregation operations on the original {@link KStream}
* records.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this, as it describes CogroupedKStream what is not appropritate here.

* StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and {@link
* StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* To compute the aggregation the corresponding {@link Aggregator} as specified in
* {@link #cogroup(KGroupedStream, Aggregator) cogroup(...)} is used per input stream.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New sentence.

* To compute the aggregation the corresponding {@link Aggregator} as specified in
* {@link #cogroup(KGroupedStream, Aggregator) cogroup(...)} is used per input stream.
* The specified {@link Initializer} is applied once per key, directly before the first input record per key is
* processed to provide an initial intermediate aggregation result that is used to process the first record.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added per key (twice)

* The specified {@link Aggregator} is applied in the actual {@link CogroupedKStream#aggregate(Initializer)
* aggregation} step for each input record and computes a new aggregate using the current aggregate (or for the very
* first record per key using the initial intermediate aggregation result provided via the {@link Initializer} that
* is passed into {@link CogroupedKStream#aggregate(Initializer)}) and the record's value.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New paragraph

public class CogroupedKStreamImpl<K, VOut> extends AbstractStream<K, VOut> implements CogroupedKStream<K, VOut> {

static final String AGGREGATE_NAME = "COGROUPKSTREAM-AGGREGATE-";
static final String MERGE_NAME = "COGROUPKSTREAM-MERGE-";

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New default name for the merge-node

this.groupPatterns = new LinkedHashMap<>();
this.aggregateBuilder = new CogroupedStreamAggregateBuilder<>(builder);
groupPatterns = new LinkedHashMap<>();
aggregateBuilder = new CogroupedStreamAggregateBuilder<>(builder);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove unnecessary this.

Objects.requireNonNull(materialized, "materialized can't be null");
final NamedInternal named = NamedInternal.empty();
return aggregate(initializer, named, materialized);
return aggregate(initializer, NamedInternal.empty(), materialized);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unified the non-null check into a single place.

sessionMerger);
kGroupedStream.getValue(),
initializer,
named.suffixWithOrElseGet(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to pass in a Named -- we can just pass in the actual name as String directly -- otherwise we call suffixWithOrElseGet twice for no reason

}
final String mergeProcessorName = named.orElseGenerateWithPrefix(builder, CogroupedKStreamImpl.AGGREGATE_NAME);
final String mergeProcessorName = named.suffixWithOrElseGet(
"-cogroup-merge",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed this to generate a name <userName>-cogroup-merge to align to <userName>-cogroup-agg-<counter> instead of just <userName> for the merge node.

final SessionWindows sessionWindows,
final Merger<? super K, VOut> sessionMerger) {

final String processorName = named.orElseGenerateWithPrefix(builder, CogroupedKStreamImpl.AGGREGATE_NAME);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this and pass in the processorName as String parameter directly.


@Test(expected = NullPointerException.class)
public void shouldNotHaveNullInitializerOnAggregate() {
cogroupedStream.aggregate(null);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added couple of more permutations for NPE tests.

final KTable<String, String> customers = groupedOne
.cogroup(STRING_AGGREGATOR)
.cogroup(groupedTwo, STRING_AGGREGATOR)
.aggregate(STRING_INITIALIZER, Named.as("test"), Materialized.as("store"));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also set the store name in this test.

@mjsax

mjsax commented Dec 1, 2019

Copy link
Copy Markdown
Member

@wcarlson5 I pushed a commit to fix jenkins checkstyle error -- will merge this after Jenkins is green.

We will need 4 follow up PRs:

  • add auto-repartitioning (including integration test)
  • time-windowed-cogroup
  • session-windowed-cogroup
  • update Scala API

The first three can be done in parallel IMHO. The last one only at the very end.

@mjsax mjsax merged commit 0b8ea7e into apache:trunk Dec 1, 2019
@wcarlson5 wcarlson5 deleted the KAFKA-6049_key_value_cogroup branch December 2, 2019 21:11
mjsax pushed a commit that referenced this pull request Dec 12, 2019
Follow up to PR #7538 (KIP-150)

Reviewer: Matthias J. Sax <matthias@confluent.io>
mjsax pushed a commit that referenced this pull request Dec 13, 2019
Follow up to PR #7538 (KIP-150)

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
mjsax pushed a commit that referenced this pull request Dec 15, 2019
Follow up to PR #7538 (KIP-150)

Reviewer: Matthias J. Sax <matthias@confluent.io>
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kip Requires or implements a KIP streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants