feat(api-server): Add support for streaming query result files from S3. #1728
Walkthrough
This pull request adds S3 streaming support to the API server for query results. The changes include adding the `aws-sdk-s3` dependency, extending the `SearchResultStream` enum to support an S3 variant, implementing S3 result fetching with AWS authentication, making S3 configuration structures deserializable, updating AWS client creation signatures to accept optional endpoints, and adjusting call sites accordingly.
Changes
Sequence Diagram
sequenceDiagram
participant Client
participant Router as SearchResultStream
participant S3Fetcher as fetch_results_from_s3
participant AwsClient as S3 Client
participant S3 as S3 Storage
Client->>Router: fetch_results(search_job_id)
alt Storage = S3
Router->>S3Fetcher: invoke fetch_results_from_s3
S3Fetcher->>AwsClient: create_new_client(AwsAuthentication)
AwsClient->>S3: list_objects_v2(prefix)
S3-->>S3Fetcher: objects list
loop per object
S3Fetcher->>S3: get_object(key)
S3-->>S3Fetcher: ByteStream
S3Fetcher->>S3Fetcher: deserialize -> String
S3Fetcher-->>Router: yield Result<String>
end
Router-->>Client: return SearchResultStream::S3
else Storage = File or Mongo
Router-->>Client: return File/Mongo stream variant
end
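The list-then-fetch loop in the diagram can be sketched without the AWS SDK. This is a minimal, synchronous illustration only: the `ObjectStore` trait, `InMemoryStore`, and this `fetch_results` function are hypothetical stand-ins, not code from the PR, and the real implementation is asynchronous and talks to S3.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the S3 client: list keys under a prefix, then fetch objects.
trait ObjectStore {
    fn list_keys(&self, prefix: &str) -> Vec<String>;
    fn get_object(&self, key: &str) -> Result<Vec<u8>, String>;
}

/// In-memory store used only for illustration (assumption, not part of the PR).
struct InMemoryStore {
    objects: BTreeMap<String, Vec<u8>>,
}

impl ObjectStore for InMemoryStore {
    fn list_keys(&self, prefix: &str) -> Vec<String> {
        self.objects
            .keys()
            .filter(|key| key.starts_with(prefix))
            .cloned()
            .collect()
    }

    fn get_object(&self, key: &str) -> Result<Vec<u8>, String> {
        self.objects
            .get(key)
            .cloned()
            .ok_or_else(|| format!("no such key: {key}"))
    }
}

/// Lists the keys under `prefix` once, then lazily fetches each object and decodes it as
/// UTF-8, yielding one `Result<String, String>` per object.
fn fetch_results<'a, S: ObjectStore>(
    store: &'a S,
    prefix: &'a str,
) -> impl Iterator<Item = Result<String, String>> + 'a {
    store.list_keys(prefix).into_iter().map(move |key| {
        let bytes = store.get_object(&key)?;
        String::from_utf8(bytes).map_err(|e| format!("{key}: {e}"))
    })
}
```

Yielding a `Result` per object mirrors the `yield Result<String>` step in the diagram: a failure on one object surfaces as an item in the stream rather than aborting the listing.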
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Possibly related PRs
Pre-merge checks and finishing touches
✅ Passed checks (3 passed)
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
components/clp-rust-utils/src/s3/client.rs (1)
8-24: Update docstring to document the `optional_endpoint` parameter.
The function docstring doesn't document the `optional_endpoint` parameter. Consider adding documentation for this parameter to clarify when a custom endpoint should be provided (e.g., for S3-compatible services like MinIO or LocalStack).
  /// Creates a new S3 client.
  ///
+ /// # Arguments
+ ///
+ /// * `optional_endpoint` - Optional custom endpoint URL for S3-compatible services.
+ /// * `region_id` - AWS region identifier.
+ /// * `access_key_id` - AWS access key ID.
+ /// * `secret_access_key` - AWS secret access key.
+ ///
  /// # Notes
  ///
  /// * The client is configured using the latest AWS SDK behavior version.
  /// * The client enforces path-style addressing.
  ///
  /// # Returns
  ///
  /// A newly created S3 client.
components/clp-rust-utils/src/clp_config/s3_config.rs (1)
20-25: Consider using `SecretString` for `secret_access_key` to prevent accidental exposure.
Storing `secret_access_key` as a plain `String` risks accidental logging or debug output exposing the secret. The `secrecy` crate (already used in `client.rs`) provides `SecretString`, which implements `Debug` to redact contents and requires an explicit `.expose_secret()` call to access the value.
+ use secrecy::SecretString;
+
  /// Represents AWS credentials.
- #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+ #[derive(Clone, Serialize, Deserialize)]
  pub struct AwsCredentials {
      pub access_key_id: String,
-     pub secret_access_key: String,
+     pub secret_access_key: SecretString,
  }
Note: You'll need to implement `Debug` manually to redact the secret, and remove `PartialEq, Eq` as `SecretString` doesn't implement them (or implement them manually, comparing only `access_key_id`).
components/api-server/src/client.rs (1)
431-448: Update doc comment to include `S3Stream`.
The type parameter documentation lists `FileStream` and `MongoStream` but doesn't mention the new `S3Stream` parameter.
  /// # Type Parameters
  ///
  /// * `FileStream`: Streaming from file system storage.
  /// * `MongoStream`: Streaming from MongoDB storage.
+ /// * `S3Stream`: Streaming from S3 storage.
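The redaction idea behind the `SecretString` suggestion above can be illustrated with the standard library alone. This is a sketch of the concept, not the `secrecy` crate's API: `RedactedString`, its `expose` accessor, and the `[REDACTED]` marker are hypothetical names chosen for the example.

```rust
use std::fmt;

/// Hypothetical wrapper that hides its contents from `Debug` output.
#[derive(Clone)]
pub struct RedactedString(String);

impl RedactedString {
    pub fn new(secret: impl Into<String>) -> Self {
        Self(secret.into())
    }

    /// Explicit accessor, so every read of the secret is visible at the call site.
    pub fn expose(&self) -> &str {
        &self.0
    }
}

impl fmt::Debug for RedactedString {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Never print the wrapped value.
        f.write_str("[REDACTED]")
    }
}

/// Illustrative credentials struct; deriving `Debug` is safe here because the
/// secret field redacts itself.
#[derive(Clone, Debug)]
pub struct AwsCredentials {
    pub access_key_id: String,
    pub secret_access_key: RedactedString,
}
```

With the wrapper in place, logging the whole struct with `{:?}` shows the access key ID but not the secret, which is the property the comment is after.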
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (7)
components/api-server/Cargo.toml (1 hunks)
components/api-server/src/client.rs (7 hunks)
components/api-server/src/error.rs (3 hunks)
components/clp-rust-utils/src/clp_config/package/config.rs (2 hunks)
components/clp-rust-utils/src/clp_config/s3_config.rs (2 hunks)
components/clp-rust-utils/src/s3/client.rs (2 hunks)
components/job-orchestration/job_orchestration/executor/query/fs_search_task.py (3 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-10-22T21:02:31.113Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).
Applied to files:
components/api-server/Cargo.toml
📚 Learning: 2025-11-03T16:17:40.223Z
Learnt from: hoophalab
Repo: y-scope/clp PR: 1535
File: components/clp-rust-utils/src/clp_config/package/config.rs:47-61
Timestamp: 2025-11-03T16:17:40.223Z
Learning: In the y-scope/clp repository, the `ApiServer` struct in `components/clp-rust-utils/src/clp_config/package/config.rs` is a Rust-native configuration type and does not mirror any Python code, unlike other structs in the same file (Config, Database, ResultsCache, Package) which are mirrors of Python definitions.
Applied to files:
components/clp-rust-utils/src/clp_config/package/config.rs
📚 Learning: 2024-11-15T20:07:22.256Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 569
File: components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py:380-392
Timestamp: 2024-11-15T20:07:22.256Z
Learning: The current implementation assumes single-threaded execution, so race conditions in functions like `is_target_extracted` in `components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py` are not currently a concern.
Applied to files:
components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
🧬 Code graph analysis (1)
components/clp-rust-utils/src/clp_config/package/config.rs (1)
components/clp-py-utils/clp_py_utils/clp_config.py (1)
S3Config(511-515)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
- GitHub Check: conventional-commits
- GitHub Check: package-image
- GitHub Check: lint-check (ubuntu-24.04)
- GitHub Check: build (ubuntu-24.04)
- GitHub Check: antlr-code-committed (macos-15)
- GitHub Check: build (macos-15)
- GitHub Check: lint-check (macos-15)
- GitHub Check: rust-checks
🔇 Additional comments (12)
components/job-orchestration/job_orchestration/executor/query/fs_search_task.py (1)
15-19: LGTM on imports.
The new imports for S3 utilities and `QueryTaskStatus` are appropriate for the S3 upload functionality being added.
Also applies to: 29-29
components/api-server/src/error.rs (1)
1-5: LGTM on AWS SDK imports.
The imports for `SdkError`, `GetObjectError`, `ListObjectsV2Error`, and `ByteStreamError` are correctly structured for the AWS SDK S3 error handling.
components/clp-rust-utils/src/clp_config/package/config.rs (2)
3-4: LGTM on S3Config import.
The import enables the S3 configuration to be included in the `StreamOutputStorage::S3` variant.
166-177: Verify Rust `S3Config` mirrors Python definition.
The `StreamOutputStorage::S3` variant includes `s3_config`. Confirm that the Rust `S3Config` struct has matching fields for `region_code`, `bucket`, `key_prefix`, and `aws_authentication` as defined in the Python counterpart, and that deserialization will work correctly.
components/clp-rust-utils/src/s3/client.rs (1)
34-40: LGTM on optional endpoint handling.
The implementation correctly uses a mutable builder pattern and conditionally sets the endpoint URL. The `set_endpoint_url` method properly accepts `Option<String>`, making the conversion with `.map(|s| s.to_owned())` appropriate.
components/api-server/Cargo.toml (1)
17-17: No action needed: `aws-sdk-s3` version 1.116.0 is valid and current.
Version 1.116.0 was released on December 2, 2025, making it a recent and appropriate dependency specification.
components/clp-rust-utils/src/clp_config/s3_config.rs (1)
1-18: LGTM on the Deserialize additions for configuration loading.
The addition of `Deserialize` to `S3Config` and `AwsAuthentication` enables proper configuration deserialization. The `#[serde(tag = "type")]` attribute correctly handles the tagged enum representation.
components/api-server/src/client.rs (5)
4-17: LGTM on import additions.
The imports correctly bring in the necessary types for S3 configuration and credential handling.
195-216: LGTM on the storage backend branching logic.
The branching correctly handles the different storage backends (Fs, S3, Mongo) and appropriately awaits the async `fetch_results_from_s3` method.
274-280: LGTM on the `unreachable!()` guard.
The `unreachable!()` is appropriate here since `fetch_results` only calls this method when the storage is `StreamOutputStorage::Fs`. The doc comment correctly documents this invariant.
333-344: Irrefutable pattern on single-variant enum.
`AwsAuthentication` currently has only one variant (`Credentials`). The `let` pattern is correct but will need updating if other authentication methods (e.g., IAM roles, environment credentials) are added.
This is fine for now, but verify this is intentional. If other authentication methods are planned, consider adding a `TODO` comment or using a match expression for future extensibility.
451-473: LGTM on Stream implementation.
The `Stream` implementation correctly handles the new `S3` variant following the same pattern as the existing variants.
)
.await;

let key_prefix = format!("{}{}/", s3_config.key_prefix, search_job_id);
Just to confirm: The key_prefix here contains an ending /, right?
yes, it should contain /
Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com>
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/api-server/src/client.rs (1)
147-173: Update `fetch_results` docs to mention S3-backed streams
The doc comment only lists `SearchResultStream::File` and `SearchResultStream::Mongo`, but the return type now includes an S3 stream variant and the implementation can return it.
Consider updating the bullet list accordingly, e.g.:
- /// * [`SearchResultStream::File`] if the job's results are stored in files.
- /// * [`SearchResultStream::Mongo`] if the job's results are stored in `MongoDB`.
+ /// * [`SearchResultStream::File`] if the job's results are stored in files.
+ /// * [`SearchResultStream::Mongo`] if the job's results are stored in `MongoDB`.
+ /// * [`SearchResultStream::S3`] if the job's results are stored in `S3`.
♻️ Duplicate comments (1)
components/api-server/src/client.rs (1)
321-361: Tighten S3 logging levels and make `key_prefix` handling more robust
Two small follow-ups here:
- Log levels – the per-call and per-object info logs can be quite chatty in production, and prior review already suggested lowering some of these:
- tracing::info!("Streaming results from S3");
+ tracing::debug!("Streaming results from S3");
@@
- tracing::info!("Streaming results from S3 prefix: {}", key_prefix);
+ tracing::debug!("Streaming results from S3 prefix: {}", key_prefix);
@@
- tracing::info!("S3 object {:?} doesn't have a key", object);
+ tracing::warn!("S3 object {:?} doesn't have a key", object);
@@
- tracing::info!("Streaming results from S3 object with key: {}", key);
+ tracing::debug!("Streaming results from S3 object with key: {}", key);
- `key_prefix` trailing slash – `format!("{}{}/", s3_config.key_prefix, search_job_id)` assumes `key_prefix` already ends with `/`. To make this more forgiving of configuration, you could normalise it:
- let key_prefix = format!("{}{}/", s3_config.key_prefix, search_job_id);
+ let base_prefix = if s3_config.key_prefix.ends_with('/') {
+     format!("{}{}", s3_config.key_prefix, search_job_id)
+ } else {
+     format!("{}/{}", s3_config.key_prefix, search_job_id)
+ };
+ let key_prefix = format!("{}/", base_prefix);
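The trailing-slash normalisation suggested above can be pulled into a small pure function that is easy to unit-test. This is a sketch under assumptions: the helper name `job_key_prefix` is hypothetical, `search_job_id` is assumed to be a `u64`, and treating an empty `key_prefix` as "no prefix" is a choice made for this example rather than something the PR specifies.

```rust
/// Builds the S3 listing prefix for a search job, accepting a `key_prefix`
/// with or without a trailing `/`.
///
/// An empty `key_prefix` is treated as "no prefix" (an assumption of this sketch),
/// so the result does not start with a stray `/`.
fn job_key_prefix(key_prefix: &str, search_job_id: u64) -> String {
    if key_prefix.is_empty() || key_prefix.ends_with('/') {
        format!("{key_prefix}{search_job_id}/")
    } else {
        format!("{key_prefix}/{search_job_id}/")
    }
}
```

Keeping the logic outside the async fetch path means the configuration edge cases (missing slash, empty prefix) can be covered by plain unit tests without an S3 endpoint.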
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (2)
components/api-server/src/client.rs (7 hunks)
components/clp-rust-utils/src/clp_config/package/config.rs (2 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-11-03T16:17:40.223Z
Learnt from: hoophalab
Repo: y-scope/clp PR: 1535
File: components/clp-rust-utils/src/clp_config/package/config.rs:47-61
Timestamp: 2025-11-03T16:17:40.223Z
Learning: In the y-scope/clp repository, the `ApiServer` struct in `components/clp-rust-utils/src/clp_config/package/config.rs` is a Rust-native configuration type and does not mirror any Python code, unlike other structs in the same file (Config, Database, ResultsCache, Package) which are mirrors of Python definitions.
Applied to files:
components/clp-rust-utils/src/clp_config/package/config.rs
🧬 Code graph analysis (2)
components/clp-rust-utils/src/clp_config/package/config.rs (1)
components/clp-py-utils/clp_py_utils/clp_config.py (2)
AwsAuthentication(547-577)S3Config(580-584)
components/api-server/src/client.rs (3)
components/clp-py-utils/clp_py_utils/clp_config.py (2)
AwsAuthentication(547-577)StorageEngine(108-110)components/clp-rust-utils/src/s3/client.rs (1)
create_new_client(18-40)components/clp-rust-utils/src/sqs/client.rs (1)
create_new_client(14-34)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: package-image
- GitHub Check: lint-check (macos-15)
- GitHub Check: build (ubuntu-24.04)
- GitHub Check: build (macos-15)
- GitHub Check: rust-checks
🔇 Additional comments (5)
components/clp-rust-utils/src/clp_config/package/config.rs (1)
3-3: AwsAuthentication/S3Config import wiring looks correct
Bringing `AwsAuthentication` and `S3Config` into scope matches their usage in `S3Ingestion` and the `StreamOutputStorage::S3` variant; no issues from a correctness perspective.
components/api-server/src/client.rs (4)
4-14: Config/import wiring for AWS and storage backends looks consistent
The updated `clp_config` imports (including `AwsAuthentication`, `Config`, `StorageEngine`, `StreamOutputStorage`, and `Credentials`) line up with how they are used later in the client; nothing stands out from a correctness or ergonomics standpoint.
194-209: Storage-backend branching in `fetch_results` is clear and avoids duplicated returns
The `write_to_file` check followed by a `match` on `self.config.stream_output.storage` cleanly separates FS and S3 paths, and defers the Mongo path to the existing helper. This keeps the control flow straightforward and avoids duplicated `Ok(...)` returns.
260-273: `unreachable!()` in `fetch_results_from_file` correctly encodes the storage invariant
Given the call site only invokes `fetch_results_from_file` when `StreamOutputStorage::Fs` is selected, using `let StreamOutputStorage::Fs { directory } = ... else { unreachable!(); }` is a reasonable way to encode the configuration invariant, and the panic note in the doc comment remains accurate.
427-445: `SearchResultStream` enum and `Stream` impl are correctly generalised to S3
Extending `SearchResultStream` with a third `S3Stream` type parameter and variant, and updating the `Stream` impl to delegate `poll_next` for the S3 case, keeps the abstraction uniform across backends. Type bounds and pin projections look consistent.
Also applies to: 448-465
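The delegation pattern described in the last comment can be shown with a synchronous analogue. This sketch uses `Iterator` instead of `Stream` so that it needs no external crates or pinning; `SearchResultIter` and its type parameters are hypothetical names, and the PR's actual enum delegates `poll_next` rather than `next`.

```rust
/// Hypothetical synchronous analogue of `SearchResultStream`: one variant per
/// storage backend, each wrapping that backend's own iterator type.
enum SearchResultIter<F, M, S> {
    File(F),
    Mongo(M),
    S3(S),
}

impl<F, M, S> Iterator for SearchResultIter<F, M, S>
where
    F: Iterator<Item = String>,
    M: Iterator<Item = String>,
    S: Iterator<Item = String>,
{
    type Item = String;

    /// Forwards to whichever backend iterator this value wraps.
    fn next(&mut self) -> Option<String> {
        match self {
            Self::File(inner) => inner.next(),
            Self::Mongo(inner) => inner.next(),
            Self::S3(inner) => inner.next(),
        }
    }
}
```

Adding a backend then means adding one type parameter, one variant, and one match arm, while callers keep consuming a single item type.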
let AwsAuthentication::Credentials { credentials } = &s3_config.aws_authentication;

let s3_config = s3_config.clone();
let credentials = credentials.clone();
Fix refutable AwsAuthentication pattern (compile error) and clarify unsupported auth types
`AwsAuthentication` is an enum, so a bare `let AwsAuthentication::Credentials { credentials } = &s3_config.aws_authentication;` is a refutable pattern whenever the enum has more than one variant, and in that case it will not compile without an `else` clause. It also assumes that only the `Credentials` variant is ever configured for `stream_output.s3_config.aws_authentication` and will panic if another auth type is used.
At minimum, this should be converted to a let-else that keeps the current invariant explicit:
- let AwsAuthentication::Credentials { credentials } = &s3_config.aws_authentication;
+ let AwsAuthentication::Credentials { credentials } =
+ &s3_config.aws_authentication
+ else {
+ unreachable!(
+ "Only `credentials` AWS authentication is currently supported for \
+ `stream_output.s3_config.aws_authentication`"
+ );
+ };
If you expect other auth modes (e.g., profile, EC2, env vars) to be used here in future, it would be safer to turn this into a small `match` that yields a `ClientError` instead of panicking, but the above fixes the immediate compile-time issue.
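Whether the bare `let` compiles depends on the number of variants: a pattern naming the only variant of an enum is irrefutable, while the same pattern on a multi-variant enum needs `let ... else` or `match`. The sketch below shows both cases and the error-returning `match` alternative. All types here are simplified, hypothetical stand-ins (the `Profile` variant and the `UnsupportedAuth` error are invented for illustration and are not in the PR).

```rust
/// Single-variant stand-in: a bare `let` destructuring is irrefutable and compiles.
enum SingleAuth {
    Credentials { access_key_id: String },
}

fn single_key_id(auth: &SingleAuth) -> &str {
    let SingleAuth::Credentials { access_key_id } = auth;
    access_key_id
}

/// Two-variant stand-in; `Profile` is hypothetical and only here to make the
/// pattern refutable.
#[derive(Debug)]
enum MultiAuth {
    Credentials { access_key_id: String },
    Profile { name: String },
}

/// Hypothetical error type standing in for the API server's `ClientError`.
#[derive(Debug, PartialEq)]
enum ClientError {
    UnsupportedAuth(String),
}

/// Returns an error for unsupported auth types instead of panicking.
fn credentials_key_id(auth: &MultiAuth) -> Result<&str, ClientError> {
    match auth {
        MultiAuth::Credentials { access_key_id } => Ok(access_key_id.as_str()),
        other => Err(ClientError::UnsupportedAuth(format!("{other:?}"))),
    }
}
```

The `match` form keeps an unsupported configuration a recoverable request error, whereas `unreachable!()` would turn it into a panic in the serving path.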
  #[serde(rename = "s3")]
- S3 { staging_directory: String },
+ S3 {
+     staging_directory: String,
+     s3_config: S3Config,
+ },
  }
🧹 Nitpick | 🔵 Trivial
S3 stream_output variant matches S3Config; consider adding a deserialisation test
The S3 { staging_directory, s3_config: S3Config } shape aligns with the Python-side S3Config model and keeps the Rust mirror consistent. To guard against future drift, it would be helpful to add a small serde test that deserialises a stream_output S3 block from JSON/YAML and asserts the staging_directory and nested s3_config fields.
y-scope#1728) Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com>
Description
This is a successor to #1722, which adds support for uploading query result files to S3.
In this PR, we add support for streaming query results from S3 in the API server.
Checklist
breaking change.
Validation performed
`stream_output` storage to `s3` `"write_to_file": true` (tested on LocalStack)
Summary by CodeRabbit
New Features
Chores