Skip to content

KAFKA-9113: StreamThread unit test#10

Merged
guozhangwang merged 8 commits into
k9113-basefrom
K9113-stream-thread-test
Jan 28, 2020
Merged

KAFKA-9113: StreamThread unit test#10
guozhangwang merged 8 commits into
k9113-basefrom
K9113-stream-thread-test

Conversation

@guozhangwang

Copy link
Copy Markdown
Owner
  1. Fixed StreamThread unit test.
  2. Also fixed all integration tests.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)


@Override
public void commitTransaction() throws ProducerFencedException {
if (producerFencedOnCommitTxn) {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not need to fence on close since we do not close producer upon suspending anymore: with onPartitionsLost KAFKA-7285 would not happen.

logPrefix, subscriptionUpdates);
setRegexMatchedTopicsToSourceNodes();
setRegexMatchedTopicToStateStore();
synchronized void addSubscribedTopicsFromMetadata(final Set<String> topics, final String logPrefix) {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding this functionality back as discovered a bug from integration tests.

/**
* Initialize the record collector
*/
void initialize();

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface is no longer needed.


// TODO K9113: this should be moved to task when it transits to running from created / restoring
// then even if there's a long time between the initialization and the first txn that is also fine.
initialize();

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After further reading on the source code, the original concern of KAFKA-6639 is only for beginTxn, not initTxn, so always calling it right after producer construction is fine (txn.id expiration is 7 days by default). cc @abbccdda, @mjsax.

@Override
public Set<TopicPartition> inputPartitions() {
return Collections.emptySet();
return partitions;

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found we still need the input partitions for IQ purposes. cc @vvcephei .

private volatile State state = State.CREATED;
private volatile ThreadMetadata threadMetadata;
private StreamThread.StateListener stateListener;
private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not needed any more.

}
}

private static void processVersionOneAssignment(final String logPrefix,

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two functions are used in upgrade tests only and now are replaced.


void handleRebalanceStart() {
void handleRebalanceStart(final Set<String> subscribedTopics) {
builder.addSubscribedTopicsFromMetadata(subscribedTopics, logPrefix);

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is related to the bug I found: upon assign we still need to update subscription metadata.

}

@Test
public void standbyShouldNotPerformRestoreAtStartup() throws Exception {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is merged into the one below.

@guozhangwang guozhangwang merged commit 73e2088 into k9113-base Jan 28, 2020
guozhangwang added a commit to apache/kafka that referenced this pull request Feb 5, 2020
This PR is collaborated by Guozhang Wang and John Roesler. It is a significant tech debt cleanup on task management and state management, and is broken down by several sub-tasks listed below:

Extract embedded clients (producer and consumer) into RecordCollector from StreamTask.
guozhangwang#2
guozhangwang#5

Consolidate the standby updating and active restoring logic into ChangelogReader and extract out of StreamThread.
guozhangwang#3
guozhangwang#4

Introduce Task state life cycle (created, restoring, running, suspended, closing), and refactor the task operations based on the current state.
guozhangwang#6
guozhangwang#7

Consolidate AssignedTasks into TaskManager and simplify the logic of changelog management and task management (since they are already moved in step 2) and 3)).
guozhangwang#8
guozhangwang#9

Also simplified the StreamThread logic a bit as the embedded clients / changelog restoration logic has been moved into step 1) and 2).
guozhangwang#10

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Boyang Chen <boyang@confluent.io>
@guozhangwang guozhangwang deleted the K9113-stream-thread-test branch April 24, 2020 23:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant