KAFKA-14783 (KIP-875): New STOPPED state for connectors #13424

Merged
C0urante merged 15 commits into apache:trunk from C0urante:kafka-14783 on Apr 11, 2023

Conversation

@C0urante

Contributor

Jira, relevant KIP section

Adds the new STOPPED target state for connectors, which causes all tasks for the connector to be shut down and for its status in the REST API to be updated to STOPPED.
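For illustration, a minimal sketch of how a client might request this state change. It assumes the `PUT /connectors/{name}/stop` endpoint described in KIP-875 and a worker reachable at `http://localhost:8083`; the class name, method name, and connector name are hypothetical. The request is only built here, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class StopConnectorRequest {
    // Builds (but does not send) a PUT request against the stop endpoint.
    // Assumption: the connector name needs no URL encoding.
    static HttpRequest stopRequest(String workerUrl, String connectorName) {
        URI uri = URI.create(workerUrl + "/connectors/" + connectorName + "/stop");
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = stopRequest("http://localhost:8083", "my-connector");
        // Show the method and target URI of the request that would be sent.
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request with `java.net.http.HttpClient` and then polling `GET /connectors/{name}/status` would be one way to observe the connector reporting STOPPED.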

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@C0urante C0urante changed the title KAFKA-14783: New STOPPED state for connectors KAFKA-14783 (KIP-875): New STOPPED state for connectors Mar 20, 2023
@C0urante

Contributor Author

cc @yashmayya; feel free to review and/or build off of this for KAFKA-14786, KAFKA-14368, and KAFKA-14784.

@C0urante

Contributor Author

@mimaison do you think you'll have time to take a look? Hoping to get this in before the 3.5.0 release but we can punt if there's not enough bandwidth for review.

@mimaison mimaison left a comment

Member

Yeah let's try to get this into 3.5. I took a quick look and left a few minor comments.

void restartConnectorAndTasks(RestartRequest request, Callback<ConnectorStateInfo> cb);

/**
* Stop the conector. This call will asynchronously suspend processing by the connector and all

Member

connector

switch (state) {
case STOPPED:
return;
if ((state == State.STOPPED || state == State.PAUSED) && state == newState) {

Member

As newState is either PAUSED or STOPPED, would state == newState be enough here?

Contributor Author

Yep, good call 👍
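The simplification agreed on in this thread can be checked exhaustively. The sketch below uses a hypothetical stand-in `State` enum (the real enum's values are an assumption here) and compares the condition from the diff with the shorter one for every current state, given that `newState` is either PAUSED or STOPPED.

```java
public class StateCheck {
    // Hypothetical stand-in for the connector state enum under review.
    enum State { INIT, STOPPED, PAUSED, STARTED, FAILED }

    // Condition as written in the diff.
    static boolean original(State state, State newState) {
        return (state == State.STOPPED || state == State.PAUSED) && state == newState;
    }

    // Condition suggested in the review comment.
    static boolean simplified(State state, State newState) {
        return state == newState;
    }

    // Returns true if both conditions agree for every current state
    // whenever newState is restricted to PAUSED or STOPPED.
    static boolean equivalent() {
        for (State newState : new State[] {State.PAUSED, State.STOPPED}) {
            for (State state : State.values()) {
                if (original(state, newState) != simplified(state, newState)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(equivalent()); // prints "true"
    }
}
```

The equivalence holds only because of the restriction on `newState`; if other target states could reach this branch, the two conditions would differ.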

return null;
}

// TODO: We may want to add a new ConfigBackingStore method for stopping a connector so that

Member

Should we create a ticket for this TODO?

Contributor Author

I'm worried that if we create a ticket, somebody might jump on that and make the change now, even though it's not really necessary yet. If it's too bothersome as a TODO I could just remove it altogether?

Member

Usually I prefer TODO tags to be associated with a task. What's the drawback if someone was to make this change now?

Contributor

Currently, the KafkaConfigBackingStore uses a transactional producer only when exactly once support is enabled on the worker. Maybe Chris' concern was that currently we'll only be able to do the tasks config + target state write atomically when exactly once support is enabled on the worker? 🤔

Although that is already the case today for other operations that require multiple writes to the config topic like putTaskConfigs and removeConnectorConfig, so I'm not sure whether or not that was his concern.

@C0urante C0urante Apr 10, 2023

Contributor Author

I guess this wasn't really a TODO so much as a MAYBE-DO. I've removed it since on second thought it's a DON'T-DO-FOR-NOW as there is little if any benefit from that change, except possibly blocking the herder thread for less time if we're having trouble writing to the config topic.

There's also the transactional producer logic that Yash mentioned, but since it's not guaranteed that we'll have access to one (if exactly-once source support is disabled), that's not worth considering right now.

log.info("Stopped {} connector {}", this.getClass().getSimpleName(), connectorName);
connectorHandle.recordConnectorStop();
if (Boolean.parseBoolean(commonConfigs.getOrDefault("connector.stop.inject.error", "false"))) {
throw new RuntimeException("Injecting errors during connector start");

Member

during connector start -> during connector stop

@yashmayya yashmayya left a comment

Contributor

Thanks Chris! I haven't had a chance to go through all the tests yet, but I hope to do so in a subsequent review.

Comment thread connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java Outdated
*/
public Map<String, Map<String, String>> taskConfigs(String connectorName) {
ObjectMapper mapper = new ObjectMapper();
String url = endpointForResource(String.format("connectors/%s/tasks-config", connectorName));

@yashmayya yashmayya Mar 22, 2023

Contributor

I'm a little confused as to why we have a GET /connectors/{connector}/tasks as well as a GET /connectors/{connector}/tasks-config API? Looks like the only difference between them is that the former returns "raw" task configurations (i.e. before externalized secrets are replaced using config transformers) - in which case the names of the two APIs don't really help to distinguish between them much (same with the Herder methods - Herder::taskConfigs and Herder::tasksConfig).

Contributor Author

Good question... I know the tasks-config endpoint was introduced in KIP-661, but now that you mention it, it does seem a little redundant given the /tasks endpoint's content.

Maybe @mimaison could shed some light here?

Member

I thought there was a difference but checking now they seem pretty similar. I also wonder if GET /connectors/{connector}/tasks-config should avoid returning the raw configs to avoid leaking values returned by providers.

Contributor

> I also wonder if GET /connectors/{connector}/tasks-config should avoid returning the raw configs to avoid leaking values returned by providers.

Yeah that's a good point as well - it's possible that with certain config providers, Connect operators won't be able to access the raw secrets directly but will have access to the worker's REST API (and the worker obviously will have access to the secrets from the provider) right?

Also, if we choose to make that change (I assume it'll require a KIP because it's changing the response of a public REST API), the two APIs would essentially become identical. Would it be possible to deprecate / remove one of the APIs?
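To make the "raw" versus transformed distinction in this thread concrete, here is a rough sketch. It assumes placeholders of the form `${provider:path:key}` and uses a hypothetical in-memory map in place of a real config provider; the class, method, and sample values are illustrative only and are not Connect's own implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RawVersusResolvedConfig {
    // Matches placeholders of the form ${provider:path:key}.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([^:}]+):([^:}]*):([^}]+)\\}");

    // Replaces each placeholder with the value stored under "path:key" in the
    // hypothetical secrets map; unknown placeholders are left untouched.
    static Map<String, String> resolve(Map<String, String> raw, Map<String, String> secrets) {
        Map<String, String> resolved = new HashMap<>();
        for (Map.Entry<String, String> entry : raw.entrySet()) {
            Matcher m = PLACEHOLDER.matcher(entry.getValue());
            StringBuffer sb = new StringBuffer();
            while (m.find()) {
                String secret = secrets.getOrDefault(m.group(2) + ":" + m.group(3), m.group());
                m.appendReplacement(sb, Matcher.quoteReplacement(secret));
            }
            m.appendTail(sb);
            resolved.put(entry.getKey(), sb.toString());
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> raw = Map.of(
                "connection.password", "${file:/etc/secrets.properties:db.password}");
        Map<String, String> secrets = Map.of("/etc/secrets.properties:db.password", "hunter2");
        System.out.println("raw:      " + raw);
        System.out.println("resolved: " + resolve(raw, secrets));
    }
}
```

An endpoint that returns the raw form exposes only the placeholder, while one that returns the resolved form exposes the secret itself, which is the leak the comments above are concerned about.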

Comment thread gradle/spotbugs-exclude.xml
@C0urante C0urante added connect kip Requires or implements a KIP labels Mar 22, 2023
@C0urante

Contributor Author

Thanks all for the reviews! I believe I've addressed every comment with either a code change or a response. This should be ready for another pass.

@yashmayya yashmayya left a comment

Contributor

Thanks Chris, I just had a couple more minor comments, but none blocking. LGTM!

Contributor

This calls TaskStatus.Listener::onDeletion after the tasks are stopped by the worker, which updates their status to DESTROYED. That doesn't seem to happen in the DistributedHerder implementation of stopConnector, so the task statuses there will be UNASSIGNED after shutdown. I guess this shouldn't really matter anyway because we're publishing an empty set of task configs?

Contributor Author

> I guess this shouldn't really matter anyway because we're publishing an empty set of task configs?

Exactly--and thanks to the logic introduced in the fix for KAFKA-9472, once the rebalance caused by the empty set of task configs has completed, the DistributedHerder class will automatically wipe the deleted tasks from the status store.

@gharris1727 gharris1727 left a comment

Contributor

LGTM!

@C0urante

Contributor Author

@mimaison thanks for the reviews. Do you think you'll have time to take another look at this before the April 12th feature freeze?

@mimaison mimaison left a comment

Member

LGTM

@C0urante C0urante merged commit e49a5a2 into apache:trunk Apr 11, 2023
@C0urante C0urante deleted the kafka-14783 branch April 11, 2023 13:37
@C0urante

Contributor Author

Thanks Mickael!

GonzalooDelgado pushed a commit to GonzalooDelgado/kafka that referenced this pull request Dec 3, 2025
… w/ conflicts resolved

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Yash Mayya <yash.mayya@gmail.com>, Greg Harris <gharris1727@gmail.com>

Labels

connect kip Requires or implements a KIP
