KAFKA-9113: Fix system / unit tests#11
Conversation
| this.stateDirectory = stateDirectory; | ||
| } | ||
|
|
||
| @Override |
| // to update offset limit for standby tasks; | ||
| private Consumer<byte[], byte[]> mainConsumer; | ||
|
|
||
| // the flag indicating limit offsets could be updated --- this is only needed for standby tasks that have limit |
There was a problem hiding this comment.
This is to clear one TODO to update limit offsets.
| @Override | ||
| public void suspend() { | ||
| if (state() == State.CLOSING || state() == State.SUSPENDED) { | ||
| if (state() == State.CREATED || state() == State.CLOSING || state() == State.SUSPENDED) { |
There was a problem hiding this comment.
This is found that when we are in created state (see below) we should not throw illegal-state exception.
| }, | ||
| RESTORING(2, 3, 4) { // 1 | ||
| @Override | ||
| public boolean hasBeenRunning() { |
There was a problem hiding this comment.
As suggested by @mjsax we can now remove this func.
| // Assert that both active and standby are able to query for a key | ||
| assertThat(store1.get(key), is(notNullValue())); | ||
| assertThat(store2.get(key), is(notNullValue())); | ||
| TestUtils.waitForCondition(() -> { |
There was a problem hiding this comment.
This is to fix a flaky test I found while working on the PR: even on trunk it is flaky too. The reason is that standby tasks may not fully restore even if it has transited to running.
| @parametrize(broker_version=str(LATEST_1_1)) | ||
| @parametrize(broker_version=str(LATEST_1_0)) | ||
| @parametrize(broker_version=str(LATEST_0_11_0)) | ||
| @parametrize(broker_version=str(LATEST_0_10_2)) |
There was a problem hiding this comment.
@ableegoldman this is to fix the broken system tests from the disable topic creation PR.
| logPrefix | ||
| ); | ||
|
|
||
| // initialize the created tasks |
There was a problem hiding this comment.
This is a found regression: initializeIfNeeded may fail to lock state dir if there are multiple threads, in which case we should just skip the initialization process and retry in the next loop.
|
I'm merging this back to |
|
|
||
| boolean allRunning = true; | ||
| if (!restoringTasks.isEmpty()) { | ||
| if (allRunning && !restoringTasks.isEmpty()) { |
There was a problem hiding this comment.
Not sure this conditional makes sense... It says "if all the tasks are running, and there are restoring tasks", which is self-contradictory. Maybe we can have two boolean flags, all initialized and all running?
There was a problem hiding this comment.
The rationale is that some tasks may still be in CREATED state and some maybe in RESTORING, in either case we should set allRunning to false. If some are still in CREATED state then allRunning would be false here already so the first condition is just to save the second check.
So I think correctness-wise that is fine but maybe a bit confusing, I can make this logic a bit cleaner now.
| taskManager.checkForCompletedRestoration(); | ||
|
|
||
| assertThat(task00.state(), is(Task.State.RUNNING)); |
There was a problem hiding this comment.
This seems out of the scope of this test, which is just that the new task gets created. It's probably redundant with another test now.
Committer Checklist (excluded from commit message)