KAFKA-9113: StreamThread unit test#10
Conversation
…to K9113-stream-thread-test
|
|
||
| @Override | ||
| public void commitTransaction() throws ProducerFencedException { | ||
| if (producerFencedOnCommitTxn) { |
There was a problem hiding this comment.
We do not need to fence on close since we do not close producer upon suspending anymore: with onPartitionsLost KAFKA-7285 would not happen.
| logPrefix, subscriptionUpdates); | ||
| setRegexMatchedTopicsToSourceNodes(); | ||
| setRegexMatchedTopicToStateStore(); | ||
| synchronized void addSubscribedTopicsFromMetadata(final Set<String> topics, final String logPrefix) { |
There was a problem hiding this comment.
Adding this functionality back as discovered a bug from integration tests.
| /** | ||
| * Initialize the record collector | ||
| */ | ||
| void initialize(); |
There was a problem hiding this comment.
This interface is no longer needed.
|
|
||
| // TODO K9113: this should be moved to task when it transits to running from created / restoring | ||
| // then even if there's a long time between the initialization and the first txn that is also fine. | ||
| initialize(); |
| @Override | ||
| public Set<TopicPartition> inputPartitions() { | ||
| return Collections.emptySet(); | ||
| return partitions; |
There was a problem hiding this comment.
Found we still need the input partitions for IQ purposes. cc @vvcephei .
| private volatile State state = State.CREATED; | ||
| private volatile ThreadMetadata threadMetadata; | ||
| private StreamThread.StateListener stateListener; | ||
| private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords; |
There was a problem hiding this comment.
This is not needed any more.
| } | ||
| } | ||
|
|
||
| private static void processVersionOneAssignment(final String logPrefix, |
There was a problem hiding this comment.
These two functions are used in upgrade tests only and now are replaced.
|
|
||
| void handleRebalanceStart() { | ||
| void handleRebalanceStart(final Set<String> subscribedTopics) { | ||
| builder.addSubscribedTopicsFromMetadata(subscribedTopics, logPrefix); |
There was a problem hiding this comment.
This is related to the bug I found: upon assign we still need to update subscription metadata.
| } | ||
|
|
||
| @Test | ||
| public void standbyShouldNotPerformRestoreAtStartup() throws Exception { |
There was a problem hiding this comment.
This test is merged into the one below.
This PR is collaborated by Guozhang Wang and John Roesler. It is a significant tech debt cleanup on task management and state management, and is broken down by several sub-tasks listed below: Extract embedded clients (producer and consumer) into RecordCollector from StreamTask. guozhangwang#2 guozhangwang#5 Consolidate the standby updating and active restoring logic into ChangelogReader and extract out of StreamThread. guozhangwang#3 guozhangwang#4 Introduce Task state life cycle (created, restoring, running, suspended, closing), and refactor the task operations based on the current state. guozhangwang#6 guozhangwang#7 Consolidate AssignedTasks into TaskManager and simplify the logic of changelog management and task management (since they are already moved in step 2) and 3)). guozhangwang#8 guozhangwang#9 Also simplified the StreamThread logic a bit as the embedded clients / changelog restoration logic has been moved into step 1) and 2). guozhangwang#10 Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Boyang Chen <boyang@confluent.io>
Committer Checklist (excluded from commit message)