Skip to content

MINOR: Convert last streams join test to TTD#7777

Merged
guozhangwang merged 5 commits into
apache:trunkfrom
bbejeck:MINOR_convert_last_streams_join_test_to_TTD
Dec 5, 2019
Merged

MINOR: Convert last streams join test to TTD#7777
guozhangwang merged 5 commits into
apache:trunkfrom
bbejeck:MINOR_convert_last_streams_join_test_to_TTD

Conversation

@bbejeck

@bbejeck bbejeck commented Dec 4, 2019

Copy link
Copy Markdown
Member

Conversion of the last join integration test to use TTD

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@bbejeck bbejeck added streams tests Test fixes (including flaky tests) labels Dec 4, 2019
@After
public void cleanup() throws InterruptedException {
producer.close(Duration.ofMillis(0));
CLUSTER.deleteAllTopicsAndWait(120000);

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still needed. We have a couple of tests (non-processing) that start a Kafka Streams application and need the topics to exist, hence we delete them here.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: Does this mean, we still start an embedded Kafka before each test? Is it possible to start an embedded Kafka only in the tests that need it?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I attempted to do that initially, but the logic for using the embedded broker is scattered throughout the base class. There are only two single test methods from all the sub-classes requiring the embedded broker, and these tests don't perform joins. My preference is to merge this PR as is and do a follow-up PR https://issues.apache.org/jira/browse/KAFKA-9273 to refactor the single test methods requiring an embedded broker into a separate class.

WDYT?

@bbejeck

bbejeck commented Dec 4, 2019

Copy link
Copy Markdown
Member Author

Ping @guozhangwang @cadonna for reviews

@cadonna cadonna left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bbejeck Thanks for the PR

TestUtils.waitForCondition(() -> finalResultReached.get(), "Never received expected final result.");
final List<TestRecord<Long, String>> output = outputTopic.readRecordsToList();

assertEquals(output.get(output.size() - 1), updatedExpectedFinalResult);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prop: Use assertThat() here and in the other overload of this method.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

firstTimestamp + expectedFinalResult.timestamp());
checkResult(OUTPUT_TOPIC, updatedExpectedFinalResult, numRecordsExpected);

TestUtils.waitForCondition(() -> finalResultReached.get(), "Never received expected final result.");

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: What are we waiting for here? Shouldn't the result be immediately available?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. This was in the original test and I left this in by oversight, removing this lead to a significant cleanup!

Comment on lines 198 to 199
final long firstTimestamp = System.currentTimeMillis();
long ts = firstTimestamp;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prop: Should we use MockTime here?
prop: Could you use a more meaningful name for ts?

The above is also valid for the overload.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. Good call on the MockTime!

}


void runTestWithDriver(final List<List<TestRecord<Long, String>>> expectedResult) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prop: Would it make sense to also have an overload with just a flat list, i.e.,

void runTestWithDriver(final List<TestRecord<Long, String>> expectedResult)

Maybe it would simplify the code of some of the tests. Hopefully, you can share some of the code in the overloads.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, but as the List<List<..>> represents the expected results from a join without caching enabled (multiple results) I think that will require a refactoring of the entire test(s). This logic was pre-existing from the original test. Giving that this PR is to convert from using the embedded broker to the TTD to perform the joins, I'd prefer to do the refactoring in a separate PR.

@After
public void cleanup() throws InterruptedException {
producer.close(Duration.ofMillis(0));
CLUSTER.deleteAllTopicsAndWait(120000);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: Does this mean, we still start an embedded Kafka before each test? Is it possible to start an embedded Kafka only in the tests that need it?

@cadonna

cadonna commented Dec 4, 2019

Copy link
Copy Markdown
Member

BTW: Very nice PR number and I like PRs that remove more lines than they add. :-))

@guozhangwang guozhangwang left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @bbejeck , I do not have any major comments; after @cadonna 's comments are addressed I think we can merge it.

@After
public void cleanup() throws InterruptedException {
producer.close(Duration.ofMillis(0));
CLUSTER.deleteAllTopicsAndWait(120000);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

void runTest(final KeyValueTimestamp<Long, String> expectedFinalResult, final String storeName) throws Exception {
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
streams = new KafkaStreams(builder.build(), STREAMS_CONFIG);
void runTestWithDriver(final TestRecord<Long, String> expectedFinalResult, final String storeName) throws InterruptedException {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not introduced in this PR but in the existing runTestWithDriver we never use the passed in storeName it seems, and also the local expectedFinalResult are not used. Is that intentional?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TableTableJoinIntegrationTest passes in the store name as a parameter and it's used on line 218.
The test uses the expectedFinalResult parameter on lines 205-210. Having said that, this test could use an overall refactoring (cf https://issues.apache.org/jira/browse/KAFKA-9273), but I'd prefer to do so in a follow-up PR.
WDYT?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, sounds fine.

@bbejeck

bbejeck commented Dec 5, 2019

Copy link
Copy Markdown
Member Author

@guozhangwang @cadonna thanks for the reviews!

comments addressed.

@guozhangwang guozhangwang merged commit 04cd145 into apache:trunk Dec 5, 2019
bbejeck added a commit that referenced this pull request Dec 5, 2019
Reviewers: Bruno Cadonna <bruno@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
@bbejeck

bbejeck commented Dec 5, 2019

Copy link
Copy Markdown
Member Author

cherry-picked to 2.4

@bbejeck bbejeck deleted the MINOR_convert_last_streams_join_test_to_TTD branch July 10, 2024 13:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

streams tests Test fixes (including flaky tests)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants