feature: Add AWS MSK IAM Authentication Support for Kafka Engine#91118

Closed
kalavt wants to merge 2 commits into ClickHouse:master from kalavt:feature/aws_msk_iam_auth

Conversation

@kalavt

@kalavt kalavt commented Nov 28, 2025

Changelog category (leave one):

  • New Feature

Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):

Add support for AWS MSK IAM authentication for Kafka.

Documentation entry for user-facing changes

  • Documentation is written (mandatory for new features)

Description

This PR implements AWS MSK IAM authentication support for the Kafka Table Engine.
It allows ClickHouse to authenticate with Amazon Managed Streaming for Apache Kafka (MSK) using IAM roles.

Motivation:
Many users running ClickHouse on AWS want to connect to MSK using the standard IAM authentication method for better security and management, rather than managing SASL/SCRAM credentials or mTLS certificates.

Changes:

  • Added AWSMSKIAMAuth class (src/Storages/Kafka/AWSMSKIAMAuth.cpp/h) to handle IAM authentication logic.
  • Updated StorageKafka and KafkaConsumer to use the new authentication mechanism.
  • Added configuration settings for AWS MSK IAM auth.
  • Updated librdkafka-cmake to support the necessary build options.
  • Updated documentation with usage examples.
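
For reference, a hypothetical table definition using the new authentication. This is a sketch only: `kafka_aws_region` appears in the review snippets later in this thread, the other settings are standard Kafka engine settings, and the broker endpoint and port are illustrative placeholders; consult the PR's documentation for the final form.

```sql
-- Hypothetical sketch: the broker endpoint is a placeholder, and the exact
-- spelling of the IAM-related settings should be taken from the PR's docs.
CREATE TABLE msk_events
(
    payload String
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'b-1.example.kafka.us-east-1.amazonaws.com:9098',
    kafka_topic_list = 'events',
    kafka_group_name = 'clickhouse-consumer',
    kafka_format = 'JSONEachRow',
    kafka_aws_region = 'us-east-1';
```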

@CLAassistant

CLAassistant commented Nov 28, 2025

CLA assistant check
All committers have signed the CLA.

@antaljanosbenjamin antaljanosbenjamin self-assigned this Nov 28, 2025
@kalavt kalavt marked this pull request as draft November 28, 2025 14:33
@kalavt
Author

kalavt commented Nov 28, 2025

This implements #80446.

@kalavt kalavt marked this pull request as ready for review December 1, 2025 08:25
@kalavt
Author

kalavt commented Dec 1, 2025

@antaljanosbenjamin could you please take a look at this PR? Thanks.

Member

@antaljanosbenjamin antaljanosbenjamin left a comment


In general this looks good. There are a few general comments:

  1. Could you please also update the relevant docs?
  2. Have you thought about using "assumed roles" similarly to the S3 table function? I think it is implemented for S3, it shouldn't be hard to enable it here too.
  3. I have to talk with some security people, but we might need a setting to disable this globally if there is some issue regarding security. Please don't act on this yet, I will try to get a precise answer on that and understand it better.

Comment thread src/Storages/Kafka/KafkaSettings.cpp Outdated
Comment thread src/Storages/Kafka/KafkaConfigLoader.cpp Outdated
@antaljanosbenjamin antaljanosbenjamin added the can be tested Allows running workflows for external contributors label Dec 1, 2025
@clickhouse-gh
Contributor

clickhouse-gh Bot commented Dec 1, 2025

Workflow [PR], commit [3a8f50a]

Summary:

  • Style check: failure
  • cpp: failure (cidb)
  • Build (arm_tidy): failure
  • Build ClickHouse: failure (cidb)
  • Docs check: dropped
  • Build (amd_debug): dropped
  • Build (amd_asan): dropped
  • Build (amd_tsan): dropped
  • Build (amd_msan): dropped
  • Build (amd_ubsan): dropped
  • Build (amd_binary): dropped
  • Build (arm_asan): dropped

@clickhouse-gh clickhouse-gh Bot added pr-feature Pull request with new product feature submodule changed At least one submodule changed in this PR. labels Dec 1, 2025
Comment thread src/Storages/Kafka/AWSMSKIAMAuth.cpp
@antaljanosbenjamin
Member

Are the changes in c-ares intentional?

@kalavt
Author

kalavt commented Dec 8, 2025

@antaljanosbenjamin for your comments:

Are the changes in c-ares intentional?

Regarding the c-ares submodule changes visible in the PR:

I can confirm that I have not made any direct modifications to the contrib/c-ares submodule. The changes appearing in the PR diff are due to the submodule version difference between my feature branch and the current master branch.

Here's what happened:

  • My feature branch was created from an earlier master commit
  • After branch creation, master updated the c-ares submodule to a newer version
  • The PR comparison now shows this submodule pointer difference as part of the overall diff

Could you please also update the relevant docs?

Updated. If any other docs need updating, please let me know.

Have you thought about using "assumed roles" similarly to the S3 table function? I think it is implemented for S3, it shouldn't be hard to enable it here too.

Yes, but that should be done in a separate PR.

I have to talk with some security people, but we might need a setting to disable this globally if there is some issue regarding security. Please don't act on this yet, I will try to get a precise answer on that and understand it better.

Sure. Thanks for checking.

@antaljanosbenjamin
Member

So for the submodules, please revert those changes locally. The issue, in my opinion, is that the submodules were updated on master, but you didn't update them locally before committing. This means that if we merged your PR, it would revert the submodule changes on master.

For the credentials, there are two things. First of all, please use our S3CredentialsProviderChain that can be found here:

class S3CredentialsProviderChain : public Aws::Auth::AWSCredentialsProviderChain
{
public:
    S3CredentialsProviderChain(
        const DB::S3::PocoHTTPClientConfiguration & configuration,
        const Aws::Auth::AWSCredentials & credentials,
        CredentialsConfiguration credentials_configuration);
};

Second, when creating such a provider chain, please make sure use_environment_credentials can be enabled only from the server configuration and not by user queries. I think for now it is okay if it is just a boolean flag without any further details, so it is either enabled or disabled for all URLs.

If you have any questions, please don't hesitate to ask.

@kalavt kalavt force-pushed the feature/aws_msk_iam_auth branch from ab166da to abacc8d Compare December 10, 2025 17:06
@clickhouse-gh clickhouse-gh Bot added the manual approve Manual approve required to run CI label Dec 10, 2025
@kalavt kalavt force-pushed the feature/aws_msk_iam_auth branch 2 times, most recently from a732cbb to b6d319e Compare December 10, 2025 17:53
@kalavt
Author

kalavt commented Dec 10, 2025

@antaljanosbenjamin regarding your comments: I've fixed the submodule changes and I'm now using S3CredentialsProviderChain with use_environment_credentials for loading credentials.

Thanks for your guidance and for pointing out the security concern.

@kalavt kalavt force-pushed the feature/aws_msk_iam_auth branch from 0a265e0 to b1bd1c1 Compare December 12, 2025 14:50
Member

@antaljanosbenjamin antaljanosbenjamin left a comment


A few small things.

Comment thread src/Storages/Kafka/AWSMSKIAMAuth.h Outdated
Comment thread src/Storages/Kafka/KafkaConfigLoader.cpp Outdated
Comment thread src/Storages/Kafka/StorageKafka.h Outdated
Comment thread src/Storages/Kafka/AWSMSKIAMAuth.cpp Outdated
Comment thread src/Storages/Kafka/AWSMSKIAMAuth.cpp Outdated
@kalavt kalavt changed the title feature: Add AWS MSK IAM Authentication Support for Kafka Engine WIP: feature: Add AWS MSK IAM Authentication Support for Kafka Engine Dec 17, 2025
@kalavt kalavt force-pushed the feature/aws_msk_iam_auth branch 7 times, most recently from 2eb1077 to e5ee4ac Compare December 18, 2025 18:30
@kalavt kalavt force-pushed the feature/aws_msk_iam_auth branch 2 times, most recently from cf93f1c to a193144 Compare December 20, 2025 08:42
@kalavt kalavt changed the title WIP: feature: Add AWS MSK IAM Authentication Support for Kafka Engine feature: Add AWS MSK IAM Authentication Support for Kafka Engine Dec 20, 2025
@kalavt
Author

kalavt commented Dec 20, 2025

Hi @antaljanosbenjamin

I've completely redesigned the feature and tested that it works. Would you please review the code and let me know if you have any concerns? Thanks.

@kalavt kalavt force-pushed the feature/aws_msk_iam_auth branch 4 times, most recently from d9fdaf8 to 4ce1766 Compare December 24, 2025 16:57
@kalavt
Author

kalavt commented Dec 25, 2025

@antaljanosbenjamin could you please guide me on the next steps to merge this PR?

@antaljanosbenjamin
Member

I will have a look at it next week on the first day I am working. Sorry for the delay, but I am on vacation.

# it does NOT require Cyrus SASL library. By enabling it unconditionally
# when WITH_SSL is set, we can support AWS MSK IAM authentication and
# other OAUTHBEARER mechanisms without needing the full Cyrus SASL stack.
set(WITH_SASL_OAUTHBEARER 1)
Member


I have to verify this locally, so this is a note to myself.

Comment thread src/Storages/Kafka/AWSMSKIAMAuth.cpp Outdated
Comment thread src/Storages/Kafka/KafkaConsumer2.cpp Outdated
Comment on lines +143 to +157
{
    std::lock_guard lock(g_provider_cache.mutex);
    if (!g_provider_cache.provider)
    {
        auto aws_client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(
            region, {}, 0, {}, false, false, false, false, {}, {});

        S3::CredentialsConfiguration credentials_configuration;
        credentials_configuration.use_environment_credentials = use_environment_credentials;

        g_provider_cache.provider = std::make_shared<S3::S3CredentialsProviderChain>(
            aws_client_configuration, Aws::Auth::AWSCredentials{}, credentials_configuration);
    }
    provider = g_provider_cache.provider;
}
Member


What happens if the region in the current config is different than it was when the cache was created?

I might be missing something, but I don't understand why the storage cannot own the credentials provider. It is tied to a single storage, with clear ownership and scope. Now we have a global object which is shared among multiple storages.

Author


Hi. @antaljanosbenjamin

Sorry for the late reply, I thought you were still on vacation :)

You are absolutely right. The global cache was problematic because it didn't account for different regions across multiple storages and introduced unnecessary global state.

I have refactored the implementation to ensure proper scoping and ownership:

  1. Removed Global Cache: g_provider_cache has been removed entirely.
  2. Scoped Ownership: The S3CredentialsProviderChain is now stored inside OAuthBearerTokenRefreshContext.
  3. Storage Ownership: StorageKafka (and StorageKafka2) now owns the OAuthBearerTokenRefreshContext (via a std::shared_ptr member). This ensures the credentials provider's lifecycle is tied directly to the storage instance and uses the correct configuration (region) for that specific table.
  4. Safe Context Passing: I'm passing the context pointer via the configuration string to avoid conflicts with cppkafka's use of the opaque pointer, ensuring we can access this scoped context safely in the callback.

This design eliminates the risk of region mismatch and removes the global state as suggested.

@kalavt kalavt force-pushed the feature/aws_msk_iam_auth branch 2 times, most recently from e64d84a to b7f3eee Compare January 29, 2026 06:21
- Implements AWS MSK IAM authentication for Kafka storage.
- Updates librdkafka-cmake configuration.
@kalavt kalavt force-pushed the feature/aws_msk_iam_auth branch from b7f3eee to bad8cb1 Compare January 29, 2026 19:07
Member

@antaljanosbenjamin antaljanosbenjamin left a comment


A few comments; also, please don't force-push.

using Base = WriteBufferFromVectorImpl<std::string>;
public:
WriteBufferFromOwnString()
WriteBufferFromOwnString() // NOLINT(modernize-use-equals-default, hicpp-use-equals-default)
Member


Is this necessary?

Author


The constructor cannot use = default because it must initialize the base class WriteBufferFromVectorImpl<std::string> with the value member inherited from StringHolder. A defaulted constructor cannot include member initializers, so the NOLINT suppression is required here.

Comment on lines +391 to +398
String aws_region = kafka_settings[KafkaSetting::kafka_aws_region].value;

String broker_list = kafka_config.has_property("metadata.broker.list")
    ? kafka_config.get("metadata.broker.list")
    : "";

AWSMSKIAMAuth::setupAuthentication(
    kafka_config, params.config, aws_region, broker_list, params.log, storage.getOAuthContext());
Member


Correct me if I am wrong, but this should only happen on the first occasion, no?

So I would say a slightly better logic considering my other comment about not returning a non-const reference to internal members:

  1. check if the storage has a context or not (returning a non const raw pointer is perfect for this use-case)
  2. if not, then create one and use the setOAuthContext function to assign it to the storage
  3. if there is already a context, then use that.

Author


You're correct. The OAuth context initialization is expensive and should be cached. I'll refactor this part.

Comment on lines 83 to +90

// Enable background SASL callbacks for OAUTHBEARER authentication.
if (auto * error = rd_kafka_sasl_background_callbacks_enable(consumer->get_handle()))
{
    // If SASL background callbacks cannot be enabled (e.g., not configured or not supported),
    // we still continue since this is not a critical error for basic Kafka functionality.
    rd_kafka_error_destroy(error);
}
Member


I think we don't need this callback. My reasoning is that if the consumer is not polled, we don't need to refresh the auth secrets.

Author


The background SASL callbacks are necessary for handling low-traffic topics where the consumer may be idle for extended periods. Without them, OAuth tokens can expire during idle periods, causing unrecoverable authentication failures when the consumer attempts to poll again. This was specifically implemented to ensure consumers remain operational even on low-volume topics where polls are infrequent.

Comment on lines +108 to +110
// NOTE: We cannot use the global 'opaque' pointer because it is used by cppkafka
// to store the Consumer/Producer instance. Overwriting it would break cppkafka callbacks.
// Instead, we pass the context pointer address in the configuration string.
Member


I am not sure about this. This feels like a very hacky solution. I think the proper solution is to add this functionality to cppkafka first, and then we can utilize it here. Passing opaque pointers is fine; passing pointers as strings is not, in my opinion. Technically they are the same, but it seems like a code smell. Feel free to open a PR to https://github.com/ClickHouse/cppkafka and tag me.

Author


Yeah, that's a temporary workaround rather than the ideal solution. For now, should we keep the current implementation as a temporary workaround until the cppkafka enhancement is merged, or would you prefer to block this PR until the proper solution is in place?

Author


Well, I've proposed a PR to https://github.com/ClickHouse/cppkafka to support an OAUTHBEARER token refresh callback: ClickHouse/cppkafka#5.
I'll refactor the ClickHouse AWS MSK IAM auth implementation in a new PR (#96100) using the newly added refresh callback.

Comment thread src/Storages/Kafka/AWSMSKIAMAuth.cpp Outdated
const char * oauthbearer_config,
void * /* opaque */)
{
LoggerPtr log = getLogger("AWSMSKIAMAuth");
Member


And please, remove this log.

Comment thread src/Storages/Kafka/AWSMSKIAMAuth.cpp Outdated
Comment on lines +259 to +260
params.emplace_back("region", effective_region);
params.emplace_back("use_environment_credentials", use_environment_credentials ? "1" : "0");
Member


These are also not needed.

Labels

  • can be tested: Allows running workflows for external contributors
  • manual approve: Manual approve required to run CI
  • pr-feature: Pull request with new product feature
  • submodule changed: At least one submodule changed in this PR.


4 participants