Skip to content

Commit a44cf47

Browse files
committed
Fix IAM auth and add test
1 parent 311dc2a commit a44cf47

5 files changed

Lines changed: 167 additions & 17 deletions

File tree

src/Databases/DataLake/GlueCatalog.cpp

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ GlueCatalog::GlueCatalog(
9292
: ICatalog("")
9393
, DB::WithContext(context_)
9494
, log(getLogger("GlueCatalog(" + settings_.region + ")"))
95-
, credentials(settings_.aws_access_key_id, settings_.aws_secret_access_key)
9695
, region(settings_.region)
9796
, settings(settings_)
9897
, table_engine_definition(table_engine_definition_)
@@ -132,38 +131,41 @@ GlueCatalog::GlueCatalog(
132131
/* opt_disk_name = */ {},
133132
/* request_throttler = */ {});
134133

134+
135135
Aws::Glue::GlueClientConfiguration client_configuration;
136136
client_configuration.maxConnections = static_cast<unsigned>(global_settings[DB::Setting::s3_max_connections]);
137137
client_configuration.connectTimeoutMs = static_cast<unsigned>(global_settings[DB::Setting::s3_connect_timeout_ms]);
138138
client_configuration.requestTimeoutMs = static_cast<unsigned>(global_settings[DB::Setting::s3_request_timeout_ms]);
139139
client_configuration.region = region;
140140
auto endpoint_provider = std::make_shared<Aws::Glue::GlueEndpointProvider>();
141141

142+
Aws::Auth::AWSCredentials credentials(settings_.aws_access_key_id, settings_.aws_secret_access_key);
142143
/// Only for testing when we are mocking glue
143144
if (!endpoint.empty())
144145
{
145146
client_configuration.endpointOverride = endpoint;
146147
endpoint_provider->OverrideEndpoint(endpoint);
147-
Aws::Auth::AWSCredentials fake_credentials_for_fake_catalog;
148+
148149
if (credentials.IsEmpty())
149150
{
150151
/// You can specify any key for fake moto glue, it's just important
151152
/// for it not to be empty.
152-
fake_credentials_for_fake_catalog.SetAWSAccessKeyId("testing");
153-
fake_credentials_for_fake_catalog.SetAWSSecretKey("testing");
153+
credentials.SetAWSAccessKeyId("testing");
154+
credentials.SetAWSSecretKey("testing");
154155
}
155-
else
156-
fake_credentials_for_fake_catalog = credentials;
157156

158-
glue_client = std::make_unique<Aws::Glue::GlueClient>(fake_credentials_for_fake_catalog, endpoint_provider, client_configuration);
157+
Poco::URI uri(endpoint);
158+
if (uri.getScheme() == "http")
159+
poco_config.scheme = Aws::Http::Scheme::HTTP;
159160
}
160161
else
161162
{
162163
LOG_TRACE(log, "Creating AWS glue client with credentials empty {}, region '{}', endpoint '{}'", credentials.IsEmpty(), region, endpoint);
163-
auto credentials_provider = DB::S3::getCredentialsProvider(poco_config, credentials, creds_config);
164-
glue_client = std::make_unique<Aws::Glue::GlueClient>(credentials_provider, endpoint_provider, client_configuration);
165164
}
166165

166+
credentials_provider = DB::S3::getCredentialsProvider(poco_config, credentials, creds_config);
167+
glue_client = std::make_unique<Aws::Glue::GlueClient>(credentials_provider, endpoint_provider, client_configuration);
168+
167169
}
168170

169171
/// Defaulted here, in the .cpp, rather than in the header: the header only forward-declares
/// Aws::Glue::GlueClient, and destroying the std::unique_ptr<GlueClient> member requires the complete type.
GlueCatalog::~GlueCatalog() = default;
@@ -287,7 +289,6 @@ bool GlueCatalog::tryGetTableMetadata(
287289
request.SetDatabaseName(database_name);
288290
request.SetName(table_name);
289291

290-
291292
auto outcome = glue_client->GetTable(request);
292293
if (outcome.IsSuccess())
293294
{
@@ -414,8 +415,9 @@ void GlueCatalog::setCredentials(TableMetadata & metadata) const
414415

415416
if (storage_type == StorageType::S3)
416417
{
417-
auto creds = std::make_shared<S3Credentials>(credentials.GetAWSAccessKeyId(), credentials.GetAWSSecretKey(), credentials.GetSessionToken());
418-
metadata.setStorageCredentials(creds);
418+
auto credentials = credentials_provider->GetAWSCredentials();
419+
auto s3_creds = std::make_shared<S3Credentials>(credentials.GetAWSAccessKeyId(), credentials.GetAWSSecretKey(), credentials.GetSessionToken());
420+
metadata.setStorageCredentials(s3_creds);
419421
}
420422
else
421423
{
@@ -494,9 +496,14 @@ GlueCatalog::ObjectStorageWithPath GlueCatalog::createObjectStorageForEarlyTable
494496
if (args.size() == 1)
495497
{
496498
if (table_metadata.hasStorageCredentials())
499+
{
497500
table_metadata.getStorageCredentials()->addCredentialsToEngineArgs(args);
498-
else if (!credentials.IsExpiredOrEmpty())
501+
}
502+
else
503+
{
504+
auto credentials = credentials_provider->GetAWSCredentials();
499505
DataLake::S3Credentials(credentials.GetAWSAccessKeyId(), credentials.GetAWSSecretKey(), credentials.GetSessionToken()).addCredentialsToEngineArgs(args);
506+
}
500507
}
501508

502509
auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();

src/Databases/DataLake/GlueCatalog.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ namespace Aws::Glue
1818
class GlueClient;
1919
}
2020

21+
/// Forward declaration only: GlueCatalog keeps the provider behind a std::shared_ptr member,
/// which can be declared without the complete AWSCredentialsProvider type.
namespace Aws::Auth
22+
{
23+
class AWSCredentialsProvider;
24+
}
25+
2126
namespace DataLake
2227
{
2328

@@ -69,7 +74,7 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
6974

7075
std::unique_ptr<Aws::Glue::GlueClient> glue_client;
7176
const LoggerPtr log;
72-
Aws::Auth::AWSCredentials credentials;
77+
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider;
7378
std::string region;
7479
CatalogSettings settings;
7580
DB::ASTPtr table_engine_definition;

tests/integration/compose/docker_compose_glue_catalog.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,5 @@ services:
4747
until (/usr/bin/mc config host add minio http://minio:9000 minio ClickHouse_Minio_P@ssw0rd) do echo '...waiting...' && sleep 1; done;
4848
/usr/bin/mc rm -r --force minio/warehouse-glue;
4949
/usr/bin/mc mb minio/warehouse-glue --ignore-existing;
50-
/usr/bin/mc policy set public minio/warehouse-glue;
5150
tail -f /dev/null
5251
"
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import sys
2+
from datetime import datetime, timedelta, timezone
3+
4+
from bottle import request, response, route, run
5+
6+
# Role session name that unlocks the valid secret in the STS handler: taken from
# the second command-line argument when one is given, otherwise 'miniorole'.
expected_role = sys.argv[2] if len(sys.argv) >= 3 else 'miniorole'
10+
11+
@route("/")
def ping():
    """Reply to requests on "/" (route registered without an explicit method)
    with the plain-text body "OK" and a matching Content-Length of 2."""
    body = "OK"
    response.content_type = "text/plain"
    response.set_header("Content-Length", len(body))
    return body
16+
17+
18+
# Mock STS endpoint: answers POST "/" with an AssumeRoleResponse XML document.
# The access key is always "minio"; the secret is only the real MinIO password when
# the request URL contains "RoleSessionName=<expected_role>", otherwise "wrong_key".
# NOTE(review): only request.url is inspected, not the POST body — confirm the client
# sends the AssumeRole parameters in the query string.
# NOTE(review): the XML below carries no SessionToken element.
@route("/", method="POST")
19+
def sts():
20+
access_key = "minio"
21+
# Placeholder secret, returned unless the expected role session name is requested.
secret_access_key = "wrong_key"
22+
23+
# Plain substring match against the full request URL.
if f"RoleSessionName={expected_role}" in str(request.url):
24+
secret_access_key = "ClickHouse_Minio_P@ssw0rd"
25+
26+
# Credentials are advertised as valid for one hour from now (UTC), formatted with a trailing "Z".
expiration = datetime.now(timezone.utc) + timedelta(hours=1)
27+
expiration_str = expiration.strftime("%Y-%m-%dT%H:%M:%SZ")
28+
29+
response.content_type = "text/xml"
30+
return f"""
31+
<AssumeRoleResponse xmlns="https://sts.amazonaws.com/doc/2011-06-15/">
32+
<AssumeRoleResult>
33+
<Credentials>
34+
<AccessKeyId>{access_key}</AccessKeyId>
35+
<SecretAccessKey>{secret_access_key}</SecretAccessKey>
36+
<Expiration>{expiration_str}</Expiration>
37+
</Credentials>
38+
</AssumeRoleResult>
39+
</AssumeRoleResponse>
40+
"""
41+
42+
# Start the server on all interfaces. The listening port is the first command-line
# argument and is required: there is no fallback, so omitting it raises IndexError.
run(host="0.0.0.0", port=int(sys.argv[1]))

tests/integration/test_database_glue/test.py

Lines changed: 99 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,20 @@
2929
)
3030

3131
from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm
32+
from helpers.mock_servers import start_mock_servers
3233

3334
import boto3
3435

36+
37+
def run_s3_mocks(started_cluster, args=None):
    """Launch the mock STS server used by the Glue catalog tests.

    Passes ``mock_sts.py`` from the ``s3_mocks`` directory next to this file to
    ``start_mock_servers``, together with the name ``sts.us-east-1.amazonaws.com``
    and port ``"80"``.

    :param started_cluster: cluster object, forwarded unchanged to ``start_mock_servers``.
    :param args: optional list forwarded as the last element of the mock description
        (presumably extra command-line arguments for the script — confirm against
        ``helpers.mock_servers``). When omitted, a new empty list is used.
    """
    # Build a fresh list per call: a literal ``[]`` default would be one shared
    # object handed to the helper on every call.
    mock_args = [] if args is None else args
    mocks_dir = os.path.join(os.path.dirname(__file__), "s3_mocks")
    sts_mock = ("mock_sts.py", "sts.us-east-1.amazonaws.com", "80", mock_args)
    start_mock_servers(started_cluster, mocks_dir, [sts_mock])
44+
45+
3546
CATALOG_NAME = "test"
3647

3748
BASE_URL = "http://glue:3000"
@@ -182,7 +193,7 @@ def generate_arrow_data(num_rows=5):
182193
return table
183194

184195
def create_clickhouse_glue_database(
185-
started_cluster, node, name, additional_settings={}
196+
started_cluster, node, name, additional_settings={}, with_credentials=True
186197
):
187198
settings = {
188199
"catalog_type": "glue",
@@ -193,12 +204,14 @@ def create_clickhouse_glue_database(
193204

194205
settings.update(additional_settings)
195206

207+
credential_args = f",'{minio_access_key}', '{minio_secret_key}'" if with_credentials else ""
208+
196209
node.query(
197210
f"""
198211
DROP DATABASE IF EXISTS {name};
199212
SET allow_database_glue_catalog=true;
200213
SET write_full_path_in_iceberg_metadata=true;
201-
CREATE DATABASE {name} ENGINE = DataLakeCatalog('{BASE_URL}', '{minio_access_key}', '{minio_secret_key}')
214+
CREATE DATABASE {name} ENGINE = DataLakeCatalog('{BASE_URL}'{credential_args})
202215
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
203216
"""
204217
)
@@ -251,8 +264,20 @@ def started_cluster():
251264
with_glue_catalog=True,
252265
)
253266

267+
sts = cluster.add_instance(
268+
name="sts.us-east-1.amazonaws.com",
269+
hostname="sts.us-east-1.amazonaws.com",
270+
image="clickhouse/python-bottle",
271+
tag="latest",
272+
stay_alive=True,
273+
)
274+
sts.stop_clickhouse(kill=True)
275+
254276
logging.info("Starting cluster...")
255277
cluster.start()
278+
logging.info("Cluster started")
279+
280+
run_s3_mocks(cluster)
256281

257282
yield cluster
258283

@@ -715,3 +740,75 @@ def test_table_without_metadata_location(started_cluster):
715740
assert "Iceberg" in create_table_result, f"Expected Iceberg engine in: {create_table_result}"
716741

717742
node.query(f"DROP DATABASE IF EXISTS {db_name} SYNC")
743+
744+
745+
def test_sts_smoke(started_cluster):
    """Test that STS authentication works with Glue catalog using role_arn and role_session_name

    Both databases are created without static keys (``with_credentials=False``) and
    differ only in ``aws_role_session_name``:

    * ``wrongsession`` - reading the table is expected to fail with a 403-style error;
    * ``miniorole``    - reading the table is expected to succeed and return the sum 60.

    Presumably the mock STS server started by the cluster fixture hands out the real
    MinIO secret only for the ``miniorole`` session name - confirm against the mock script.
    """
    node = started_cluster.instances["node1"]

    test_ref = f"test_sts_smoke_{uuid.uuid4()}"
    table_name = f"{test_ref}_table"
    root_namespace = f"{test_ref}_namespace"

    catalog = load_catalog_impl(started_cluster)
    catalog.create_namespace(root_namespace)

    schema = Schema(
        NestedField(field_id=1, name="id", field_type=StringType(), required=False),
        NestedField(field_id=2, name="value", field_type=DoubleType(), required=False),
    )
    table = create_table(catalog, root_namespace, table_name, schema, PartitionSpec(), DEFAULT_SORT_ORDER, dir=table_name)

    # Three rows whose `value` column sums to 60.
    data = [
        {"id": "row1", "value": 10.0},
        {"id": "row2", "value": 20.0},
        {"id": "row3", "value": 30.0},
    ]
    df = pa.Table.from_pylist(data)
    table.append(df)

    # Test with wrong role_session_name - should fail
    db_name_fail = f"db_fail_{test_ref.replace('-', '_')}"
    create_clickhouse_glue_database(
        started_cluster,
        node,
        db_name_fail,
        additional_settings={
            "aws_role_arn": "arn::role",
            "aws_role_session_name": "wrongsession",
        },
        with_credentials=False,
    )

    # Query should fail with wrong session name.
    # The "unexpected success" check lives in `else:` so that its AssertionError is
    # not swallowed by the `except Exception` handler and re-reported as a bogus
    # "Expected 403 error" message.
    try:
        result = node.query(
            f"SELECT sum(value) FROM {db_name_fail}.`{root_namespace}.{table_name}` "
            f"SETTINGS s3_max_single_read_retries = 1, s3_retry_attempts = 1, s3_request_timeout_ms = 1000"
        )
    except Exception as e:
        error_str = str(e)
        assert "403" in error_str or "Failed to get object info" in error_str or "HTTP response code: 403" in error_str, \
            f"Expected 403 error but got: {error_str}"
    else:
        raise AssertionError(f"Expected query to fail with wrong session name but got result: {result}")

    # Test with correct role_session_name - should succeed
    db_name_success = f"db_success_{test_ref.replace('-', '_')}"
    create_clickhouse_glue_database(
        started_cluster,
        node,
        db_name_success,
        additional_settings={
            "aws_role_arn": "arn::role",
            "aws_role_session_name": "miniorole",
        },
        with_credentials=False,
    )

    # Query should succeed with correct session name
    result = node.query(f"SELECT sum(value) FROM {db_name_success}.`{root_namespace}.{table_name}`")
    assert result.strip() == "60", f"Expected sum to be 60 but got: {result}"

    # Cleanup
    node.query(f"DROP DATABASE IF EXISTS {db_name_fail} SYNC")
    node.query(f"DROP DATABASE IF EXISTS {db_name_success} SYNC")

0 commit comments

Comments
 (0)