MINOR: Convert last streams join test to TTD#7777
Conversation
| @After | ||
| public void cleanup() throws InterruptedException { | ||
| producer.close(Duration.ofMillis(0)); | ||
| CLUSTER.deleteAllTopicsAndWait(120000); |
There was a problem hiding this comment.
This is still needed. We have a couple of tests (non-processing) that start a Kafka Streams application and need the topics to exist, hence we delete them here.
There was a problem hiding this comment.
Q: Does this mean, we still start an embedded Kafka before each test? Is it possible to start an embedded Kafka only in the tests that need it?
There was a problem hiding this comment.
I attempted to do that initially, but the logic for using the embedded broker is scattered throughout the base class. There are only two single test methods from all the sub-classes requiring the embedded broker, and these tests don't perform joins. My preference is to merge this PR as is and do a follow-up PR https://issues.apache.org/jira/browse/KAFKA-9273 to refactor the single test methods requiring an embedded broker into a separate class.
WDYT?
|
Ping @guozhangwang @cadonna for reviews |
| TestUtils.waitForCondition(() -> finalResultReached.get(), "Never received expected final result."); | ||
| final List<TestRecord<Long, String>> output = outputTopic.readRecordsToList(); | ||
|
|
||
| assertEquals(output.get(output.size() - 1), updatedExpectedFinalResult); |
There was a problem hiding this comment.
prop: Use assertThat() here and in the other overload of this method.
| firstTimestamp + expectedFinalResult.timestamp()); | ||
| checkResult(OUTPUT_TOPIC, updatedExpectedFinalResult, numRecordsExpected); | ||
|
|
||
| TestUtils.waitForCondition(() -> finalResultReached.get(), "Never received expected final result."); |
There was a problem hiding this comment.
Q: What are we waiting for here? Shouldn't the result be immediately available?
There was a problem hiding this comment.
Good call. This was in the original test and I left this in by oversight, removing this lead to a significant cleanup!
| final long firstTimestamp = System.currentTimeMillis(); | ||
| long ts = firstTimestamp; |
There was a problem hiding this comment.
prop: Should we use MockTime here?
prop: Could you use a more meaningful name for ts?
The above is also valid for the overload.
There was a problem hiding this comment.
Ack. Good call on the MockTime!
| } | ||
|
|
||
|
|
||
| void runTestWithDriver(final List<List<TestRecord<Long, String>>> expectedResult) { |
There was a problem hiding this comment.
prop: Would it make sense to also have an overload with just a flat list, i.e.,
void runTestWithDriver(final List<TestRecord<Long, String>> expectedResult)
Maybe it would simplify the code of some of the tests. Hopefully, you can share some of the code in the overloads.
There was a problem hiding this comment.
Maybe, but as the List<List<..>> represents the expected results from a join without caching enabled (multiple results) I think that will require a refactoring of the entire test(s). This logic was pre-existing from the original test. Giving that this PR is to convert from using the embedded broker to the TTD to perform the joins, I'd prefer to do the refactoring in a separate PR.
| @After | ||
| public void cleanup() throws InterruptedException { | ||
| producer.close(Duration.ofMillis(0)); | ||
| CLUSTER.deleteAllTopicsAndWait(120000); |
There was a problem hiding this comment.
Q: Does this mean, we still start an embedded Kafka before each test? Is it possible to start an embedded Kafka only in the tests that need it?
|
BTW: Very nice PR number and I like PRs that remove more lines than they add. :-)) |
| @After | ||
| public void cleanup() throws InterruptedException { | ||
| producer.close(Duration.ofMillis(0)); | ||
| CLUSTER.deleteAllTopicsAndWait(120000); |
| void runTest(final KeyValueTimestamp<Long, String> expectedFinalResult, final String storeName) throws Exception { | ||
| IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG); | ||
| streams = new KafkaStreams(builder.build(), STREAMS_CONFIG); | ||
| void runTestWithDriver(final TestRecord<Long, String> expectedFinalResult, final String storeName) throws InterruptedException { |
There was a problem hiding this comment.
This is not introduced in this PR but in the existing runTestWithDriver we never use the passed in storeName it seems, and also the local expectedFinalResult are not used. Is that intentional?
There was a problem hiding this comment.
The TableTableJoinIntegrationTest passes in the store name as a parameter and it's used on line 218.
The test uses the expectedFinalResult parameter on lines 205-210. Having said that, this test could use an overall refactoring (cf https://issues.apache.org/jira/browse/KAFKA-9273), but I'd prefer to do so in a follow-up PR.
WDYT?
|
@guozhangwang @cadonna thanks for the reviews! comments addressed. |
Reviewers: Bruno Cadonna <bruno@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
|
cherry-picked to 2.4 |
Conversion of the last join integration test to use TTD
Committer Checklist (excluded from commit message)