feature: Add AWS MSK IAM Authentication Support for Kafka Engine #91118
kalavt wants to merge 2 commits into ClickHouse:master
Conversation
To implement #80446.
@antaljanosbenjamin can you please take a look at this PR? Thanks.
antaljanosbenjamin
left a comment
In general looks good. There are a few general comments:
- Could you please also update the relevant docs?
- Have you thought about using "assumed roles" similarly to the S3 table function? Since it is already implemented for S3, it shouldn't be hard to enable it here too.
- I have to talk with some security people, but we might need a setting to disable this globally if there is some issue regarding security. Please don't act on this yet, I will try to get a precise answer on that and understand it better.
Workflow [PR], commit [3a8f50a] Summary: ❌
Are the changes in c-ares intentional?
@antaljanosbenjamin for your comments:
Regarding the c-ares submodule changes visible in the PR: I can confirm that I have not made any direct modifications to the contrib/c-ares submodule. The changes appearing in the diff are due to the submodule version difference between my feature branch and the current master branch.
The docs have been updated; if any other docs need an update, please let me know.
Yes, but that should be done in another PR.
Sure. Thanks for checking.
So for the submodules, please revert those changes locally. The issue, in my opinion, is that the submodules were updated on master; you didn't update them locally but committed them anyway. This means that if we merged your PR, it would revert the submodule changes. For the credentials, there are two things. First, please use our existing provider chain (ClickHouse/src/IO/S3/Credentials.h, lines 211 to 218 in 7d0b74a). Second, when creating such a provider chain, please make sure … If you have any questions, please don't hesitate to ask.
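For reference, a minimal sketch of what constructing that provider chain could look like; `client_configuration` and the flag value here are placeholders, not the PR's actual code:

```cpp
// Sketch only: builds the existing DB::S3::S3CredentialsProviderChain from
// src/IO/S3/Credentials.h. 'client_configuration' is assumed to be a
// PocoHTTPClientConfiguration created elsewhere.
DB::S3::CredentialsConfiguration credentials_configuration;
credentials_configuration.use_environment_credentials = true; // placeholder value

auto provider_chain = std::make_shared<DB::S3::S3CredentialsProviderChain>(
    client_configuration,
    Aws::Auth::AWSCredentials{}, // empty static credentials: defer to the chain
    credentials_configuration);
```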
Force-pushed from ab166da to abacc8d
Force-pushed from a732cbb to b6d319e
@antaljanosbenjamin regarding your comments: I've fixed the submodule changes and am now using S3CredentialsProviderChain with use_environment_credentials to load credentials. Thanks for your guidance and for pointing out the security concern.
Force-pushed from 0a265e0 to b1bd1c1
antaljanosbenjamin
left a comment
A few small things.
Force-pushed from 2eb1077 to e5ee4ac
Force-pushed from cf93f1c to a193144
I've completely redesigned the feature and tested that it works. Would you please review my code and let me know if you have any concerns? Thanks.
Force-pushed from d9fdaf8 to 4ce1766
@antaljanosbenjamin can you please guide me on the next steps to merge this PR?
I will have a look at it next week on my first day back at work. Sorry for the delay, but I am on vacation.
```cmake
# it does NOT require Cyrus SASL library. By enabling it unconditionally
# when WITH_SSL is set, we can support AWS MSK IAM authentication and
# other OAUTHBEARER mechanisms without needing the full Cyrus SASL stack.
set(WITH_SASL_OAUTHBEARER 1)
```
I have to verify this locally, so this is a note to myself.
```cpp
{
    std::lock_guard lock(g_provider_cache.mutex);
    if (!g_provider_cache.provider)
    {
        auto aws_client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(
            region, {}, 0, {}, false, false, false, false, {}, {});

        S3::CredentialsConfiguration credentials_configuration;
        credentials_configuration.use_environment_credentials = use_environment_credentials;

        g_provider_cache.provider = std::make_shared<S3::S3CredentialsProviderChain>(
            aws_client_configuration, Aws::Auth::AWSCredentials{}, credentials_configuration);
    }
    provider = g_provider_cache.provider;
}
```
What happens if the region in the current config is different from when the cache was created?
I might be missing something, but I don't understand why the storage cannot own the credentials provider: it is tied to a single storage, with clear ownership and scope. Now we have a global object that is shared among multiple storages.
There was a problem hiding this comment.
Sorry for the late reply, I thought you were still on vacation :)
You are absolutely right. The global cache was problematic because it didn't account for different regions across multiple storages and introduced unnecessary global state.
I have refactored the implementation to ensure proper scoping and ownership:
- Removed global cache: `g_provider_cache` has been removed entirely.
- Scoped ownership: the `S3CredentialsProviderChain` is now stored inside `OAuthBearerTokenRefreshContext`.
- Storage ownership: `StorageKafka` (and `StorageKafka2`) now owns the `OAuthBearerTokenRefreshContext` (via a `std::shared_ptr` member). This ensures the credentials provider's lifecycle is tied directly to the storage instance and uses the correct configuration (region) for that specific table.
- Safe context passing: I'm passing the context pointer via the configuration string to avoid conflicts with cppkafka's use of the `opaque` pointer, ensuring we can access this scoped context safely in the callback.
This design eliminates the risk of region mismatch and removes the global state as suggested.
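For illustration, a minimal sketch of the ownership layout described above; the member names follow this thread, but the exact fields are assumptions:

```cpp
#include <memory>
#include <string>

namespace DB::S3 { class S3CredentialsProviderChain; } // from src/IO/S3/Credentials.h

struct OAuthBearerTokenRefreshContext
{
    std::shared_ptr<DB::S3::S3CredentialsProviderChain> credentials_provider; // per-table chain
    std::string region; // taken from this table's settings
};

class StorageKafka
{
    // The context lives and dies with the storage instance, so each table
    // builds its provider from its own region instead of a shared global cache.
    std::shared_ptr<OAuthBearerTokenRefreshContext> oauth_context;
};
```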
Force-pushed from e64d84a to b7f3eee
- Implements AWS MSK IAM authentication for Kafka storage.
- Updates librdkafka-cmake configuration.
Force-pushed from b7f3eee to bad8cb1
antaljanosbenjamin
left a comment
A few comments; please don't force-push.
```cpp
using Base = WriteBufferFromVectorImpl<std::string>;

public:
    WriteBufferFromOwnString() // NOLINT(modernize-use-equals-default, hicpp-use-equals-default)
```
The constructor cannot use = default because it must initialize the base class WriteBufferFromVectorImpl<std::string> with the value member inherited from StringHolder. A defaulted constructor cannot include member initializers, so the NOLINT suppression is required here.
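A minimal self-contained sketch of that pattern, using simplified stand-in names rather than the real ClickHouse types:

```cpp
#include <string>

struct StringHolder
{
    std::string value;
};

template <typename VectorType>
struct WriteBufferFromVectorSketch
{
    explicit WriteBufferFromVectorSketch(VectorType & vector) : data(vector) {}
    VectorType & data;
};

struct OwnStringSketch : StringHolder, WriteBufferFromVectorSketch<std::string>
{
    // Cannot be "= default": the constructor must forward the member inherited
    // from StringHolder (initialized first) into the second base class.
    OwnStringSketch() : WriteBufferFromVectorSketch<std::string>(value) {}
};
```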
```cpp
String aws_region = kafka_settings[KafkaSetting::kafka_aws_region].value;

String broker_list = kafka_config.has_property("metadata.broker.list")
    ? kafka_config.get("metadata.broker.list")
    : "";

AWSMSKIAMAuth::setupAuthentication(
    kafka_config, params.config, aws_region, broker_list, params.log, storage.getOAuthContext());
```
Correct me if I am wrong, but this should only happen on the first occasion, no?
So, considering my other comment about not returning a non-const reference to internal members, a slightly better logic would be:
- check whether the storage has a context or not (returning a non-const raw pointer is perfect for this use case)
- if not, create one and use the `setOAuthContext` function to assign it to the storage
- if there is already a context, use that.
You're correct. The OAuth context initialization is expensive and should be cached. I'll refactor this part.
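A sketch of the suggested flow; the context type and the `getOAuthContext`/`setOAuthContext` names follow this thread, but the exact signatures are assumptions:

```cpp
// Create the OAuth context once per storage and reuse it on every consumer setup.
auto * context = storage.getOAuthContext(); // non-const raw pointer, may be null
if (!context)
{
    auto created = std::make_shared<OAuthBearerTokenRefreshContext>();
    created->region = aws_region;     // one-time, per-table configuration
    storage.setOAuthContext(created); // the storage takes ownership
    context = created.get();
}
AWSMSKIAMAuth::setupAuthentication(
    kafka_config, params.config, aws_region, broker_list, params.log, context);
```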
```cpp
// Enable background SASL callbacks for OAUTHBEARER authentication.
if (auto * error = rd_kafka_sasl_background_callbacks_enable(consumer->get_handle()))
{
    // If SASL background callbacks cannot be enabled (e.g., not configured or not supported),
    // we still continue since this is not a critical error for basic Kafka functionality.
    rd_kafka_error_destroy(error);
}
```
I think we don't need this callback. My reasoning is that if the consumer is not polled, then we don't need to refresh the auth secrets.
The background SASL callbacks are necessary for handling low-traffic topics where the consumer may be idle for extended periods. Without them, OAuth tokens can expire during idle periods, causing unrecoverable authentication failures when the consumer attempts to poll again. This was specifically implemented to ensure consumers remain operational even on low-volume topics where polls are infrequent.
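For context, this is roughly what an OAUTHBEARER refresh path looks like with the librdkafka C API; `acquireMskIamToken` is a hypothetical placeholder for the AWS signing logic, not this PR's actual function:

```cpp
#include <librdkafka/rdkafka.h>
#include <string>

struct MskToken { std::string value; int64_t expiry_ms; std::string principal; };
MskToken acquireMskIamToken(); // hypothetical: signs a token via the credentials provider

// Invoked by librdkafka (from poll or, with background callbacks enabled,
// from an internal thread) whenever the current token needs refreshing.
static void oauthbearerRefreshCallback(rd_kafka_t * rk, const char * /* oauthbearer_config */, void * /* opaque */)
{
    char errstr[512];
    const MskToken token = acquireMskIamToken();
    if (rd_kafka_oauthbearer_set_token(
            rk, token.value.c_str(), token.expiry_ms, token.principal.c_str(),
            nullptr, 0, errstr, sizeof(errstr)) != RD_KAFKA_RESP_ERR_NO_ERROR)
        rd_kafka_oauthbearer_set_token_failure(rk, errstr);
}
```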
```cpp
// NOTE: We cannot use the global 'opaque' pointer because it is used by cppkafka
// to store the Consumer/Producer instance. Overwriting it would break cppkafka callbacks.
// Instead, we pass the context pointer address in the configuration string.
```
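For illustration, the workaround amounts to round-tripping the context address through a string; the helper names here are illustrative, not the PR's identifiers:

```cpp
#include <cstdint>
#include <string>

// Serialize the context address into a config property value...
std::string encodePointerAsString(const void * context)
{
    return std::to_string(reinterpret_cast<std::uintptr_t>(context));
}

// ...and parse it back inside the token refresh callback.
void * decodePointerFromString(const std::string & text)
{
    return reinterpret_cast<void *>(static_cast<std::uintptr_t>(std::stoull(text)));
}
```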
I am not sure about this. This feels like a very hacky solution. I think the proper solution is to add this functionality to cppkafka first, and then we can utilize it here. Passing opaque pointers is fine; passing pointers as strings is not, in my opinion. Technically they are the same, but it seems like a code smell. Feel free to open a PR to https://github.com/ClickHouse/cppkafka and tag me.
Yeah, that's a temporary workaround, not the ideal solution. For now, should we keep the current implementation as a temporary workaround until the cppkafka enhancement is merged, or would you prefer to block this PR until the proper solution is in place?
Well, I've proposed a PR to https://github.com/ClickHouse/cppkafka to support an OAUTHBEARER token refresh callback: ClickHouse/cppkafka#5. I'll refactor the ClickHouse AWS MSK IAM auth implementation in a new PR using the newly added refresh callback function: #96100.
```cpp
const char * oauthbearer_config,
void * /* opaque */)
{
    LoggerPtr log = getLogger("AWSMSKIAMAuth");
```
And please, remove this log.
```cpp
params.emplace_back("region", effective_region);
params.emplace_back("use_environment_credentials", use_environment_credentials ? "1" : "0");
```
These are also not needed.
Changelog category: New Feature
Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):
Add support for AWS MSK IAM authentication for Kafka.
Documentation entry for user-facing changes
Description
This PR implements AWS MSK IAM authentication support for the Kafka Table Engine.
It allows ClickHouse to authenticate with Amazon Managed Streaming for Apache Kafka (MSK) using IAM roles.
Motivation:
Many users running ClickHouse on AWS want to connect to MSK using the standard IAM authentication method for better security and management, rather than managing SASL/SCRAM credentials or mTLS certificates.
Changes:
- Added the `AWSMSKIAMAuth` class (src/Storages/Kafka/AWSMSKIAMAuth.cpp/h) to handle the IAM authentication logic.
- Updated `StorageKafka` and `KafkaConsumer` to use the new authentication mechanism.
- Updated `librdkafka-cmake` to support the necessary build options.