feat(log-ingestor): Add S3Scanner to scan a S3 bucket periodically to find and ingest newly created objects with a given prefix. #1629
Walkthrough
This PR changes several size-related fields from `usize` to `u64`.
Changes
Sequence Diagram(s)
sequenceDiagram
participant Test
participant S3Scanner
participant Task
participant S3ClientMgr
participant AWS_S3
participant Channel
Test->>S3Scanner: spawn(id, s3_client_manager, config, sender)
S3Scanner->>Task: construct Task
S3Scanner->>+Task: tokio::spawn(Task::run(cancel_token))
loop scan_loop
Task->>Task: select(cancel, scan())
alt cancel received
Task-->>S3Scanner: exit Ok
else proceed
Task->>S3ClientMgr: get()
S3ClientMgr-->>Task: S3Client (cloned)
Task->>AWS_S3: list_objects_v2(prefix, start_after)
AWS_S3-->>Task: contents, is_truncated
loop each object
alt key not ending '/'
Task->>Channel: send(ObjectMetadata {bucket,key,size})
end
end
alt is_truncated
Task->>Task: update start_after and continue
else
Task->>Task: sleep(scanning_interval)
end
end
end
Test->>S3Scanner: shutdown_and_join()
S3Scanner->>Task: cancel_token.cancel()
Task-->>S3Scanner: join handle result
S3Scanner-->>Test: return Ok
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~50 minutes
Pre-merge checks and finishing touches
✅ Passed checks (3 passed)
Actionable comments posted: 4
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (8)
components/clp-rust-utils/src/s3.rs (1 hunks)
components/log-ingestor/src/aws_client_manager.rs (1 hunks)
components/log-ingestor/src/compression/buffer.rs (2 hunks)
components/log-ingestor/src/ingestion_job.rs (1 hunks)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1 hunks)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1 hunks)
components/log-ingestor/tests/test_compression_listener.rs (1 hunks)
components/log-ingestor/tests/test_ingestion_job.rs (4 hunks)
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
🧬 Code graph analysis (2)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (4)
run(52-76), spawn(187-206), shutdown_and_join(219-222), get_id(228-230)
components/log-ingestor/tests/test_ingestion_job.rs (5)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1)
spawn(155-174)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
spawn(187-206)
components/log-ingestor/src/aws_client_manager.rs (2)
from(47-49), from(66-68)
components/clp-rust-utils/src/sqs/client.rs (1)
create_new_client(15-36)
components/log-ingestor/tests/aws_config.rs (1)
from_env(47-74)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: package-image
- GitHub Check: lint-check (macos-15)
- GitHub Check: lint-check (ubuntu-24.04)
- GitHub Check: rust-checks
🔇 Additional comments (11)
components/clp-rust-utils/src/s3.rs (1)
6-10: Switching `ObjectMetadata.size` to `u64` is a good fit for S3 sizes
Using `u64` here aligns with S3’s non-negative, potentially >4 GiB object sizes and with the rest of the ingestion/compression pipeline. Just ensure any on-disk/over-the-wire serialisation that depended on the previous `usize` layout has been audited for compatibility.
components/log-ingestor/src/aws_client_manager.rs (1)
52-69: `S3ClientWrapper` mirrors `SqsClientWrapper` cleanly
The S3 client wrapper is symmetric with `SqsClientWrapper` and satisfies `AwsClientManagerType<S3Client>` without adding complexity. This keeps the client-management API consistent across SQS and S3.
components/log-ingestor/tests/test_compression_listener.rs (1)
9-9: Adapting `TEST_OBJECT_SIZE` to `u64` matches the new buffer sizing
Making `TEST_OBJECT_SIZE` a `u64` and passing `120 * TEST_OBJECT_SIZE` into `Buffer::new` keeps the test aligned with `ObjectMetadata.size: u64` and `Buffer`’s `u64`-based `size_threshold` without introducing any type-casting surprises.
Also applies to: 69-69
components/log-ingestor/src/ingestion_job.rs (1)
1-5: Publicly re-exporting `s3_scanner` alongside `sqs_listener` looks good
Adding the `s3_scanner` module and re-exporting its items from `ingestion_job` keeps the public surface coherent: tests can depend on `S3ScannerConfig`/`S3Scanner` via the same entry point as the SQS listener.
components/log-ingestor/src/compression/buffer.rs (1)
27-31: Moving buffer sizing to `u64` is consistent with S3 object sizes
Using `u64` for `total_size` and `size_threshold`, and updating `Buffer::new` accordingly, aligns the buffer with `ObjectMetadata.size` and avoids architecture-dependent `usize` truncation. This is a reasonable public API change, assuming all external call sites have been updated as you did in the tests.
Also applies to: 39-45
components/log-ingestor/tests/test_ingestion_job.rs (3)
32-47: Checked conversion in `create_s3_objects` is appropriate for test-only paths
Using `usize::try_from(object.size)` with an `expect` makes the test fail loudly if someone ever feeds an unreasonably large `size` into the helper, while the current tests stick to tiny payloads (16 bytes). This is a sensible safety net for a test utility.
151-224: Reworked `test_sqs_listener` exercises end-to-end SQS→S3 flow effectively
Using a per-test UUID prefix, `SqsClientWrapper`, and the shared upload helpers makes this integration test much clearer and more robust to interference between runs. The channel capacity and 30‑second timeout are reasonable for LocalStack/AWS environments, and filtering via the prefix plus random “noise” keys validates that only relevant objects are surfaced before comparing sorted created vs. received metadata.
226-287: `test_s3_scanner` gives good coverage of the new scanner behaviour
This test mirrors the SQS case for the scanner: unique per-job prefix, shared helpers for uploads/noise, and a bounded receive timeout, all wired through `S3ClientWrapper` and `S3ScannerConfig` with a short scanning interval. Sorting and comparing the created vs. received `ObjectMetadata` vectors is a solid way to validate that prefix filtering and incremental scanning via `start_after` behave as expected.
components/log-ingestor/src/ingestion_job/s3_scanner.rs (3)
12-31: Config and task wiring look consistent with existing ingestion jobs
`S3ScannerConfig` and `Task<S3Client>` are minimal and map cleanly to the SQS listener pattern (config + client manager + sender). The `start_after: Option<String>` field plus `Clone` on the config give you enough flexibility for persisting and testing scanner state, and the channel-based `sender` keeps the ingestion pipeline decoupled.
33-60: Cancellation integration in `run` is sound
Wrapping the infinite `scan` loop in a single `tokio::select!` over `cancel_token.cancelled()` correctly ensures that, once cancellation is requested, the `scan` future (and any in‑flight `sleep` or S3 call) is dropped and the task exits cleanly. Error propagation from `scan` is straightforward via the `result` branch.
85-115: Core scan loop, `start_after` progression, and size handling look correct
- Calling `list_objects_v2` with `bucket`, `prefix`, and `set_start_after(self.config.start_after.clone())` inside an infinite loop matches the intended “tailing” semantics, and the comments explicitly state the contract that new keys must be lexicographically larger than the last successfully ingested key.
- Skipping entries with missing `key`/`size` and those ending in `/` is a reasonable filter for “real” objects; updating `start_after` only after a successful `send` prevents you from skipping objects if the channel is closed or back‑pressured enough to error.
- Converting `size` via `size.try_into()?` is the right way to bridge the S3 `i64` value into the `u64` stored in `ObjectMetadata`, avoiding silent truncation for large objects while still failing fast if the SDK ever produced a negative size.
Overall, this loop gives you monotonic, at‑least‑once delivery of object metadata under the documented key‑ordering assumption.
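To make the key-ordering assumption concrete, here is a minimal sketch that replaces S3 with an in-memory sorted set. The names `list_page` and `scan_once` are hypothetical and do not come from the PR; `list_page` only imitates the `prefix`, `start_after`, and truncation behaviour of a listing call. Unlike the PR's loop, this sketch also advances the marker past skipped directory markers, which is a choice made here so that the example always terminates.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for one listing call: returns up to `max_keys` keys that match
/// `prefix` and sort strictly after `start_after`, plus a flag saying more keys remain.
fn list_page(
    bucket: &BTreeSet<String>,
    prefix: &str,
    start_after: Option<&str>,
    max_keys: usize,
) -> (Vec<String>, bool) {
    let mut matching = bucket
        .iter()
        .filter(|key| key.starts_with(prefix))
        .filter(|key| start_after.map_or(true, |marker| key.as_str() > marker));
    let page: Vec<String> = matching.by_ref().take(max_keys).cloned().collect();
    let is_truncated = matching.next().is_some();
    (page, is_truncated)
}

/// Hypothetical single scan pass: drains every page and returns the keys that would be
/// forwarded to the ingestion channel, updating `start_after` along the way.
fn scan_once(
    bucket: &BTreeSet<String>,
    prefix: &str,
    start_after: &mut Option<String>,
    max_keys: usize,
) -> Vec<String> {
    assert!(max_keys > 0, "a page must be able to hold at least one key");
    let mut forwarded = Vec::new();
    loop {
        let (page, is_truncated) = list_page(bucket, prefix, start_after.as_deref(), max_keys);
        for key in page {
            // Keys ending in '/' are treated as directory markers and not forwarded.
            if !key.ends_with('/') {
                forwarded.push(key.clone());
            }
            // Assumption of this sketch: the marker advances for every key seen.
            *start_after = Some(key);
        }
        if !is_truncated {
            return forwarded;
        }
    }
}
```

In this model, a key added later that sorts before the current marker (for example `logs/a2` when the marker is `logs/c`) is never returned by a subsequent pass, which is why the scanner needs new keys to be lexicographically increasing.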
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1 hunks)
🧬 Code graph analysis (1)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (4)
run(52-76), spawn(187-206), shutdown_and_join(219-222), get_id(228-230)
🔇 Additional comments (9)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (9)
1-10: Imports look good.
All necessary dependencies are imported for the S3 scanner implementation.
12-19: Configuration structure is well-designed.
The `start_after` optional field appropriately supports resumable scanning.
21-33: Task structure follows established patterns.
The generic client manager design matches the SQS listener implementation and supports testability.
85-126: Scanning logic is sound with documented trade-offs.
The core scanning implementation correctly:
- Paginates using `start_after` to track progress
- Filters directory markers (keys ending with `/`)
- Updates `start_after` after each successful send for fault tolerance
- Handles truncated results to drain all pages
The immediate continuation on truncated results (lines 117-123) was previously discussed and will cause burst QPS on large prefixes; if this "drain as fast as possible when behind" behaviour is intentional, the current implementation is acceptable per the past review discussion.
128-131: Sleep helper is appropriate.
Simple and correct.
134-139: S3Scanner structure follows established patterns.
Consistent with the SQS listener design.
141-174: Spawn method correctly initializes the scanner.
The implementation properly sets up the task, cancellation token, and spawns the background task, following the established pattern.
176-190: Shutdown method is correctly implemented.
The cancellation and join logic matches the SQS listener pattern. Note that the effectiveness depends on the cancellation checking in `Task::run` (addressed in separate comment).
192-198: Doc comment wording issue previously flagged.
The implementation is correct. The documentation refers to "this scanner" on line 194, which is appropriate.
Note: A previous review comment flagged similar wording inconsistency for this method; that concern has been addressed in the current version.
hoophalab left a comment
Sorry, I forgot those two comments.
Actionable comments posted: 1
♻️ Duplicate comments (2)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (2)
49-60: Cancellation is still only checked once; scanner may never stop after shutdown
`Task::run` races `cancel_token.cancelled()` against `self.scan()`, but `scan` contains its own infinite loop and never observes the token. Once the `scan` branch wins the first `select!`, later calls to `S3Scanner::shutdown_and_join` will cancel the token but the task will keep running until an error occurs, matching the earlier critical feedback.
You need to make the scan loop itself cancellation-aware. One way, keeping the current structure, is to pass the token into `scan` and use `select!` (and `is_cancelled`) around both the S3 call and the sleep, for example:
-    pub async fn run(mut self, cancel_token: CancellationToken) -> Result<()> {
-        select! {
-            // Cancellation requested.
-            () = cancel_token.cancelled() => {
-                Ok(())
-            }
-
-            // Scanner execution
-            result = self.scan() => {
-                result
-            }
-        }
-    }
+    pub async fn run(mut self, cancel_token: CancellationToken) -> Result<()> {
+        self.scan(&cancel_token).await
+    }
@@
-    /// # Returns
-    ///
-    /// This function never returns unless an error occurs.
-    ///
-    /// # Errors
-    ///
-    /// Returns an error if:
-    ///
-    /// * Forwards [`AwsClientManagerType::get`]'s return values on failure.
-    /// * Forwards
-    ///   [`aws_sdk_s3::operation::list_objects_v2::builders::ListObjectsV2FluentBuilder::send`]'s
-    ///   return values on failure.
-    pub async fn scan(&mut self) -> Result<()> {
-        loop {
-            let client = self.s3_client_manager.get().await?;
-            let response = client
-                .list_objects_v2()
-                .bucket(self.config.bucket_name.as_str())
-                .prefix(self.config.prefix.as_str())
-                .set_start_after(self.config.start_after.clone())
-                .send()
-                .await?;
-            let Some(contents) = response.contents else {
-                self.sleep().await;
-                continue;
-            };
-
-            for content in contents {
-                let (Some(key), Some(size)) = (content.key, content.size) else {
-                    continue;
-                };
-                if key.ends_with('/') {
-                    continue;
-                }
-                self.sender
-                    .send(ObjectMetadata {
-                        bucket: self.config.bucket_name.clone(),
-                        key: key.clone(),
-                        size: size.try_into()?,
-                    })
-                    .await?;
-                self.config.start_after = Some(key);
-            }
-
-            if response.is_truncated.unwrap_or(false) {
-                // The results are truncated. Keep going until all objects are listed.
-                // Ideally, we can use the continuation token to continue listing objects, but since
-                // we may refresh the client in the next scan cycle, we will use `start_after` to
-                // send a new request for simplicity.
-                continue;
-            }
-            self.sleep().await;
-        }
-    }
+    /// # Returns
+    ///
+    /// This function only returns when cancellation is requested or an error occurs.
+    pub async fn scan(&mut self, cancel_token: &CancellationToken) -> Result<()> {
+        loop {
+            if cancel_token.is_cancelled() {
+                return Ok(());
+            }
+
+            let client = self.s3_client_manager.get().await?;
+            let list_future = client
+                .list_objects_v2()
+                .bucket(self.config.bucket_name.as_str())
+                .prefix(self.config.prefix.as_str())
+                .set_start_after(self.config.start_after.clone())
+                .send();
+
+            let response = select! {
+                () = cancel_token.cancelled() => {
+                    return Ok(());
+                }
+                result = list_future => {
+                    result?
+                }
+            };
+
+            let Some(contents) = response.contents else {
+                // No objects; wait for the next scan or cancellation.
+                select! {
+                    () = cancel_token.cancelled() => {
+                        return Ok(());
+                    }
+                    _ = self.sleep() => {}
+                }
+                continue;
+            };
+
+            for content in contents {
+                let (Some(key), Some(size)) = (content.key, content.size) else {
+                    continue;
+                };
+                if key.ends_with('/') {
+                    continue;
+                }
+                self.sender
+                    .send(ObjectMetadata {
+                        bucket: self.config.bucket_name.clone(),
+                        key: key.clone(),
+                        size: size.try_into()?,
+                    })
+                    .await?;
+                self.config.start_after = Some(key);
+            }
+
+            if response.is_truncated.unwrap_or(false) {
+                // More pages available; continue immediately, but still honour cancellation at the
+                // top of the next loop and around the next S3 call.
+                continue;
+            }
+
+            // Finished a full scan; sleep unless cancelled.
+            select! {
+                () = cancel_token.cancelled() => {
+                    return Ok(());
+                }
+                _ = self.sleep() => {}
+            }
+        }
+    }
This keeps the existing semantics (including `start_after` and truncated handling) but ensures the task can exit promptly on cancellation during listing and sleeping, so `shutdown_and_join` will complete reliably.
Also applies to: 85-125
62-71: Docstring vs behaviour: truncated listings bypass the configured interval
The docs for `scan` say the scanner “runs in a loop, sleeping for the configured scanning interval between each scan”, but when `response.is_truncated.unwrap_or(false)` is `true` you immediately loop and issue another `list_objects_v2` without any delay. That “drain backlog as fast as possible” behaviour is reasonable, but it can generate tight back‑to‑back S3 calls and doesn’t match the current wording.
Either (a) explicitly document that truncated pages are processed without delay until the listing is complete, or (b) introduce a small per‑page delay / cap on consecutive truncated pages if you want stricter control over S3 QPS and cost.
Also applies to: 117-124
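For option (b), one possible shape is a small pacing helper that decides how long to wait before the next listing call. This is only a sketch: `PagePacer`, its fields, and `next_delay` are hypothetical names that do not exist in the PR, and the policy (a short delay between truncated pages, with a cap after which the full interval applies) is one of several reasonable choices.

```rust
use std::time::Duration;

/// Hypothetical pacing policy for the scan loop; not part of the PR.
struct PagePacer {
    /// Delay used after a complete (non-truncated) scan.
    scanning_interval: Duration,
    /// Shorter delay used between truncated pages.
    per_page_delay: Duration,
    /// Maximum number of short-delay continuations allowed in a row.
    max_consecutive_pages: u32,
    /// Short-delay continuations taken so far in the current burst.
    consecutive_pages: u32,
}

impl PagePacer {
    fn new(
        scanning_interval: Duration,
        per_page_delay: Duration,
        max_consecutive_pages: u32,
    ) -> Self {
        Self {
            scanning_interval,
            per_page_delay,
            max_consecutive_pages,
            consecutive_pages: 0,
        }
    }

    /// Returns how long to wait before issuing the next listing request.
    fn next_delay(&mut self, is_truncated: bool) -> Duration {
        if is_truncated && self.consecutive_pages < self.max_consecutive_pages {
            // Still draining a backlog and under the cap: use the short delay.
            self.consecutive_pages += 1;
            return self.per_page_delay;
        }
        // Either the listing is complete or the cap was reached: back off fully.
        self.consecutive_pages = 0;
        self.scanning_interval
    }
}
```

A scan loop could call `next_delay(response.is_truncated.unwrap_or(false))` and sleep for the returned duration instead of branching between `continue` and `sleep`. Setting `per_page_delay` to zero with a large cap would approximate the current drain-immediately behaviour.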
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (4)
components/clp-rust-utils/src/sqs/event/s3.rs (1 hunks)
components/log-ingestor/src/aws_client_manager.rs (3 hunks)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1 hunks)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (2 hunks)
🧬 Code graph analysis (2)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1)
spawn(155-174)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (2)
components/log-ingestor/tests/test_ingestion_job.rs (2)
mpsc(180-180), mpsc(252-252)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (4)
run(52-76), spawn(187-206), shutdown_and_join(219-222), get_id(228-230)
🔇 Additional comments (5)
components/clp-rust-utils/src/sqs/event/s3.rs (1)
37-37: Verify AWS S3 event notification JSON compatibility: no issues found.
The type change from `usize` to `u64` is correct and fully supported:
- AWS S3 event notifications include size as a JSON numeric field (e.g., 1024)
- AWS S3 event examples show size values like 493812 as plain numbers, which JSON deserialization libraries parse directly into `u64`
- Tests confirm successful deserialization: `record.s3.object.size` correctly deserializes to `1024`
- On 32-bit platforms, `usize` (32 bits) cannot represent S3 objects >4GB; `u64` ensures universal compatibility
- The change aligns with S3's cross-regional consistency and supports objects up to 5TB
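The 32-bit point can be checked with a few lines of standard-library Rust. This is an illustrative sketch: `FIVE_GIB` and `fits_in_32_bit_usize` are hypothetical names, and `u32` stands in for `usize` on a 32-bit target.

```rust
/// Size of a 5 GiB object in bytes; this value needs 33 bits.
const FIVE_GIB: u64 = 5 * 1024 * 1024 * 1024;

/// Returns whether `size` fits in 32 bits, which is the width of `usize` on 32-bit targets.
fn fits_in_32_bit_usize(size: u64) -> bool {
    u32::try_from(size).is_ok()
}
```

Here `fits_in_32_bit_usize(FIVE_GIB)` is `false`, while small sizes such as `1024` fit, so a `u64` field avoids a platform-dependent limit.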
components/log-ingestor/src/aws_client_manager.rs (2)
7-7: Centralizing `Clone`/`'static` bounds in the core traits looks good
Moving the `Clone` requirement onto `AwsClientType` and the `'static` bound onto `AwsClientManagerType` simplifies downstream generics (e.g., `Task`/`spawn` implementations) without changing behaviour. This matches the intent from the earlier review while still ensuring client managers are safe to move into spawned tasks.
Also applies to: 20-20
52-69: `S3ClientWrapper` cleanly mirrors `SqsClientWrapper`
The new `S3ClientWrapper` is symmetric with `SqsClientWrapper`, returns cloned clients, and provides a straightforward injection point for tests and other components; no issues spotted.
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
35-36: Removing redundant `'static` bounds on `Task`/`spawn` keeps the API tidy
Now that `'static` is enforced on `AwsClientManagerType`, dropping the extra `'static` on `Task` and `SqsListener::spawn` keeps the signatures cleaner without changing the safety or behaviour of the spawned task.
Also applies to: 187-187
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1)
100-115: Checked conversion to `u64` for object size is a solid improvement
Using `size.try_into()?` when building `ObjectMetadata` aligns with the new `u64` size type and avoids silent wrap‑around that a plain cast would introduce; this is a good, defensive choice given the S3 API reports sizes as signed integers.
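The difference between the checked conversion and a plain cast can be seen in isolation. The helper name `checked_object_size` below is hypothetical and only wraps the standard `u64::try_from`; it is a sketch of the idea, not the PR's code.

```rust
use std::num::TryFromIntError;

/// Converts a signed size, as a listing API might report it, into an unsigned size.
/// Negative inputs produce an error instead of wrapping around.
fn checked_object_size(size: i64) -> Result<u64, TryFromIntError> {
    u64::try_from(size)
}
```

With this helper, `checked_object_size(-1)` returns an error, whereas `-1_i64 as u64` silently evaluates to `u64::MAX`.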
…to find and ingest newly created objects with a given prefix. (y-scope#1629)
Description
This PR adds `S3Scanner` to periodically scan an S3 bucket to find and ingest newly created objects with a given prefix. As the implementation uses the `list_objects_v2` API with a `start_after` marker, it requires the keys of newly created objects to follow an increasing lexical order.
This PR also updates:
- `test_ingestion_job` to share code logic among SQS listener and S3 scanner.
- `s3::ObjectMetadata` to use `u64` to represent the object's size, as S3 APIs return this value as an `i64`. In practice it's possible that a file is larger than 4GB. We should use a 64-bit type to be future-proof.
Checklist
breaking change.
Validation performed
Summary by CodeRabbit
New Features
Refactor
Tests