Skip to content

KAFKA-6728: Corrected the worker’s instantiation of the HeaderConverter#4815

Closed
rhauch wants to merge 4 commits into
apache:trunkfrom
rhauch:kafka-6728
Closed

KAFKA-6728: Corrected the worker’s instantiation of the HeaderConverter#4815
rhauch wants to merge 4 commits into
apache:trunkfrom
rhauch:kafka-6728

Conversation

@rhauch

@rhauch rhauch commented Apr 3, 2018

Copy link
Copy Markdown
Contributor

Summary of the problem

When the header.converter is not specified in the worker config or the connector config, a bug in the Plugins test causes it to never instantiate the HeaderConverter instance, even though there is a default value.

This is a problem as soon as the connector deals with headers, either in records created by a source connector or in messages on the Kafka topics consumed by a sink connector. As soon as that happens, a NPE occurs.

A workaround is to explicitly set the header.converter configuration property, but this was added in AK 1.1 and thus means that upgrading to AK 1.1 will not be backward compatible and will require this configuration change.

The Changes

The Plugins.newHeaderConverter methods were always returning null if the header.converter configuration value was not specified in the supplied connector or worker configuration. Thus, even though the header.converter property has a default, it was never being used.

The fix was to only check whether a header.converter property was specified when the connector configuration was being used, and if no such property exists in the connector configuration to return null. Then, when the worker configuration is being used, the method simply gets the header.converter value (or the default if no value was explicitly set).

Also, the ConnectorConfig had the same default value for the header.converter property as the WorkerConfig, but this resulted in very confusing log messages that implied the default header converter should be used even when the worker config specified the header.converter value. By removing the default, the log messages now make sense, and the Worker still properly instantiates the correct header converter.

Finally, updated comments and added log messages to make it more clear which converters are being used and how they are being converted.

Testing

Several new unit tests for Plugins.newHeaderConverter were added to check the various behavior. Additionally, the runtime JAR with these changes was built and inserted into an AK 1.1 installation, and a source connector was manually tested with 8 different combinations of settings for the header.converter configuration:

  1. default value
  2. worker configuration has header.converter explicitly set to the default
  3. worker configuration has header.converter set to a custom HeaderConverter implementation in the same plugin
  4. worker configuration has header.converter set to a custom HeaderConverter implementation in a different plugin
  5. connector configuration has header.converter explicitly set to the default
  6. connector configuration has header.converter set to a custom HeaderConverter implementation in the same plugin
  7. connector configuration has header.converter set to a custom HeaderConverter implementation in a different plugin
  8. worker configuration has header.converter explicitly set to the default, and the connector configuration has header.converter set to a custom HeaderConverter implementation in a different plugin

The worker created the correct HeaderConverter implementation with the correct configuration in all of these tests.

Finally, the default configuration was used with the aforementioned custom source connector that generated records with headers, and an S3 connector that consumes the records with headers (but didn't do anything with them). This test also passed.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

rhauch added 3 commits April 2, 2018 23:26
When the `header.converter` is not specified in the worker config or the connector config, a bug in the `Plugins` test causes it to never instantiate the `HeaderConverter` instance, even though there is a default value.

This is a problem as soon as the connector deals with headers, either in records created by a source connector or in messages on the Kafka topics consumed by a sink connector. As soon as that happens, a NPE occurs.

A workaround is to explicitly set the `header.converter` configuration property, but this was added in AK 1.1 and thus means that upgrading to AK 1.1 will not be backward compatible and will require this configuration change.
…r header converter

The ConnectorConfig had the same default value for the `header.converter` property as the WorkerConfig, but this resulted in very confusing log messages that implied the default header converter should be used even when the worker config specified the `header.converter` value. By removing the default, the log messages now make sense, and the Worker still properly instantiates the correct header converter.

Also updated comments and added log messages to make it more clear which converters are being used and how they are being converted.
@rhauch rhauch changed the base branch from 1.1 to trunk April 3, 2018 04:28
@rhauch rhauch changed the title [WIP] KAFKA-6728: Corrected the worker’s instantiation of the HeaderConverter KAFKA-6728: Corrected the worker’s instantiation of the HeaderConverter Apr 3, 2018
@rhauch

rhauch commented Apr 3, 2018

Copy link
Copy Markdown
Contributor Author

@ewencp Please review. This will need to be merged onto the 1.1 branch as well.

@ewencp ewencp left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rhauch a few comments inline, I think the only "blockers" were log ordering things.

I think it's also worth calling out that the same check exists for newConverter in addition to the newHeaderConverter case. That happens to work currently because you are required to provide those configs. I wonder if (even if in follow up) we should make the same change for that path.

keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id);
} else {
log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, given we're even a little unsure exactly the right way for this to work, this logging should at least help us explain to users even if the behavior isn't entirely obvious.

HeaderConverter plugin = null;
switch (classLoaderUsage) {
case CURRENT_CLASSLOADER:
if (!config.originals().containsKey(classPropertyName)) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might want some commentary here, or in the javadoc, about how this works. the CURRENT_CLASSLOADER name isn't really clear within this limited context to understand why it makes sense to skip in this case (connector overrides probably only make sense if it's explicit in the connector config) since the CURRENT_CLASSLOADER doesn't make it clear that it is the connector's classloader.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, now I am wondering if we should rename the enums to be clearer. We can follow up on that separately, but I realize these branches aren't really very clear currently.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned about doing that now. If you're okay with waiting, I can file a separate issue for later.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that's just about internal code being readable and understandable, not critical to current issue.

String configPrefix = classPropertyName + ".";
Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix);
plugin.configure(converterConfig, isKeyConverter);
log.debug("Configuring the {} converter with configuration:{}{}",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be above the previous line? Order of output could be confusing given current phrasing. Might also want to include isKeyConverter in the log.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with moving. isKeyConverter is used in the first parameter in the log statement.

Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix);
converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName());
plugin.configure(converterConfig);
log.debug("Configuring the header converter with configuration:{}{}", System.lineSeparator(), converterConfig);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question as above about moving this above the call to configure

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree. Moving it before call to configure.

public static final String HEADER_CONVERTER_CLASS_DOC = WorkerConfig.HEADER_CONVERTER_CLASS_DOC;
public static final String HEADER_CONVERTER_CLASS_DISPLAY = "Header converter class";
public static final String HEADER_CONVERTER_CLASS_DEFAULT = WorkerConfig.HEADER_CONVERTER_CLASS_DEFAULT;
public static final String HEADER_CONVERTER_CLASS_DEFAULT = null;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you might want a comment explaining the difference here, it'd be easy to interpret it as a mistaken deviation from WorkerConfig and "fix" it by restoring the inherited default.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree.

@rhauch

rhauch commented Apr 3, 2018

Copy link
Copy Markdown
Contributor Author

@ewencp I really wanted to make those changes to newConverter, but didn't due to timing. I will follow up with another issue.

@rhauch

rhauch commented Apr 3, 2018

Copy link
Copy Markdown
Contributor Author

Logged the additional fixes/clarification as https://issues.apache.org/jira/browse/KAFKA-6740.

@ewencp ewencp left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, merging to trunk and 1.1

ewencp pushed a commit that referenced this pull request Apr 3, 2018
## Summary of the problem
When the `header.converter` is not specified in the worker config or the connector config, a bug in the `Plugins` test causes it to never instantiate the `HeaderConverter` instance, even though there is a default value.

This is a problem as soon as the connector deals with headers, either in records created by a source connector or in messages on the Kafka topics consumed by a sink connector. As soon as that happens, a NPE occurs.

A workaround is to explicitly set the `header.converter` configuration property, but this was added in AK 1.1 and thus means that upgrading to AK 1.1 will not be backward compatible and will require this configuration change.

## The Changes

The `Plugins.newHeaderConverter` methods were always returning null if the `header.converter` configuration value was not specified in the supplied connector or worker configuration. Thus, even though the `header.converter` property has a default, it was never being used.

The fix was to only check whether a `header.converter` property was specified when the connector configuration was being used, and if no such property exists in the connector configuration to return null. Then, when the worker configuration is being used, the method simply gets the `header.converter` value (or the default if no value was explicitly set).

Also, the ConnectorConfig had the same default value for the `header.converter` property as the WorkerConfig, but this resulted in very confusing log messages that implied the default header converter should be used even when the worker config specified the `header.converter` value. By removing the default, the log messages now make sense, and the Worker still properly instantiates the correct header converter.

Finally, updated comments and added log messages to make it more clear which converters are being used and how they are being converted.

## Testing

Several new unit tests for `Plugins.newHeaderConverter` were added to check the various behavior. Additionally, the runtime JAR with these changes was built and inserted into an AK 1.1 installation, and a source connector was manually tested with 8 different combinations of settings for the `header.converter` configuration:

1. default value
1. worker configuration has `header.converter` explicitly set to the default
1. worker configuration has `header.converter` set to a custom `HeaderConverter` implementation in the same plugin
1. worker configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin
1. connector configuration has `header.converter` explicitly set to the default
1. connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in the same plugin
1. connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin
1. worker configuration has `header.converter` explicitly set to the default, and the connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin

The worker created the correct `HeaderConverter` implementation with the correct configuration in all of these tests.

Finally, the default configuration was used with the aforementioned custom source connector that generated records with headers, and an S3 connector that consumes the records with headers (but didn't do anything with them). This test also passed.

Author: Randall Hauch <rhauch@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #4815 from rhauch/kafka-6728

(cherry picked from commit d9369de)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
@ewencp ewencp closed this in d9369de Apr 3, 2018
rhauch added a commit to confluentinc/kafka that referenced this pull request Apr 4, 2018
## Summary of the problem
When the `header.converter` is not specified in the worker config or the connector config, a bug in the `Plugins` test causes it to never instantiate the `HeaderConverter` instance, even though there is a default value.

This is a problem as soon as the connector deals with headers, either in records created by a source connector or in messages on the Kafka topics consumed by a sink connector. As soon as that happens, a NPE occurs.

A workaround is to explicitly set the `header.converter` configuration property, but this was added in AK 1.1 and thus means that upgrading to AK 1.1 will not be backward compatible and will require this configuration change.

## The Changes

The `Plugins.newHeaderConverter` methods were always returning null if the `header.converter` configuration value was not specified in the supplied connector or worker configuration. Thus, even though the `header.converter` property has a default, it was never being used.

The fix was to only check whether a `header.converter` property was specified when the connector configuration was being used, and if no such property exists in the connector configuration to return null. Then, when the worker configuration is being used, the method simply gets the `header.converter` value (or the default if no value was explicitly set).

Also, the ConnectorConfig had the same default value for the `header.converter` property as the WorkerConfig, but this resulted in very confusing log messages that implied the default header converter should be used even when the worker config specified the `header.converter` value. By removing the default, the log messages now make sense, and the Worker still properly instantiates the correct header converter.

Finally, updated comments and added log messages to make it more clear which converters are being used and how they are being converted.

## Testing

Several new unit tests for `Plugins.newHeaderConverter` were added to check the various behavior. Additionally, the runtime JAR with these changes was built and inserted into an AK 1.1 installation, and a source connector was manually tested with 8 different combinations of settings for the `header.converter` configuration:

1. default value
1. worker configuration has `header.converter` explicitly set to the default
1. worker configuration has `header.converter` set to a custom `HeaderConverter` implementation in the same plugin
1. worker configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin
1. connector configuration has `header.converter` explicitly set to the default
1. connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in the same plugin
1. connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin
1. worker configuration has `header.converter` explicitly set to the default, and the connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin

The worker created the correct `HeaderConverter` implementation with the correct configuration in all of these tests.

Finally, the default configuration was used with the aforementioned custom source connector that generated records with headers, and an S3 connector that consumes the records with headers (but didn't do anything with them). This test also passed.

Author: Randall Hauch <rhauch@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes apache#4815 from rhauch/kafka-6728

(cherry picked from commit d9369de)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants