Fix TaskManagerTest#8
Conversation
|
@guozhangwang , I have a minute, so I figured I could fix this. I fixed the failing tests, but there are still ignored tests that need to be re-enabled. |
| @Test | ||
| public void shouldUnassignChangelogPartitionsOnShutdown() { | ||
| throw new RuntimeException(); | ||
| // restoreConsumer.unsubscribe(); |
There was a problem hiding this comment.
Added this check to shouldCloseActiveTasksOnShutdown (although now, it's a changelogReader expectation)
|
|
||
| @Ignore | ||
| @Test | ||
| public void shouldUpdateTasksFromPartitionAssignment() { |
|
|
||
| @Ignore | ||
| @Test | ||
| public void shouldNotResumeConsumptionUntilAllStoresRestored() { |
There was a problem hiding this comment.
This is tested effectively by making consumer a strict mock.
vvcephei
left a comment
There was a problem hiding this comment.
Hey @guozhangwang ,
I managed to finish up the failing/ignored tests in TaskManagerTest this evening. There are several tests of logic that has been moved elsewhere in Streams, so I made a note of them on this PR.
| } | ||
|
|
||
| @Test | ||
| public void shouldNotUpdateSubscriptionFromAssignmentOfExistingTopic1() { |
There was a problem hiding this comment.
If I read it right, this test is unnecessary now that we just pass through the whole assignment and let the topology builder itself merge the new subscription with the old one. I.e., this test is the same as the prior one now.
|
|
||
| @Ignore | ||
| @Test | ||
| public void shouldResumeRestoredPartitions() { |
There was a problem hiding this comment.
Does this need to be a ChangelogReader test now?
There was a problem hiding this comment.
Yes, it should be, I can add one in ChangelogReaderTest.
|
|
||
| @Ignore | ||
| @Test | ||
| public void shouldReturnTrueWhenActiveAndStandbyTasksAreRunning() { |
There was a problem hiding this comment.
This is asserted all over the other tests now
|
|
||
| @Ignore | ||
| @Test | ||
| public void shouldReturnFalseWhenOnlyActiveTasksAreRunning() { |
There was a problem hiding this comment.
I honestly wasn't sure what this test was going for... maybe it meant when only standby tasks are running?
Even so, that hardly seems like an important behavior to verify. Just to be clear, behavior of the manager is to return true when all its tasks are in RUNNING state, regardless of the task type.
|
|
||
| @Ignore | ||
| @Test | ||
| public void shouldSeekToCheckpointedOffsetOnStandbyPartitionsWhenOffsetGreaterThanEqualTo0() { |
There was a problem hiding this comment.
should this be a ChangelogReader test now?
There was a problem hiding this comment.
Yes, this is already in the test class.
|
|
||
| @Ignore | ||
| @Test | ||
| public void shouldSeekToBeginningIfOffsetIsLessThan0() { |
There was a problem hiding this comment.
should this be a ChangelogReader test now?
There was a problem hiding this comment.
Yes, already in StoreChangelogReaderTest.
| assertThat(taskManager.punctuate(), equalTo(2)); | ||
| } | ||
|
|
||
| // TODO K9113: the following three tests needs to be fixed once thread calling restore is cleaned |
There was a problem hiding this comment.
This test still needs to be added elsewhere, too.
There was a problem hiding this comment.
Ack, this is covered in the ChangelogReaderTest now.
guozhangwang
left a comment
There was a problem hiding this comment.
Thanks @vvcephei ! I will merge the PR now and try to finish up the rest of the tests.
At the moment I'm adding more coverage to StreamTaskTest after I've completed the suspend / commit / close logic branching on state(), if you can take a quick look and lmk what do you think on these three functions that would be great :)
| } | ||
|
|
||
| @Test | ||
| public void shouldNotUpdateSubscriptionFromAssignmentOfExistingTopic1() { |
|
|
||
| @Ignore | ||
| @Test | ||
| public void shouldResumeRestoredPartitions() { |
There was a problem hiding this comment.
Yes, it should be, I can add one in ChangelogReaderTest.
|
|
||
| @Ignore | ||
| @Test | ||
| public void shouldReturnFalseWhenOnlyActiveTasksAreRunning() { |
|
|
||
| @Ignore | ||
| @Test | ||
| public void shouldSeekToCheckpointedOffsetOnStandbyPartitionsWhenOffsetGreaterThanEqualTo0() { |
There was a problem hiding this comment.
Yes, this is already in the test class.
|
|
||
| @Ignore | ||
| @Test | ||
| public void shouldSeekToBeginningIfOffsetIsLessThan0() { |
There was a problem hiding this comment.
Yes, already in StoreChangelogReaderTest.
| assertThat(taskManager.punctuate(), equalTo(2)); | ||
| } | ||
|
|
||
| // TODO K9113: the following three tests needs to be fixed once thread calling restore is cleaned |
There was a problem hiding this comment.
Ack, this is covered in the ChangelogReaderTest now.
|
Thanks @guozhangwang ! I just took a look at those methods in |
This PR is collaborated by Guozhang Wang and John Roesler. It is a significant tech debt cleanup on task management and state management, and is broken down by several sub-tasks listed below: Extract embedded clients (producer and consumer) into RecordCollector from StreamTask. guozhangwang#2 guozhangwang#5 Consolidate the standby updating and active restoring logic into ChangelogReader and extract out of StreamThread. guozhangwang#3 guozhangwang#4 Introduce Task state life cycle (created, restoring, running, suspended, closing), and refactor the task operations based on the current state. guozhangwang#6 guozhangwang#7 Consolidate AssignedTasks into TaskManager and simplify the logic of changelog management and task management (since they are already moved in step 2) and 3)). guozhangwang#8 guozhangwang#9 Also simplified the StreamThread logic a bit as the embedded clients / changelog restoration logic has been moved into step 1) and 2). guozhangwang#10 Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Boyang Chen <boyang@confluent.io>
Finishing up the TaskManagerTests.
Committer Checklist (excluded from commit message)