Skip to content

KAFKA-9113: Fix system / unit tests#11

Merged
guozhangwang merged 6 commits into
k9113-basefrom
K9113-rebase
Feb 4, 2020
Merged

KAFKA-9113: Fix system / unit tests#11
guozhangwang merged 6 commits into
k9113-basefrom
K9113-rebase

Conversation

@guozhangwang

@guozhangwang guozhangwang commented Feb 4, 2020

Copy link
Copy Markdown
Owner
  1. Address comments from previous PRs.
  2. Rebase from trunk (primarily KIP-562).
  3. Fix system tests.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@guozhangwang

Copy link
Copy Markdown
Owner Author

this.stateDirectory = stateDirectory;
}

@Override

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move common functions here @mjsax.

// to update offset limit for standby tasks;
private Consumer<byte[], byte[]> mainConsumer;

// the flag indicating limit offsets could be updated --- this is only needed for standby tasks that have limit

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to clear one TODO to update limit offsets.

@Override
public void suspend() {
if (state() == State.CLOSING || state() == State.SUSPENDED) {
if (state() == State.CREATED || state() == State.CLOSING || state() == State.SUSPENDED) {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is found that when we are in created state (see below) we should not throw illegal-state exception.

},
RESTORING(2, 3, 4) { // 1
@Override
public boolean hasBeenRunning() {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As suggested by @mjsax we can now remove this func.

// Assert that both active and standby are able to query for a key
assertThat(store1.get(key), is(notNullValue()));
assertThat(store2.get(key), is(notNullValue()));
TestUtils.waitForCondition(() -> {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to fix a flaky test I found while working on the PR: even on trunk it is flaky too. The reason is that standby tasks may not fully restore even if it has transited to running.

@parametrize(broker_version=str(LATEST_1_1))
@parametrize(broker_version=str(LATEST_1_0))
@parametrize(broker_version=str(LATEST_0_11_0))
@parametrize(broker_version=str(LATEST_0_10_2))

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ableegoldman this is to fix the broken system tests from the disable topic creation PR.

logPrefix
);

// initialize the created tasks

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a found regression: initializeIfNeeded may fail to lock state dir if there are multiple threads, in which case we should just skip the initialization process and retry in the next loop.

@guozhangwang

Copy link
Copy Markdown
Owner Author

I'm merging this back to base now in order to trigger another unit / system directly, please feel free to continue reviewing and I will address comments in further PRs.

@guozhangwang guozhangwang merged commit d894ac7 into k9113-base Feb 4, 2020

boolean allRunning = true;
if (!restoringTasks.isEmpty()) {
if (allRunning && !restoringTasks.isEmpty()) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure this conditional makes sense... It says "if all the tasks are running, and there are restoring tasks", which is self-contradictory. Maybe we can have two boolean flags, all initialized and all running?

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rationale is that some tasks may still be in CREATED state and some maybe in RESTORING, in either case we should set allRunning to false. If some are still in CREATED state then allRunning would be false here already so the first condition is just to save the second check.

So I think correctness-wise that is fine but maybe a bit confusing, I can make this logic a bit cleaner now.

Comment on lines +252 to +254
taskManager.checkForCompletedRestoration();

assertThat(task00.state(), is(Task.State.RUNNING));

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems out of the scope of this test, which is just that the new task gets created. It's probably redundant with another test now.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I can do that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants