Skip to content

KAFKA-9184: Redundant task creation and periodic rebalances after zombie Connect worker rejoins the group#7771

Merged
rhauch merged 12 commits into
apache:trunkfrom
kkonstantine:kafka-9184
Dec 4, 2019
Merged

KAFKA-9184: Redundant task creation and periodic rebalances after zombie Connect worker rejoins the group#7771
rhauch merged 12 commits into
apache:trunkfrom
kkonstantine:kafka-9184

Conversation

@kkonstantine

Copy link
Copy Markdown
Contributor

Zombie workers, defined as workers that lose connectivity with the Kafka broker coordinator and get kicked out of the group but don't experience a jvm restart, have been keeping their tasks running. This side-effect is more disrupting with the new Incremental Cooperative rebalance protocol. When such workers return:
a) they join the group with existing assignment and this leads to redundant tasks running in the Connect cluster, and
b) they interfere with the computation of lost tasks, which before this fix would lead to the scheduled rebalance delay not being reset correctly back to 0. This results in periodic rebalances.

This fix focuses on resolving the above side-effects as follows:

  • Each worker now tracks its connectivity with the broker coordinator in an unblocking manner. This allows the worker to detect that the broker coordinator is unreachable. The timeout is set to be equal to the heartbeat interval. If during this time the connection remains inactive, the worker will proactively stop all its connectors and tasks and will keep attempting to connect to the coordinator.
  • The incremental cooperative assignor will keep the delay to a positive value as long as it can detect lost tasks. If the set of tasks that are computed as lost becomes empty, the delay will be set to zero and no additional rebalancing will be scheduled.

Besides the test included in this PR, the improvements are being tested with a framework that deploys a Connect cluster on docker images and introduces network partitions between all or selected workers and the Kafka brokers.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@gharris1727 gharris1727 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great log messages.
Two nits, and one logic question.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the significance of this check?
If a worker has one failed task and one running task, will the snapshot be reset?
Will the running task be stopped?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a standard check we have when we check the assignmentSnapshot. I prefer to err on the safe side and use the second condition too.

The snapshot refers to your complete assignment (sets of connectors, tasks, revoked connectors and revoked tasks). The failure here corresponds to whether the assignment was successful or not as a whole. Currently the options are NO_ERROR and CONFIG_MISMATCH, in which case as the comments explain in the code the worker needs to read to the end of the config log and rejoin. A failed assignment does not have assigned tasks (the sets are empty). Even if there's an issue with assignment overwrites elsewhere, what we solve here remains unaffected.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for explaining the meaning of a failed snapshot, I wasn't clear on that before. You explanation sounds reasonable.

@rhauch rhauch left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @kkonstantine. One question below, and is there any way to easily test this from within IncrementalCooperativeAssignorTest or RebalanceSourceConnectorsIntegrationTest?

We'd probably have to enable EmbeddedConnectCluster to shut down the Kafka cluster for a period of time, perhaps by stopping the Kafka cluster and then bring it back up using the same ports (which might actually be problematic if other tests are run in parallel and their Kafka clusters happen to reuse the same ports). Not sure if this is really feasible. Thoughts?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will all of these methods to clear state within the assignment snapshot appear to happen atomically w/r/t when the assignment snapshot is read elsewhere in this class?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. We don't lock access to this collection. Setting to null to avoid any concurrent access issues (from the threads that export metrics potentially).

@kkonstantine

Copy link
Copy Markdown
Contributor Author

Thanks for the reviews @rhauch @gharris1727 !
I believe I've addressed all the comments.
The latest build succeeded on JDK8/Scala 2.11 and JDK 11/Scala 2.12
On JDK 11/Scala 2.13 it failed on a single, unrelated test:
kafka.admin.ReassignPartitionsClusterTest shouldTriggerReassignmentWithZnodePrecedenceOnControllerStartup FAILED

connect.kafka().startOnlyKafkaOnSamePorts();

// Allow for the workers to discover that the coordinator is unavailable
Thread.sleep(TimeUnit.SECONDS.toMillis(10));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the workers discover that the coordinator is unavailable while it is down?
I'm imagining this test going like this:

  1. steady-state workers are running
  2. brokers stop
  3. workers discover the coordinator is unavailable
  4. workers stop their tasks
  5. brokers start
  6. workers discover the next coordinator
  7. workers start their tasks
  8. workers are running unaffected

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was a bit misplaced, because I actually need 3 explicit delays (due to current lack of appropriate handles from the kafka and connect embedded clusters).

  1. Bring kafka down, allow workers to discover it's down (heartbeat * 2 + 4 sec)
  2. Allow for Kafka to come back up
  3. Allow for worker cluster to stabilize after the very last rebalance (delay = 5sec)

Added another commit.

@rhauch

rhauch commented Dec 4, 2019

Copy link
Copy Markdown
Contributor

Two of the 3 jobs passed with the latest commits, and the other is still running:

Waiting for the 3rd job to complete (almost done):

(This, despite the fact that Jenkins previously showed the status of all of these jobs as failed, but apparently they were still running. Evidently others have seen similar behavior recently.)

@rhauch

rhauch commented Dec 4, 2019

Copy link
Copy Markdown
Contributor

Now, all 3 jobs passed!

@kkonstantine

Copy link
Copy Markdown
Contributor Author

Thanks @rhauch for keep track of the build progress. Looks green to me too 😄
Just a note that this fix applies to 2.3, 2.4 and trunk release branches.

If we'd like to merge the zombie fix further back, it's better to issue a separate PR.
Cheers!

@rhauch rhauch left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks for all the hard work on this PR, @kkonstantine. And thanks to @gharris1727 and @ncliang for help with testing to identify this.

@rhauch

rhauch commented Dec 4, 2019

Copy link
Copy Markdown
Contributor

Run with Connect system tests all passed: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3614/

@gharris1727 gharris1727 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

Thanks @kkonstantine !

@rhauch rhauch merged commit 0e57a39 into apache:trunk Dec 4, 2019
rhauch pushed a commit that referenced this pull request Dec 4, 2019
…bie Connect worker rejoins the group (#7771)

Check connectivity with broker coordinator in intervals and stop tasks if coordinator is unreachable by setting `assignmentSnapshot` to null and resetting rebalance delay when there are no lost tasks. And, because we're now sometimes setting `assignmentSnapshot` to null and reading it from other methods and thread, made this member volatile and used local references to ensure consistent reads.

Adapted existing unit tests to verify additional debug calls, added more specific log messages to `DistributedHerder`, and added a new integration test that verifies the behavior when the brokers are stopped and restarted only after the workers lose their heartbeats with the broker coordinator.

Author: Konstantine Karantasis <konstantine@confluent.io>
Reviewers: Greg Harris <gregh@confluent.io>, Randall Hauch <rhauch@gmail.com>
kkonstantine added a commit to kkonstantine/kafka that referenced this pull request Dec 5, 2019
…nces after zombie Connect worker rejoins the group (apache#7771)

Check connectivity with broker coordinator in intervals and stop tasks if coordinator is unreachable by setting `assignmentSnapshot` to null and resetting rebalance delay when there are no lost tasks. And, because we're now sometimes setting `assignmentSnapshot` to null and reading it from other methods and thread, made this member volatile and used local references to ensure consistent reads.

Adapted existing unit tests to verify additional debug calls, added more specific log messages to `DistributedHerder`, and added a new integration test that verifies the behavior when the brokers are stopped and restarted only after the workers lose their heartbeats with the broker coordinator.

Author: Konstantine Karantasis <konstantine@confluent.io>
Reviewers: Greg Harris <gregh@confluent.io>, Randall Hauch <rhauch@gmail.com>
rhauch pushed a commit that referenced this pull request Dec 5, 2019
…nces after zombie Connect worker rejoins the group (#7771) (#7783)

Check connectivity with broker coordinator in intervals and stop tasks if coordinator is unreachable by setting `assignmentSnapshot` to null and resetting rebalance delay when there are no lost tasks. And, because we're now sometimes setting `assignmentSnapshot` to null and reading it from other methods and thread, made this member volatile and used local references to ensure consistent reads.

Adapted existing unit tests to verify additional debug calls, added more specific log messages to `DistributedHerder`, and added a new integration test that verifies the behavior when the brokers are stopped and restarted only after the workers lose their heartbeats with the broker coordinator.

Author: Konstantine Karantasis <konstantine@confluent.io>
Reviewers: Greg Harris <gregh@confluent.io>, Randall Hauch <rhauch@gmail.com>
@kkonstantine kkonstantine deleted the kafka-9184 branch December 5, 2019 02:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants