KAFKA-14783 (KIP-875): New STOPPED state for connectors#13424
Conversation
|
cc @yashmayya; feel free to review and/or build off of this for KAFKA-14786, KAFKA-14368, and KAFKA-14784. |
|
@mimaison do you think you'll have time to take a look? Hoping to get this in before the 3.5.0 release but we can punt if there's not enough bandwidth for review. |
mimaison
left a comment
There was a problem hiding this comment.
Yeah let's try to get this into 3.5. I took a quick look and left a few minor comments.
| void restartConnectorAndTasks(RestartRequest request, Callback<ConnectorStateInfo> cb); | ||
|
|
||
| /** | ||
| * Stop the conector. This call will asynchronously suspend processing by the connector and all |
| switch (state) { | ||
| case STOPPED: | ||
| return; | ||
| if ((state == State.STOPPED || state == State.PAUSED) && state == newState) { |
There was a problem hiding this comment.
As newState is either PAUSED or STOPPED, would state == newState be enough here?
| return null; | ||
| } | ||
|
|
||
| // TODO: We may want to add a new ConfigBackingStore method for stopping a connector so that |
There was a problem hiding this comment.
Should we create a ticket for this TODO?
There was a problem hiding this comment.
I'm worried that if we create a ticket, somebody might jump on that and make the change now, even though it's not really necessary yet. If it's too bothersome as a TODO I could just remove it altogether?
There was a problem hiding this comment.
Usually I prefer TODO tags to be associated with a task. What's the drawback if someone was to make this change now?
There was a problem hiding this comment.
Currently, the KafkaConfigBackingStore uses a transactional producer only when exactly once support is enabled on the worker. Maybe Chris' concern was that currently we'll only be able to do the tasks config + target state write atomically when exactly once support is enabled on the worker? 🤔
Although that is already the case today for other operations that require multiple writes to the config topic like putTaskConfigs and removeConnectorConfig, so I'm not sure whether or not that was his concern.
There was a problem hiding this comment.
I guess this wasn't really a TODO so much as a MAYBE-DO. I've removed it since on second thought it's a DON'T-DO-FOR-NOW as there is little if any benefit from that change, except possibly blocking the herder thread for less time if we're having trouble writing to the config topic.
There's also the transactional producer logic that Yash mentioned, but since it's not guaranteed that we'll have access to one (if exactly-once source support is disabled), that's not worth considering right now.
| log.info("Stopped {} connector {}", this.getClass().getSimpleName(), connectorName); | ||
| connectorHandle.recordConnectorStop(); | ||
| if (Boolean.parseBoolean(commonConfigs.getOrDefault("connector.stop.inject.error", "false"))) { | ||
| throw new RuntimeException("Injecting errors during connector start"); |
There was a problem hiding this comment.
during connector start -> during connector stop
yashmayya
left a comment
There was a problem hiding this comment.
Thanks Chris! I haven't had a chance to go through all the tests yet, but I hope to do so in a subsequent review.
| */ | ||
| public Map<String, Map<String, String>> taskConfigs(String connectorName) { | ||
| ObjectMapper mapper = new ObjectMapper(); | ||
| String url = endpointForResource(String.format("connectors/%s/tasks-config", connectorName)); |
There was a problem hiding this comment.
I'm a little confused as to why we have a GET /connectors/{connector}/tasks as well as a GET /connectors/{connector}/tasks-config API? Looks like the only difference between them is that the former returns "raw" task configurations (i.e. before externalized secrets are replaced using config transformers) - in which case the names of the two APIs don't really help to distinguish between them much (same with the Herder methods - Herder::taskConfigs and Herder::tasksConfig).
There was a problem hiding this comment.
I thought there was a difference but checking now they seem pretty similar. I also wonder if GET /connectors/{connector}/tasks-config should avoid returning the raw configs to avoid leaking values returned by providers.
There was a problem hiding this comment.
I also wonder if GET /connectors/{connector}/tasks-config should avoid returning the raw configs to avoid leaking values returned by providers.
Yeah that's a good point as well - it's possible that with certain config providers, Connect operators won't be able to access the raw secrets directly but will have access to the worker's REST API (and the worker obviously will have access to the secrets from the provider) right?
Also, if we choose to make that change (I assume it'll require a KIP because it's changing the response of a public REST API), the two APIs would essentially become identical. Would it be possible to deprecate / remove one of the APIs?
|
Thanks all for the reviews! I believe I've addressed every comment with either a code change or a response. This should be ready for another pass. |
yashmayya
left a comment
There was a problem hiding this comment.
Thanks Chris, I just had a couple more minor comments, but none blocking. LGTM!
There was a problem hiding this comment.
This calls TaskStatus.Listener::onDeletion after the tasks are stopped by the worker which updates their status to DESTROYED whereas this doesn't seem to happen in the DistributedHerder implementation for stopConnector so the task statuses will be UNASSIGNED post shutdown. I guess this shouldn't really matter anyway because we're publishing an empty set of task configs?
There was a problem hiding this comment.
I guess this shouldn't really matter anyway because we're publishing an empty set of task configs?
Exactly--and thanks to the logic introduced in the fix for KAFKA-9472, once the rebalance caused by the empty set of task configs has completed, the DistributedHerder class will automatically wipe the deleted tasks from the status store.
Co-authored-by: Yash Mayya <yash.mayya@gmail.com>
Co-authored-by: Yash Mayya <yash.mayya@gmail.com>
…r::suspend, fix MonitorableSourceConnector exception message
…uspend catch block
…en in DistributedHerder::stopConnector
…edHerderTest Co-authored-by: Yash Mayya <yash.mayya@gmail.com>
Co-authored-by: Yash Mayya <yash.mayya@gmail.com>
|
@mimaison thanks for the reviews. Do you think you'll have time to take another look at this before the April 12th feature freeze? |
|
Thanks Mickael! |
… w/ conflicts resolved Reviewers: Mickael Maison <mickael.maison@gmail.com>, Yash Mayya <yash.mayya@gmail.com>, Greg Harris <gharris1727@gmail.com>
Jira, relevant KIP section
Adds the new
STOPPEDtarget state for connectors, which causes all tasks for the connector to be shut down and for its status in the REST API to be updated toSTOPPED.Committer Checklist (excluded from commit message)