KAFKA-9184: Redundant task creation and periodic rebalances after zombie Connect worker rejoins the group#7771
Conversation
There was a problem hiding this comment.
What is the significance of this check?
If a worker has one failed task and one running task, will the snapshot be reset?
Will the running task be stopped?
There was a problem hiding this comment.
This is a standard check we have when we check the assignmentSnapshot. I prefer to err on the safe side and use the second condition too.
The snapshot refers to your complete assignment (sets of connectors, tasks, revoked connectors and revoked tasks). The failure here corresponds to whether the assignment was successful or not as a whole. Currently the options are NO_ERROR and CONFIG_MISMATCH, in which case as the comments explain in the code the worker needs to read to the end of the config log and rejoin. A failed assignment does not have assigned tasks (the sets are empty). Even if there's an issue with assignment overwrites elsewhere, what we solve here remains unaffected.
There was a problem hiding this comment.
Thanks for explaining the meaning of a failed snapshot, I wasn't clear on that before. You explanation sounds reasonable.
rhauch
left a comment
There was a problem hiding this comment.
Thanks, @kkonstantine. One question below, and is there any way to easily test this from within IncrementalCooperativeAssignorTest or RebalanceSourceConnectorsIntegrationTest?
We'd probably have to enable EmbeddedConnectCluster to shut down the Kafka cluster for a period of time, perhaps by stopping the Kafka cluster and then bring it back up using the same ports (which might actually be problematic if other tests are run in parallel and their Kafka clusters happen to reuse the same ports). Not sure if this is really feasible. Thoughts?
There was a problem hiding this comment.
Will all of these methods to clear state within the assignment snapshot appear to happen atomically w/r/t when the assignment snapshot is read elsewhere in this class?
There was a problem hiding this comment.
Good point. We don't lock access to this collection. Setting to null to avoid any concurrent access issues (from the threads that export metrics potentially).
…ker coordinator is up from WorkerGroupMember
…nd stop tasks if coordinator is unreachable
fd95e90 to
01caf29
Compare
|
Thanks for the reviews @rhauch @gharris1727 ! |
| connect.kafka().startOnlyKafkaOnSamePorts(); | ||
|
|
||
| // Allow for the workers to discover that the coordinator is unavailable | ||
| Thread.sleep(TimeUnit.SECONDS.toMillis(10)); |
There was a problem hiding this comment.
Shouldn't the workers discover that the coordinator is unavailable while it is down?
I'm imagining this test going like this:
- steady-state workers are running
- brokers stop
- workers discover the coordinator is unavailable
- workers stop their tasks
- brokers start
- workers discover the next coordinator
- workers start their tasks
- workers are running unaffected
There was a problem hiding this comment.
That was a bit misplaced, because I actually need 3 explicit delays (due to current lack of appropriate handles from the kafka and connect embedded clusters).
- Bring kafka down, allow workers to discover it's down (heartbeat * 2 + 4 sec)
- Allow for Kafka to come back up
- Allow for worker cluster to stabilize after the very last rebalance (delay = 5sec)
Added another commit.
|
Two of the 3 jobs passed with the latest commits, and the other is still running:
Waiting for the 3rd job to complete (almost done): (This, despite the fact that Jenkins previously showed the status of all of these jobs as failed, but apparently they were still running. Evidently others have seen similar behavior recently.) |
|
Now, all 3 jobs passed! |
|
Thanks @rhauch for keep track of the build progress. Looks green to me too 😄 If we'd like to merge the zombie fix further back, it's better to issue a separate PR. |
rhauch
left a comment
There was a problem hiding this comment.
LGTM. Thanks for all the hard work on this PR, @kkonstantine. And thanks to @gharris1727 and @ncliang for help with testing to identify this.
|
Run with Connect system tests all passed: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3614/ |
gharris1727
left a comment
There was a problem hiding this comment.
LGTM.
Thanks @kkonstantine !
…bie Connect worker rejoins the group (#7771) Check connectivity with broker coordinator in intervals and stop tasks if coordinator is unreachable by setting `assignmentSnapshot` to null and resetting rebalance delay when there are no lost tasks. And, because we're now sometimes setting `assignmentSnapshot` to null and reading it from other methods and thread, made this member volatile and used local references to ensure consistent reads. Adapted existing unit tests to verify additional debug calls, added more specific log messages to `DistributedHerder`, and added a new integration test that verifies the behavior when the brokers are stopped and restarted only after the workers lose their heartbeats with the broker coordinator. Author: Konstantine Karantasis <konstantine@confluent.io> Reviewers: Greg Harris <gregh@confluent.io>, Randall Hauch <rhauch@gmail.com>
…nces after zombie Connect worker rejoins the group (apache#7771) Check connectivity with broker coordinator in intervals and stop tasks if coordinator is unreachable by setting `assignmentSnapshot` to null and resetting rebalance delay when there are no lost tasks. And, because we're now sometimes setting `assignmentSnapshot` to null and reading it from other methods and thread, made this member volatile and used local references to ensure consistent reads. Adapted existing unit tests to verify additional debug calls, added more specific log messages to `DistributedHerder`, and added a new integration test that verifies the behavior when the brokers are stopped and restarted only after the workers lose their heartbeats with the broker coordinator. Author: Konstantine Karantasis <konstantine@confluent.io> Reviewers: Greg Harris <gregh@confluent.io>, Randall Hauch <rhauch@gmail.com>
…nces after zombie Connect worker rejoins the group (#7771) (#7783) Check connectivity with broker coordinator in intervals and stop tasks if coordinator is unreachable by setting `assignmentSnapshot` to null and resetting rebalance delay when there are no lost tasks. And, because we're now sometimes setting `assignmentSnapshot` to null and reading it from other methods and thread, made this member volatile and used local references to ensure consistent reads. Adapted existing unit tests to verify additional debug calls, added more specific log messages to `DistributedHerder`, and added a new integration test that verifies the behavior when the brokers are stopped and restarted only after the workers lose their heartbeats with the broker coordinator. Author: Konstantine Karantasis <konstantine@confluent.io> Reviewers: Greg Harris <gregh@confluent.io>, Randall Hauch <rhauch@gmail.com>
Zombie workers, defined as workers that lose connectivity with the Kafka broker coordinator and get kicked out of the group but don't experience a jvm restart, have been keeping their tasks running. This side-effect is more disrupting with the new Incremental Cooperative rebalance protocol. When such workers return:
a) they join the group with existing assignment and this leads to redundant tasks running in the Connect cluster, and
b) they interfere with the computation of lost tasks, which before this fix would lead to the scheduled rebalance delay not being reset correctly back to 0. This results in periodic rebalances.
This fix focuses on resolving the above side-effects as follows:
Besides the test included in this PR, the improvements are being tested with a framework that deploys a Connect cluster on docker images and introduces network partitions between all or selected workers and the Kafka brokers.
Committer Checklist (excluded from commit message)