KAFKA-6886 Externalize secrets from Connect configs #5068

Closed
rayokota wants to merge 12 commits into apache:trunk from rayokota:KAFKA-6886-connect-secrets

@rayokota rayokota commented May 23, 2018

Contributor

This commit allows secrets in Connect configs to be externalized and replaced with variable references of the form ${provider:[path:]key}, where the "path" is optional.
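For readers following along, here is a minimal sketch of how a reference of the form ${provider:[path:]key} might be split into its parts. The class name, method name, and regular expression are assumptions made for illustration, not code taken from this PR.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the PR: parses ${provider:[path:]key} references.
final class VariableReferenceSketch {
    // Assumed pattern: group 1 = provider, group 3 = optional path, group 4 = key.
    static final Pattern REFERENCE =
        Pattern.compile("\\$\\{([^}]*?):(([^}]*?):)?([^}]*?)\\}");

    // Returns {provider, path, key} for the first reference found in value,
    // or null if there is none. A missing path is reported as the empty string.
    static String[] parseFirst(String value) {
        Matcher m = REFERENCE.matcher(value);
        if (!m.find()) {
            return null;
        }
        String path = m.group(3) != null ? m.group(3) : "";
        return new String[] {m.group(1), path, m.group(4)};
    }
}
```

With this sketch, a value such as "${file:/tmp/properties.txt:fileKey}" yields the provider "file", the path "/tmp/properties.txt", and the key "fileKey", while "${vault:password}" has an empty path.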

There are 2 main additions to org.apache.kafka.common.config: a ConfigProvider and a ConfigTransformer. The ConfigProvider is an interface that allows key-value pairs to be provided by an external source for a given "path". A TTL can also be associated with the key-value pairs returned from the path. The ConfigTransformer will use instances of ConfigProvider to replace variable references in a set of configuration values.
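The division of labor described above can be sketched roughly as follows. SketchProvider and SketchTransformer are hypothetical, simplified stand-ins for ConfigProvider and ConfigTransformer (no TTL, no subscriptions), and the regular expression is an assumption; the real classes in the PR differ.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for ConfigProvider: returns values for keys at a path.
interface SketchProvider {
    Map<String, String> get(String path, Set<String> keys);
}

// Hypothetical stand-in for ConfigTransformer.
final class SketchTransformer {
    private static final Pattern REFERENCE =
        Pattern.compile("\\$\\{([^}]*?):(([^}]*?):)?([^}]*?)\\}");
    private final Map<String, SketchProvider> providers;

    SketchTransformer(Map<String, SketchProvider> providers) {
        this.providers = providers;
    }

    // Returns a new map; the input map and the providers are not modified.
    Map<String, String> transform(Map<String, String> configs) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : configs.entrySet()) {
            result.put(e.getKey(), replace(e.getValue()));
        }
        return result;
    }

    // Replaces each resolvable reference; unresolved references are left as-is.
    private String replace(String value) {
        if (value == null) {
            return null;
        }
        Matcher m = REFERENCE.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String path = m.group(3) != null ? m.group(3) : "";
            String key = m.group(4);
            SketchProvider provider = providers.get(m.group(1));
            String resolved = provider == null
                ? null
                : provider.get(path, Collections.singleton(key)).get(key);
            String replacement = resolved != null ? resolved : m.group();
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

Leaving unresolved references untouched is a choice made for this sketch only; the thread does not say how the PR treats them.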

In the Connect framework, ConfigProvider classes can be specified in the worker config, and then variable references can be used in the connector config. In addition, the herder can be configured to restart connectors (or not) based on the TTL returned from a ConfigProvider. The main class that performs restarts and transformations is WorkerConfigTransformer.

Finally, a configs() method has been added to both SourceTaskContext and SinkTaskContext. This allows connectors to get configs with variables replaced by the latest values from instances of ConfigProvider.

Most of the other changes in the Connect framework are threading various objects through classes to enable the above functionality.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@rayokota rayokota force-pushed the KAFKA-6886-connect-secrets branch from f62199a to aa3eb04 Compare May 23, 2018 00:14
@rayokota rayokota force-pushed the KAFKA-6886-connect-secrets branch from be52a09 to 15d2532 Compare May 23, 2018 05:02
@rayokota

Contributor Author

retest this please

@rhauch rhauch left a comment

Contributor

Looks really good, @rayokota. I did a pretty detailed pass, and have a number of comments.

* @param path the path where the data resides
* @param keys the keys whose values will be retrieved
*/
void unsubscribe(String path, Set<String> keys);

Contributor

Only one subscriber is expected? If not, does this unsubscribe all subscriptions for the keys at the path?

Contributor

How about a default implementation?

Contributor Author

I'll add the callback to this method.

*
* @param configProviders the set of {@link ConfigProvider} instances.
*/
public ConfigTransformer(Map<String, ConfigProvider> configProviders) {

Contributor

How about providing a builder API to easily create these with individual named providers? Not sure how valuable it will be.

ConfigProvider provider = configProviders.get(providerName);
Map<String, Set<String>> keysByPath = entry.getValue();
if (provider != null && keysByPath != null) {
for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) {

Contributor

Perhaps?

keysByPath.entrySet().forEach(entry-> {

Set<String> keys = keysByPath.computeIfAbsent(var.path, k -> new HashSet<>());
keys.add(var.variable);
}
}

Contributor

Using Java 8 streams would be a bit more compact and IMO more readable:

configs.forEach((key, value)->{
    getVars(key, value, DEFAULT_PATTERN).forEach(var-> {
        keysByProvider.computeIfAbsent(var.providerName, k -> new HashMap<>())
                                  .computeIfAbsent(var.path, k-> new HashSet<>())
                                  .add(var.variable);
    });
});
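To make the nested computeIfAbsent idiom from the suggestion above easier to try out, here is a self-contained sketch. The Ref class and the group method are hypothetical names introduced for this example; only the field names (providerName, path, variable) follow the snippet under review.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class GroupingSketch {
    // Hypothetical holder for one parsed reference (provider, path, key).
    static final class Ref {
        final String providerName;
        final String path;
        final String variable;

        Ref(String providerName, String path, String variable) {
            this.providerName = providerName;
            this.path = path;
            this.variable = variable;
        }
    }

    // Groups keys by provider name, then by path, creating the inner
    // maps and sets on demand.
    static Map<String, Map<String, Set<String>>> group(List<Ref> refs) {
        Map<String, Map<String, Set<String>>> keysByProvider = new HashMap<>();
        for (Ref ref : refs) {
            keysByProvider.computeIfAbsent(ref.providerName, k -> new HashMap<>())
                          .computeIfAbsent(ref.path, k -> new HashSet<>())
                          .add(ref.variable);
        }
        return keysByProvider;
    }
}
```

The outer loop is written as a plain for-each here; swapping it for forEach changes the style but not the result.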

keyValuesByPath.put(path, data);
}
}
}

Contributor

Again, this could be simplified quite a bit with streams.

Contributor

lol, i expect this to be a ripe area for bikeshedding. you can definitely reduce some verbosity with streams, unclear that it's actually simplified. for the time being, i think we should be liberal in accepting either form of style and we can figure out where we want to land over time.

if (keys.contains("testKey")) {
data.put("testKey", "testResult");
}
if (keys.contains("testKeyWithTTL")) {

Contributor

Might be good to use constants for these keys and values. That would be a bit more maintainable, as long as it doesn't make it unreadable.

* @param keys the keys whose values will be retrieved
* @param callback the callback to invoke upon change
*/
void subscribe(String path, Set<String> keys, ConfigChangeCallback callback);

Contributor

How about a default implementation?

public interface SinkTaskContext {

/**
* Get the Task configuration.

Contributor

Need more JavaDoc to define the behavior. Does this return null? Will the result ever differ if called multiple times? How does this relate to the configuration passed to the connector and/or task upon startup?

*/
public interface SourceTaskContext {
/**
* Get the Task configuration.

Contributor

same here.

  getPluginDesc(reflections, HeaderConverter.class, loader),
- getPluginDesc(reflections, Transformation.class, loader)
+ getPluginDesc(reflections, Transformation.class, loader),
+ getPluginDesc(reflections, ConfigProvider.class, loader)

Contributor

We don't want to scan for this, but want to use the Service Loader mechanism. Do you plan to change this when the REST extension PR is merged?

Contributor Author

Yes, that will be for a future PR

@rhauch rhauch left a comment

Contributor

A few more comments.


public static final String CONFIG_RELOAD_ACTION_CONFIG = "config.action.reload";
private static final String CONFIG_RELOAD_ACTION_DOC =
"The action to take in order to reload changed configuration values (none or restart).";

Contributor

This should probably make more clear that this occurs when configuration values change in external providers, and should describe what none and restart mean. Users shouldn't have to go to the code or KIP to understand this configuration property.

*/
package org.apache.kafka.connect.runtime;

public interface HerderRequestId {

Contributor

This seems like an odd name for the response from a herder request: there's no "id" associated with this.

Contributor Author

I renamed to just HerderRequest

+ "/opt/connectors";

public static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
protected static final String CONFIG_PROVIDERS_DOC = "List of configuration providers.";

Contributor

Please be more specific. "List of configuration providers. This is a comma-separated list of the fully-qualified names of the ConfigProvider implementations, in the order they will be created, configured, and used."

}

@Override
public String getConnectorConfigReloadAction(final String connName) {

Contributor

Should we have an enum for reload action?

@rayokota

Contributor Author

@rhauch, thanks for the review! I've incorporated most of your suggestions (except the use of forEach :)). The subscribe and unsubscribe methods are optional, intended for ConfigProvider subclasses that want to maintain a separate thread to poll the external source and then push out changes. I've copied how optional operations are documented in java.util.List by adding JavaDoc stating that they are optional and could possibly throw UnsupportedOperationException. I also gave ConfigProvider default implementations that throw UnsupportedOperationException. Thanks!
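As a rough illustration of the optional-operation pattern described in this comment, the sketch below uses a hypothetical, trimmed-down interface. OptionalOpsProvider and StaticProvider are made-up names, and Runnable stands in for the PR's ConfigChangeCallback; the real ConfigProvider has a different shape.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hypothetical, trimmed-down interface; not the PR's ConfigProvider.
interface OptionalOpsProvider {
    Map<String, String> get(String path, Set<String> keys);

    // Optional operation: providers that cannot push changes inherit this default.
    default void subscribe(String path, Set<String> keys, Runnable callback) {
        throw new UnsupportedOperationException();
    }

    // Optional operation, paired with subscribe.
    default void unsubscribe(String path, Set<String> keys) {
        throw new UnsupportedOperationException();
    }
}

// A pull-only provider implements get() and nothing else.
final class StaticProvider implements OptionalOpsProvider {
    @Override
    public Map<String, String> get(String path, Set<String> keys) {
        return Collections.singletonMap("testKey", "testResult");
    }
}
```

Callers that want push notifications would need to be prepared for UnsupportedOperationException from providers like StaticProvider, in the same way callers of an unmodifiable java.util.List are.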

@rhauch rhauch left a comment

Contributor

Ok, one more round of mostly questions and a few minor suggestions.

}

public void subscribe(String path, Set<String> keys, ConfigChangeCallback callback) {
throw new UnsupportedOperationException();

Contributor

Isn't this the default implementation?

}

public void unsubscribe(String path, Set<String> keys) {
throw new UnsupportedOperationException();

Contributor

Isn't this the default implementation?

* </pre>
* resides at the path "/tmp/properties.txt", then when a configuration Map which has an entry with a key "someKey" and
* a value "${file:/tmp/properties.txt:fileKey}" is passed to the {@link #transform(Map)} method, then the transformed
* Map will have an entry with key "someKey" and a value "someValue".

Contributor

Might be useful to mention that this only requires the get methods on ConfigProvider, and does not use subscribe.
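The Javadoc example under review (a properties file at a path, looked up by key) can be sketched with a get-only helper like the one below. FileLookupSketch is a hypothetical name and this is not the provider class from the PR; it only shows that resolving a reference such as ${file:/tmp/properties.txt:fileKey} needs a read of the file and a key lookup, with no subscription involved.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper, not the PR's file-based provider.
final class FileLookupSketch {
    // Loads the properties file at path and returns the requested keys
    // that are present in it. Values are read as cleartext.
    static Map<String, String> get(String path, Set<String> keys) throws IOException {
        Properties properties = new Properties();
        try (Reader reader = Files.newBufferedReader(Paths.get(path), StandardCharsets.UTF_8)) {
            properties.load(reader);
        }
        Map<String, String> data = new HashMap<>();
        for (String key : keys) {
            String value = properties.getProperty(key);
            if (value != null) {
                data.put(key, value);
            }
        }
        return data;
    }
}
```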


/**
* Returns the transformed data, with variables replaced with corresponding values from the
* ConfigProvider instances if found.

Contributor

Would it be useful to mention that changes to this map will not affect the origin of the configuration or the information in the ConfigProvider(s)?

import java.util.Set;

/**
* An implementation of {@link ConfigProvider} that represents a Properties file.

Contributor

I think this should mention that all information in the file is stored in cleartext, so care must be taken if any secrets are stored in the backing file.

Contributor

Let's take this to a follow up that clarifies the details of this. We should probably be very verbose about this being an example and that you want to be careful about how you use it if you choose to. It's a valid choice in quite a few contexts, but some warnings are probably in order.

@rayokota Maybe file a follow up JIRA so we don't lose track of it? Should be a pretty minor change.

"The action to take in order to reload changed configuration values (none or restart). "
+ "Configuration values will typically only change if using external configuration providers. "
+ "A value of 'none' indicates that the new values will not be reloaded. "
+ "A value of 'reload' indicates that the new values will be reloaded by restarting the connector. "

Contributor

I'm not convinced that a user will know what this means. How about making it much more specific and concrete:

The action that Connect should take on the connector when changes in external 
configuration providers results in a change in the connector's configuration properties. 
A value of `none` indicates that Connect will do nothing. 
A value of 'reload' indicates that Connect should restart/reload the connector with the 
updated configuration properties....

Contributor

Also, is this restart or reload? The enum is RESTART.

Contributor

Also, is this case sensitive?
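One way to settle both questions (restart versus reload, and case sensitivity) is to parse the setting into an enum. The sketch below is an assumption about how that could look: ReloadActionSketch and parse are hypothetical names, and case-insensitive parsing is a choice made here, not something the thread confirms for the PR.

```java
import java.util.Locale;

// Hypothetical sketch; the PR's ConfigReloadAction may differ in detail.
enum ReloadActionSketch {
    NONE,
    RESTART;

    // Accepts "none", "None", "RESTART" and so on; any other value,
    // including "reload", is rejected with IllegalArgumentException.
    static ReloadActionSketch parse(String value) {
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }
}
```

With an enum like this, documentation that advertises 'reload' would fail fast at configuration time instead of being silently ignored.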

if (configTransformer != null) {
configs = configTransformer.transform(connector, configs);
}
return configs;

Contributor

The JavaDoc for this method should probably say that it includes the current transformed connector configuration where all variables have been replaced with the current values from the external ConfigProviders, and that they may include secrets.

if (configTransformer != null) {
configs = configTransformer.transform(task.connector(), configs);
}
return configs;

Contributor

The JavaDoc for this method should probably say that it includes the current transformed connector configuration where all variables have been replaced with the current values from the external ConfigProviders, and that they may include secrets.

+ "indicates that a configuration value will expire in the future.";
"The action that Connect should take on the connector when changes in external " +
"configuration providers result in a change in the connector's configuration properties. " +
"A value of `none` indicates that Connect will do nothing. " +

Contributor

The backtick was from my original comment. 😊 Should change that to single quote like on the next line.

@rhauch rhauch left a comment

Contributor

One more minor thing: some JavaDoc lines were accidentally removed.

* Get the configuration for a connector. The configuration will have been transformed by
* {@link org.apache.kafka.common.config.ConfigTransformer} by having all variable
* references replaced with the current values from external instances of
* {@link org.apache.kafka.common.config.ConfigProvider}, and may include secrets.

Contributor

Now missing @param and @return.

@rhauch rhauch left a comment

Contributor

Excellent work, @rayokota! Thanks for your patience with the reviews. I'm very much looking forward to this new capability in Connect.

@rayokota

Contributor Author

Thanks, @rhauch , appreciate your attention to detail :)

@ewencp ewencp left a comment

Contributor

@rayokota A small pile of minor comments, but this mostly looks good. The one issue that would be most important to fix before merging is the one about awaitTermination.

* @param data a Map of key-value pairs
*/
public ConfigData(Map<String, String> data) {
this(data, Long.MAX_VALUE);

Contributor

Do we actually want this to be Long.MAX_VALUE? Seems error prone since the normal thing to do with the TTL is add it to the current time and that will cause overflow. Should we have some sentinel value for infinite TTL?

ConfigData configData = provider.get(path, keys);
Map<String, String> data = configData.data();
long ttl = configData.ttl();
if (ttl >= 0 && ttl < Long.MAX_VALUE) {

Contributor

I think we should have a constant in the ConfigData class for whatever value ends up being the sentinel, even if it ends up being Long.MAX_VALUE so that uses of the constant outside that class are clearer as to their meaning. Also, another alternative would be to track the TTL as a Long instead of a long, which is less user friendly but doesn't require a special sentinel value.

Contributor Author

Thanks, I'll change to Long
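The overflow concern raised in this thread is easy to reproduce, and the nullable Long alternative avoids it. The sketch below uses hypothetical names (TtlSketch, naiveExpiry, expiry) and treats a null TTL as "never expires", which is an assumption about the intended meaning.

```java
final class TtlSketch {
    // Naive expiry computation: wraps around to a negative number
    // when ttl is Long.MAX_VALUE and now is positive.
    static long naiveExpiry(long now, long ttl) {
        return now + ttl;
    }

    // Nullable TTL: null means "no expiry", so no sentinel value is needed.
    static Long expiry(long now, Long ttl) {
        return ttl == null ? null : Long.valueOf(now + ttl);
    }
}
```

For example, naiveExpiry(1L, Long.MAX_VALUE) is Long.MIN_VALUE, whereas expiry(1000L, null) is null and expiry(1000L, 500L) is 1500.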

StatusBackingStore statusBackingStore,
ConfigBackingStore configBackingStore) {
this.worker = worker;
this.worker.herder = this;

Contributor

We can probably clean up or refactor later, but this doesn't seem great -- code like this indicates overly strong coupling between these two classes and makes ownership semantics unclear.

* Get the configuration reload action.
* @param connName name of the connector
*/
ConfigReloadAction getConnectorConfigReloadAction(final String connName);

Contributor

nit: Kafka generally avoids using get/set at the beginning of accessors.

KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
- headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, metrics, loader, time);
+ headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, configState, metrics, loader, time);

Contributor

You might want to put some comments in the context classes about why we need the entire config. At first glance it appears possibly problematic because we're saving the entire snapshot state for the cluster, but really only care about a single config. It's possible for the state to become outdated (e.g. config change that doesn't require a rebalance) and if we get incremental rebalancing in at some point then it even becomes fairly likely that the majority of the state is stale.

I didn't figure out what was going on until I read through the rest of the PR and found that the ClusterConfigState was also now doing the config translation, so I think it's probably not obvious and some comments would help remind people (and avoid, e.g., incorrect refactoring "cleanup" that appears to simplify things but actually breaks them).

}

@Override
public ConfigReloadAction getConnectorConfigReloadAction(final String connName) {

Contributor

nit: this is identical between both herders, can we refactor configState so we can just push this into the abstract class?

Contributor Author

I'll make a note to clean this later. Probably a lot of methods that use configState can be pulled up into AbstractHerder


// Configure the ConfigProvider
String configPrefix = providerPrefix + ".param.";
Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix);

Contributor

nit: probably just copy/paste issue, but it's a configProviderConfig, not converterConfig

@Override
public synchronized void stop() {
log.info("Herder stopping");
requestExecutorService.shutdown();

Contributor

aren't we missing a corresponding awaitTermination?

Contributor Author

I've added an awaitTermination of 30 secs
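For reference, the usual shutdown-then-wait pattern looks roughly like this. ShutdownSketch and stop are hypothetical names, and the handling of interruption is a choice made for this sketch; the actual herder code is not shown in this thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class ShutdownSketch {
    // Stops accepting new tasks, then waits up to timeoutSeconds for
    // already-submitted work to finish. Returns true if the executor
    // terminated within the timeout.
    static boolean stop(ExecutorService executor, long timeoutSeconds) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Without the awaitTermination call, stop() could return while queued requests are still running, which is the gap the review comment points at.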

@rayokota

Contributor Author

Thanks for the review @ewencp ! I've pushed another commit with changes based on your suggestions.

@ewencp

ewencp commented May 30, 2018

Contributor

seems like unrelated core kafka tests failing, but there's a lot of them, so retest this please. likely a fluke, but let's be sure.

@ewencp ewencp left a comment

Contributor

had a couple more comments, but we can follow up on them separately. this LGTM in current state. going to wait on tests to come back clean, but once that happens I'll merge to trunk for 2.0.0

Map<String, String> data = new HashMap<>(configs);
for (Map.Entry<String, String> config : configs.entrySet()) {
data.put(config.getKey(), replace(lookupsByProvider, config.getValue(), DEFAULT_PATTERN));
}

Contributor

This is actually a really good example of a case where your better version is actually worse for a lot of people -- replaceAll doesn't necessarily have obvious semantics. The forEach version is at least obvious in what it is doing, but I have to read the code in the replaceAll version to figure out that it is only replacing the values and the keys remain the same (and that may have required comparing different versions of the code, I'm not even sure it would have been obvious to me without the earlier versions). It takes both key and value, but I have to carefully inspect what it is returning to understand that it's only updating the value.

Not going to get stuck in a language features discussion for this PR, but you have to be a Java person to be able to read your last version and very quickly understand what's happening. Both the original version and your first alternative actually seem way better to me because coming from any background I can read them and understand what the semantics are.
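To make the comparison concrete, here are the two styles side by side. The replaceAll variant under discussion is not shown in this thread, so the second method is only a guess at its shape; ReplaceStylesSketch and the UnaryOperator parameter are hypothetical, standing in for the PR's replace(...) call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

final class ReplaceStylesSketch {
    // Loop form, following the snippet under review: copy, then overwrite each value.
    static Map<String, String> withLoop(Map<String, String> configs,
                                        UnaryOperator<String> replace) {
        Map<String, String> data = new HashMap<>(configs);
        for (Map.Entry<String, String> config : configs.entrySet()) {
            data.put(config.getKey(), replace.apply(config.getValue()));
        }
        return data;
    }

    // Map.replaceAll form: the function receives both key and value, but
    // its return value becomes the new value only; keys never change.
    static Map<String, String> withReplaceAll(Map<String, String> configs,
                                              UnaryOperator<String> replace) {
        Map<String, String> data = new HashMap<>(configs);
        data.replaceAll((key, value) -> replace.apply(value));
        return data;
    }
}
```

Both methods produce the same map; the difference is only in how much a reader must already know about Map.replaceAll to see that.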

@ewencp

ewencp commented May 30, 2018

Contributor

Merging to trunk. The failed test is a flaky core test, the connect stuff is all passing fine.

@ewencp ewencp closed this in 08e8fac May 30, 2018
ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018
This commit allows secrets in Connect configs to be externalized and replaced with variable references of the form `${provider:[path:]key}`, where the "path" is optional.

There are 2 main additions to `org.apache.kafka.common.config`: a `ConfigProvider` and a `ConfigTransformer`.  The `ConfigProvider` is an interface that allows key-value pairs to be provided by an external source for a given "path".  A TTL can also be associated with the key-value pairs returned from the path.  The `ConfigTransformer` will use instances of `ConfigProvider` to replace variable references in a set of configuration values.

In the Connect framework, `ConfigProvider` classes can be specified in the worker config, and then variable references can be used in the connector config.  In addition, the herder can be configured to restart connectors (or not) based on the TTL returned from a `ConfigProvider`.  The main class that performs restarts and transformations is `WorkerConfigTransformer`.

Finally, a `configs()` method has been added to both `SourceTaskContext` and `SinkTaskContext`.  This allows connectors to get configs with variables replaced by the latest values from instances of `ConfigProvider`.

Most of the other changes in the Connect framework are threading various objects through classes to enable the above functionality.

Author: Robert Yokota <rayokota@gmail.com>
Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes apache#5068 from rayokota/KAFKA-6886-connect-secrets
@aakashgupta96

Hi @rayokota
Was this feature merged? I was planning to submit a KIP on the same topic when I saw your proposal.
Could you confirm whether it has been merged, or whether you are still working on it?

If not, I can submit a proposal for it.

@rayokota

Contributor Author

@aakashgupta96 yes it was merged
