KAFKA-6886 Externalize secrets from Connect configs#5068
Conversation
f62199a to
aa3eb04
Compare
be52a09 to
15d2532
Compare
|
retest this please |
| * @param path the path where the data resides | ||
| * @param keys the keys whose values will be retrieved | ||
| */ | ||
| void unsubscribe(String path, Set<String> keys); |
There was a problem hiding this comment.
Only one subscriber is expected? If not, does this unsubscribe all subscriptions for the keys at the path?
There was a problem hiding this comment.
How about a default implementation?
There was a problem hiding this comment.
I'll add the callback to this method.
| * | ||
| * @param configProviders the set of {@link ConfigProvider} instances. | ||
| */ | ||
| public ConfigTransformer(Map<String, ConfigProvider> configProviders) { |
There was a problem hiding this comment.
How about providing a builder API to easily create these with individual named providers? Not sure how valuable it will be.
| ConfigProvider provider = configProviders.get(providerName); | ||
| Map<String, Set<String>> keysByPath = entry.getValue(); | ||
| if (provider != null && keysByPath != null) { | ||
| for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) { |
There was a problem hiding this comment.
Perhaps?
keysByPath.entrySet().forEach(entry-> {
| Set<String> keys = keysByPath.computeIfAbsent(var.path, k -> new HashSet<>()); | ||
| keys.add(var.variable); | ||
| } | ||
| } |
There was a problem hiding this comment.
Using Java 8 streams would be a bit more compact and IMO more readable:
configs.forEach((key, value)->{
getVars(key, value, DEFAULT_PATTERN).forEach(var-> {
keysByProvider.computeIfAbsent(var.providerName, k -> new HashMap<>())
.computeIfAbsent(var.path, k-> new HashSet<>())
.add(var.variable);
});
});
There was a problem hiding this comment.
Oh, I'm not a big fan of nested foreach: https://blog.jooq.org/2015/12/08/3-reasons-why-you-shouldnt-replace-your-for-loops-by-stream-foreach/
| keyValuesByPath.put(path, data); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Again, this could be simplified quite a bit with streams.
There was a problem hiding this comment.
lol, i expect this to be a ripe area for bikeshedding. you can definitely reduce some verbosity with streams, unclear that it's actually simplified. for the time being, i think we should be liberal in accepting either form of style and we can figure out where we want to land over time.
| if (keys.contains("testKey")) { | ||
| data.put("testKey", "testResult"); | ||
| } | ||
| if (keys.contains("testKeyWithTTL")) { |
There was a problem hiding this comment.
Might be good to use constants for these keys and values. That would be a bit more maintainable, as long as it doesn't make it unreadable.
| * @param keys the keys whose values will be retrieved | ||
| * @param callback the callback to invoke upon change | ||
| */ | ||
| void subscribe(String path, Set<String> keys, ConfigChangeCallback callback); |
There was a problem hiding this comment.
How about a default implementation?
| public interface SinkTaskContext { | ||
|
|
||
| /** | ||
| * Get the Task configuration. |
There was a problem hiding this comment.
Need more JavaDoc to define the behavior. Does this return null? Will the result ever differ if called multiple times? How does this relate to the configuration passed to the connector and/or task upon startup?
| */ | ||
| public interface SourceTaskContext { | ||
| /** | ||
| * Get the Task configuration. |
| getPluginDesc(reflections, HeaderConverter.class, loader), | ||
| getPluginDesc(reflections, Transformation.class, loader) | ||
| getPluginDesc(reflections, Transformation.class, loader), | ||
| getPluginDesc(reflections, ConfigProvider.class, loader) |
There was a problem hiding this comment.
We don't want to scan for this, but want to use the Service Loader mechanism. Do you plan to change this when the REST extension PR is merged?
There was a problem hiding this comment.
Yes, that will be for a future PR
|
|
||
| public static final String CONFIG_RELOAD_ACTION_CONFIG = "config.action.reload"; | ||
| private static final String CONFIG_RELOAD_ACTION_DOC = | ||
| "The action to take in order to reload changed configuration values (none or restart)."; |
There was a problem hiding this comment.
This should probably make more clear that this occurs when configuration values change in external providers, and should describe what none and restart mean. Users shouldn't have to go to the code or KIP to understand this configuration property.
| */ | ||
| package org.apache.kafka.connect.runtime; | ||
|
|
||
| public interface HerderRequestId { |
There was a problem hiding this comment.
This seems like an odd name for the response from a herder request: there's no "id" associated with this.
There was a problem hiding this comment.
I renamed to just HerderRequest
| + "/opt/connectors"; | ||
|
|
||
| public static final String CONFIG_PROVIDERS_CONFIG = "config.providers"; | ||
| protected static final String CONFIG_PROVIDERS_DOC = "List of configuration providers."; |
There was a problem hiding this comment.
Please be more specific. "List of configuration providers. This is a comma-separated list of the fully-qualified names of the ConfigProvider implementations, in the order they will be created, configured, and used."
| } | ||
|
|
||
| @Override | ||
| public String getConnectorConfigReloadAction(final String connName) { |
There was a problem hiding this comment.
Should we have an enum for reload action?
|
@rhauch , thanks for the review! I've incorporated most of your suggestions (except the use of foreach :)). For the |
rhauch
left a comment
There was a problem hiding this comment.
Ok, one more round of mostly questions and a few minor suggestions.
| } | ||
|
|
||
| public void subscribe(String path, Set<String> keys, ConfigChangeCallback callback) { | ||
| throw new UnsupportedOperationException(); |
There was a problem hiding this comment.
Isn't this the default implementation?
| } | ||
|
|
||
| public void unsubscribe(String path, Set<String> keys) { | ||
| throw new UnsupportedOperationException(); |
There was a problem hiding this comment.
Isn't this the default implementation?
| * </pre> | ||
| * resides at the path "/tmp/properties.txt", then when a configuration Map which has an entry with a key "someKey" and | ||
| * a value "${file:/tmp/properties.txt:fileKey}" is passed to the {@link #transform(Map)} method, then the transformed | ||
| * Map will have an entry with key "someKey" and a value "someValue". |
There was a problem hiding this comment.
Might be useful to mention that this only requires the get methods on ConfigProvider, and does not use subscribe.
|
|
||
| /** | ||
| * Returns the transformed data, with variables replaced with corresponding values from the | ||
| * ConfigProvider instances if found. |
There was a problem hiding this comment.
Would it be useful to mention that changes to this map will not affect the origin of the configuration or the information in the ConfigProvider(s)?
| import java.util.Set; | ||
|
|
||
| /** | ||
| * An implementation of {@link ConfigProvider} that represents a Properties file. |
There was a problem hiding this comment.
I think this should mention that all information stored in the file are stored in cleartext, so care must be used if any secrets are stored in the backing file.
There was a problem hiding this comment.
Let's take this to a follow up that clarifies the details of this. We should probably be very verbose about this being an example and you want to be careful about how you use it if you choose too. It's a valid choice in quite a few contexts, but some warnings are probably in order.
@rayokota Maybe file a follow up JIRA so we don't lose track of it? Should be a pretty minor change.
| "The action to take in order to reload changed configuration values (none or restart). " | ||
| + "Configuration values will typically only change if using external configuration providers. " | ||
| + "A value of 'none' indicates that the new values will not be reloaded. " | ||
| + "A value of 'reload' indicates that the new values will be reloaded by restarting the connector. " |
There was a problem hiding this comment.
I'm not convinced that a user will know what this means. How about making it much more specific and concrete:
The action that Connect should take on the connector when changes in external
configuration providers results in a change in the connector's configuration properties.
A value of `none` indicates that Connect will do nothing.
A value of 'reload' indicates that Connect should restart/reload the connector with the
updated configuration properties....
There was a problem hiding this comment.
Also, is this restart or reload? The enum is RESTART.
There was a problem hiding this comment.
Also, is this case sensitive?
| if (configTransformer != null) { | ||
| configs = configTransformer.transform(connector, configs); | ||
| } | ||
| return configs; |
There was a problem hiding this comment.
The JavaDoc for this method should probably say that it includes the current transformed connector configuration where all variables have been replaced with the current values from the external ConfigProviders, and that they may include secrets.
| if (configTransformer != null) { | ||
| configs = configTransformer.transform(task.connector(), configs); | ||
| } | ||
| return configs; |
There was a problem hiding this comment.
The JavaDoc for this method should probably say that it includes the current transformed connector configuration where all variables have been replaced with the current values from the external ConfigProviders, and that they may include secrets.
| + "indicates that a configuration value will expire in the future."; | ||
| "The action that Connect should take on the connector when changes in external " + | ||
| "configuration providers result in a change in the connector's configuration properties. " + | ||
| "A value of `none` indicates that Connect will do nothing. " + |
There was a problem hiding this comment.
The backtick was from my original comment. 😊 Should change that to single quote like on the next line.
rhauch
left a comment
There was a problem hiding this comment.
One more minor thing: some JavaDoc lines were accidentally removed.
| * Get the configuration for a connector. The configuration will have been transformed by | ||
| * {@link org.apache.kafka.common.config.ConfigTransformer} by having all variable | ||
| * references replaced with the current values from external instances of | ||
| * {@link org.apache.kafka.common.config.ConfigProvider}, and may include secrets. |
There was a problem hiding this comment.
Now missing @param and @return.
|
Thanks, @rhauch , appreciate your attention to detail :) |
| * @param data a Map of key-value pairs | ||
| */ | ||
| public ConfigData(Map<String, String> data) { | ||
| this(data, Long.MAX_VALUE); |
There was a problem hiding this comment.
Do we actually want this to be Long.MAX_VALUE? Seems error prone since the normal thing to do with the TTL is add it to the current time and that will cause overflow. Should we have some sentinel value for infinite TTL?
| ConfigData configData = provider.get(path, keys); | ||
| Map<String, String> data = configData.data(); | ||
| long ttl = configData.ttl(); | ||
| if (ttl >= 0 && ttl < Long.MAX_VALUE) { |
There was a problem hiding this comment.
I think we should have a constant in the ConfigData class for whatever value ends up being the sentinel, even if it ends up being Long.MAX_VALUE so that uses of the constant outside that class are clearer as to their meaning. Also, another alternative would be to track the TTL as a Long instead of a long, which is less user friendly but doesn't require a special sentinel value.
There was a problem hiding this comment.
Thanks, I'll change to Long
| StatusBackingStore statusBackingStore, | ||
| ConfigBackingStore configBackingStore) { | ||
| this.worker = worker; | ||
| this.worker.herder = this; |
There was a problem hiding this comment.
We can probably clean up or refactor later, but this doesn't seem great -- code like this indicates overly strong coupling between these two classes and makes ownership semantics unclear.
| * Get the configuration reload action. | ||
| * @param connName name of the connector | ||
| */ | ||
| ConfigReloadAction getConnectorConfigReloadAction(final String connName); |
There was a problem hiding this comment.
nit: Kafka generally avoids using get/set at the beginning of accessors.
| KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); | ||
| return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, | ||
| headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, metrics, loader, time); | ||
| headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, configState, metrics, loader, time); |
There was a problem hiding this comment.
You might want to put some comments in the context classes about why we need the entire config. At first glance it appears possibly problematic because we're saving the entire snapshot state for the cluster, but really only care about a single config. It's possible for the state to become outdated (e.g. config change that doesn't require a rebalance) and if we get incremental rebalancing in at some point then it even becomes fairly likely that the majority of the state is stale.
I didn't figure out what was going on until I read through the rest of the PR and found that the ClusterConfigState was also now doing the config translation, so I think it's probably not obvious and some comments would help remind people (and avoid, e.g., incorrect refactoring "cleanup" that appears to simplify things but actually breaks them).
| } | ||
|
|
||
| @Override | ||
| public ConfigReloadAction getConnectorConfigReloadAction(final String connName) { |
There was a problem hiding this comment.
nit: this is identical between both herders, can we refactor configState so we can just push this into the abstract class?
There was a problem hiding this comment.
I'll make a note to clean this later. Probably a lot of methods that use configState can be pulled up into AbstractHerder
|
|
||
| // Configure the ConfigProvider | ||
| String configPrefix = providerPrefix + ".param."; | ||
| Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix); |
There was a problem hiding this comment.
nit: probably just copy/paste issue, but it's a configProviderConfig, not converterConfig
| @Override | ||
| public synchronized void stop() { | ||
| log.info("Herder stopping"); | ||
| requestExecutorService.shutdown(); |
There was a problem hiding this comment.
aren't we missing a corresponding awaitTermination?
There was a problem hiding this comment.
I've added an awaitTermination of 30 secs
|
Thanks for the review @ewencp ! I've pushed another commit with changes based on your suggestions. |
|
seems like unrelated core kafka tests failing, but there's a lot of them, so retest this please. likely a fluke, but let's be sure. |
ewencp
left a comment
There was a problem hiding this comment.
had a couple more comments, but we can follow up on them separately. this LGTM in current state. going to wait on tests to come back clean, but once that happens I'll merge to trunk for 2.0.0
| keyValuesByPath.put(path, data); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
lol, i expect this to be a ripe area for bikeshedding. you can definitely reduce some verbosity with streams, unclear that it's actually simplified. for the time being, i think we should be liberal in accepting either form of style and we can figure out where we want to land over time.
| Map<String, String> data = new HashMap<>(configs); | ||
| for (Map.Entry<String, String> config : configs.entrySet()) { | ||
| data.put(config.getKey(), replace(lookupsByProvider, config.getValue(), DEFAULT_PATTERN)); | ||
| } |
There was a problem hiding this comment.
This is actually a really good example of a case where your better version is actually worse for a lot of people -- replaceAll doesn't necessarily have obvious semantics. The forEach version is at least obvious in what it is doing, but I have to read the code in the replaceAll version to figure out that it is only replacing the values and the keys remain the same (and that may have required comparing different versions of the code, I'm not even sure it would have been obvious to me without the earlier versions). It takes both key and value, but I have to carefully inspect what it is returning to understand that it's only updating the value.
Not going to get stuck in a language features discussion for this PR, but you have to be a Java person to be able to read your last version and very quickly understand what's happening. Both the original version and your first alternative actually seem way better to me because coming from any background I can read them and understand what the semantics are.
| import java.util.Set; | ||
|
|
||
| /** | ||
| * An implementation of {@link ConfigProvider} that represents a Properties file. |
There was a problem hiding this comment.
Let's take this to a follow up that clarifies the details of this. We should probably be very verbose about this being an example and you want to be careful about how you use it if you choose too. It's a valid choice in quite a few contexts, but some warnings are probably in order.
@rayokota Maybe file a follow up JIRA so we don't lose track of it? Should be a pretty minor change.
|
Merging to trunk. The failed test is a flaky core test, the connect stuff is all passing fine. |
This commit allows secrets in Connect configs to be externalized and replaced with variable references of the form `${provider:[path:]key}`, where the "path" is optional.
There are 2 main additions to `org.apache.kafka.common.config`: a `ConfigProvider` and a `ConfigTransformer`. The `ConfigProvider` is an interface that allows key-value pairs to be provided by an external source for a given "path". An a TTL can be associated with the key-value pairs returned from the path. The `ConfigTransformer` will use instances of `ConfigProvider` to replace variable references in a set of configuration values.
In the Connect framework, `ConfigProvider` classes can be specified in the worker config, and then variable references can be used in the connector config. In addition, the herder can be configured to restart connectors (or not) based on the TTL returned from a `ConfigProvider`. The main class that performs restarts and transformations is `WorkerConfigTransformer`.
Finally, a `configs()` method has been added to both `SourceTaskContext` and `SinkTaskContext`. This allows connectors to get configs with variables replaced by the latest values from instances of `ConfigProvider`.
Most of the other changes in the Connect framework are threading various objects through classes to enable the above functionality.
Author: Robert Yokota <rayokota@gmail.com>
Author: Ewen Cheslack-Postava <me@ewencp.org>
Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes apache#5068 from rayokota/KAFKA-6886-connect-secrets
|
Hi @rayokota If not, I can plan to submit a proposal for same. |
|
@aakashgupta96 yes it was merged |
This commit allows secrets in Connect configs to be externalized and replaced with variable references of the form
${provider:[path:]key}, where the "path" is optional.There are 2 main additions to
org.apache.kafka.common.config: aConfigProviderand aConfigTransformer. TheConfigProvideris an interface that allows key-value pairs to be provided by an external source for a given "path". An a TTL can be associated with the key-value pairs returned from the path. TheConfigTransformerwill use instances ofConfigProviderto replace variable references in a set of configuration values.In the Connect framework,
ConfigProviderclasses can be specified in the worker config, and then variable references can be used in the connector config. In addition, the herder can be configured to restart connectors (or not) based on the TTL returned from aConfigProvider. The main class that performs restarts and transformations isWorkerConfigTransformer.Finally, a
configs()method has been added to bothSourceTaskContextandSinkTaskContext. This allows connectors to get configs with variables replaced by the latest values from instances ofConfigProvider.Most of the other changes in the Connect framework are threading various objects through classes to enable the above functionality.
Committer Checklist (excluded from commit message)