KAFKA-6728: Corrected the worker’s instantiation of the HeaderConverter#4815
KAFKA-6728: Corrected the worker’s instantiation of the HeaderConverter#4815rhauch wants to merge 4 commits into
Conversation
When the `header.converter` is not specified in the worker config or the connector config, a bug in the `Plugins` test causes it to never instantiate the `HeaderConverter` instance, even though there is a default value. This is a problem as soon as the connector deals with headers, either in records created by a source connector or in messages on the Kafka topics consumed by a sink connector. As soon as that happens, a NPE occurs. A workaround is to explicitly set the `header.converter` configuration property, but this was added in AK 1.1 and thus means that upgrading to AK 1.1 will not be backward compatible and will require this configuration change.
…r header converter The ConnectorConfig had the same default value for the `header.converter` property as the WorkerConfig, but this resulted in very confusing log messages that implied the default header converter should be used even when the worker config specified the `header.converter` value. By removing the default, the log messages now make sense, and the Worker still properly instantiates the correct header converter. Also updated comments and added log messages to make it more clear which converters are being used and how they are being converted.
|
@ewencp Please review. This will need to be merged onto the 1.1 branch as well. |
ewencp
left a comment
There was a problem hiding this comment.
@rhauch a few comments inline, I think the only "blockers" were log ordering things.
I think it's also worth calling out that the same check exists for newConverter in addition to the newHeaderConverter case. That happens to work currently because you are required to provide those configs. I wonder if (even if in follow up) we should make the same change for that path.
| keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS); | ||
| log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id); | ||
| } else { | ||
| log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id); |
There was a problem hiding this comment.
Nice, given we're even a little unsure exactly the right way for this to work, this logging should at least help us explain to users even if the behavior isn't entirely obvious.
| HeaderConverter plugin = null; | ||
| switch (classLoaderUsage) { | ||
| case CURRENT_CLASSLOADER: | ||
| if (!config.originals().containsKey(classPropertyName)) { |
There was a problem hiding this comment.
might want some commentary here, or in the javadoc, about how this works. the CURRENT_CLASSLOADER name isn't really clear within this limited context to understand why it makes sense to skip in this case (connector overrides probably only make sense if it's explicit in the connector config) since the CURRENT_CLASSLOADER doesn't make it clear that it is the connector's classloader.
There was a problem hiding this comment.
In fact, now I am wondering if we should rename the enums to be clearer. We can follow up on that separately, but I realize these branches aren't really very clear currently.
There was a problem hiding this comment.
I'm concerned about doing that now. If you're okay with waiting, I can file a separate issue for later.
There was a problem hiding this comment.
yeah, that's just about internal code being readable and understandable, not critical to current issue.
| String configPrefix = classPropertyName + "."; | ||
| Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix); | ||
| plugin.configure(converterConfig, isKeyConverter); | ||
| log.debug("Configuring the {} converter with configuration:{}{}", |
There was a problem hiding this comment.
I think this should be above the previous line? Order of output could be confusing given current phrasing. Might also want to include isKeyConverter in the log.
There was a problem hiding this comment.
Agree with moving. isKeyConverter is used in the first parameter in the log statement.
| Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix); | ||
| converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName()); | ||
| plugin.configure(converterConfig); | ||
| log.debug("Configuring the header converter with configuration:{}{}", System.lineSeparator(), converterConfig); |
There was a problem hiding this comment.
same question as above about moving this above the call to configure
There was a problem hiding this comment.
agree. Moving it before call to configure.
| public static final String HEADER_CONVERTER_CLASS_DOC = WorkerConfig.HEADER_CONVERTER_CLASS_DOC; | ||
| public static final String HEADER_CONVERTER_CLASS_DISPLAY = "Header converter class"; | ||
| public static final String HEADER_CONVERTER_CLASS_DEFAULT = WorkerConfig.HEADER_CONVERTER_CLASS_DEFAULT; | ||
| public static final String HEADER_CONVERTER_CLASS_DEFAULT = null; |
There was a problem hiding this comment.
you might want a comment explaining the difference here, it'd be easy to interpret it as a mistaken deviation from WorkerConfig and "fix" it by restoring the inherited default.
|
@ewencp I really wanted to make those changes to |
|
Logged the additional fixes/clarification as https://issues.apache.org/jira/browse/KAFKA-6740. |
ewencp
left a comment
There was a problem hiding this comment.
LGTM, merging to trunk and 1.1
## Summary of the problem When the `header.converter` is not specified in the worker config or the connector config, a bug in the `Plugins` test causes it to never instantiate the `HeaderConverter` instance, even though there is a default value. This is a problem as soon as the connector deals with headers, either in records created by a source connector or in messages on the Kafka topics consumed by a sink connector. As soon as that happens, a NPE occurs. A workaround is to explicitly set the `header.converter` configuration property, but this was added in AK 1.1 and thus means that upgrading to AK 1.1 will not be backward compatible and will require this configuration change. ## The Changes The `Plugins.newHeaderConverter` methods were always returning null if the `header.converter` configuration value was not specified in the supplied connector or worker configuration. Thus, even though the `header.converter` property has a default, it was never being used. The fix was to only check whether a `header.converter` property was specified when the connector configuration was being used, and if no such property exists in the connector configuration to return null. Then, when the worker configuration is being used, the method simply gets the `header.converter` value (or the default if no value was explicitly set). Also, the ConnectorConfig had the same default value for the `header.converter` property as the WorkerConfig, but this resulted in very confusing log messages that implied the default header converter should be used even when the worker config specified the `header.converter` value. By removing the default, the log messages now make sense, and the Worker still properly instantiates the correct header converter. Finally, updated comments and added log messages to make it more clear which converters are being used and how they are being converted. ## Testing Several new unit tests for `Plugins.newHeaderConverter` were added to check the various behavior. Additionally, the runtime JAR with these changes was built and inserted into an AK 1.1 installation, and a source connector was manually tested with 8 different combinations of settings for the `header.converter` configuration: 1. default value 1. worker configuration has `header.converter` explicitly set to the default 1. worker configuration has `header.converter` set to a custom `HeaderConverter` implementation in the same plugin 1. worker configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin 1. connector configuration has `header.converter` explicitly set to the default 1. connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in the same plugin 1. connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin 1. worker configuration has `header.converter` explicitly set to the default, and the connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin The worker created the correct `HeaderConverter` implementation with the correct configuration in all of these tests. Finally, the default configuration was used with the aforementioned custom source connector that generated records with headers, and an S3 connector that consumes the records with headers (but didn't do anything with them). This test also passed. Author: Randall Hauch <rhauch@gmail.com> Reviewers: Ewen Cheslack-Postava <ewen@confluent.io> Closes #4815 from rhauch/kafka-6728 (cherry picked from commit d9369de) Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
## Summary of the problem When the `header.converter` is not specified in the worker config or the connector config, a bug in the `Plugins` test causes it to never instantiate the `HeaderConverter` instance, even though there is a default value. This is a problem as soon as the connector deals with headers, either in records created by a source connector or in messages on the Kafka topics consumed by a sink connector. As soon as that happens, a NPE occurs. A workaround is to explicitly set the `header.converter` configuration property, but this was added in AK 1.1 and thus means that upgrading to AK 1.1 will not be backward compatible and will require this configuration change. ## The Changes The `Plugins.newHeaderConverter` methods were always returning null if the `header.converter` configuration value was not specified in the supplied connector or worker configuration. Thus, even though the `header.converter` property has a default, it was never being used. The fix was to only check whether a `header.converter` property was specified when the connector configuration was being used, and if no such property exists in the connector configuration to return null. Then, when the worker configuration is being used, the method simply gets the `header.converter` value (or the default if no value was explicitly set). Also, the ConnectorConfig had the same default value for the `header.converter` property as the WorkerConfig, but this resulted in very confusing log messages that implied the default header converter should be used even when the worker config specified the `header.converter` value. By removing the default, the log messages now make sense, and the Worker still properly instantiates the correct header converter. Finally, updated comments and added log messages to make it more clear which converters are being used and how they are being converted. ## Testing Several new unit tests for `Plugins.newHeaderConverter` were added to check the various behavior. Additionally, the runtime JAR with these changes was built and inserted into an AK 1.1 installation, and a source connector was manually tested with 8 different combinations of settings for the `header.converter` configuration: 1. default value 1. worker configuration has `header.converter` explicitly set to the default 1. worker configuration has `header.converter` set to a custom `HeaderConverter` implementation in the same plugin 1. worker configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin 1. connector configuration has `header.converter` explicitly set to the default 1. connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in the same plugin 1. connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin 1. worker configuration has `header.converter` explicitly set to the default, and the connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin The worker created the correct `HeaderConverter` implementation with the correct configuration in all of these tests. Finally, the default configuration was used with the aforementioned custom source connector that generated records with headers, and an S3 connector that consumes the records with headers (but didn't do anything with them). This test also passed. Author: Randall Hauch <rhauch@gmail.com> Reviewers: Ewen Cheslack-Postava <ewen@confluent.io> Closes apache#4815 from rhauch/kafka-6728 (cherry picked from commit d9369de) Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
Summary of the problem
When the
header.converteris not specified in the worker config or the connector config, a bug in thePluginstest causes it to never instantiate theHeaderConverterinstance, even though there is a default value.This is a problem as soon as the connector deals with headers, either in records created by a source connector or in messages on the Kafka topics consumed by a sink connector. As soon as that happens, a NPE occurs.
A workaround is to explicitly set the
header.converterconfiguration property, but this was added in AK 1.1 and thus means that upgrading to AK 1.1 will not be backward compatible and will require this configuration change.The Changes
The
Plugins.newHeaderConvertermethods were always returning null if theheader.converterconfiguration value was not specified in the supplied connector or worker configuration. Thus, even though theheader.converterproperty has a default, it was never being used.The fix was to only check whether a
header.converterproperty was specified when the connector configuration was being used, and if no such property exists in the connector configuration to return null. Then, when the worker configuration is being used, the method simply gets theheader.convertervalue (or the default if no value was explicitly set).Also, the ConnectorConfig had the same default value for the
header.converterproperty as the WorkerConfig, but this resulted in very confusing log messages that implied the default header converter should be used even when the worker config specified theheader.convertervalue. By removing the default, the log messages now make sense, and the Worker still properly instantiates the correct header converter.Finally, updated comments and added log messages to make it more clear which converters are being used and how they are being converted.
Testing
Several new unit tests for
Plugins.newHeaderConverterwere added to check the various behavior. Additionally, the runtime JAR with these changes was built and inserted into an AK 1.1 installation, and a source connector was manually tested with 8 different combinations of settings for theheader.converterconfiguration:header.converterexplicitly set to the defaultheader.converterset to a customHeaderConverterimplementation in the same pluginheader.converterset to a customHeaderConverterimplementation in a different pluginheader.converterexplicitly set to the defaultheader.converterset to a customHeaderConverterimplementation in the same pluginheader.converterset to a customHeaderConverterimplementation in a different pluginheader.converterexplicitly set to the default, and the connector configuration hasheader.converterset to a customHeaderConverterimplementation in a different pluginThe worker created the correct
HeaderConverterimplementation with the correct configuration in all of these tests.Finally, the default configuration was used with the aforementioned custom source connector that generated records with headers, and an S3 connector that consumes the records with headers (but didn't do anything with them). This test also passed.
Committer Checklist (excluded from commit message)