Skip to content

feat(log-ingestor): Add S3Scanner to scan a S3 bucket periodically to find and ingest newly created objects with a given prefix.#1629

Merged
LinZhihao-723 merged 9 commits into
y-scope:mainfrom
LinZhihao-723:s3-scanner
Nov 20, 2025

Conversation

@LinZhihao-723

@LinZhihao-723 LinZhihao-723 commented Nov 19, 2025

Copy link
Copy Markdown
Member

Description

This PR adds S3Scanner to periodically scan an S3 bucket to find and ingest newly created objects with a given prefix. As the implementation is using list_object_v2 API with start_after marker, it requires the newly ingested objects' keys to preserve an increasing lexical order.

This PR also updates:

  • test_ingestion_job to share code logic among SQS listener and S3 scanner.
  • s3::ObjectMetadata to use u64 to represent the object's size, as S3 APIs return this value as an i64. In practice it's possible that a file is larger than 4GB. We should use a 64-bit type to be future-proof.

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  • Ensure all workflows pass.
  • Ensure new test cases for S3 scanner passed.

Summary by CodeRabbit

  • New Features

    • Added a periodic S3 prefix scanner with configurable interval, background execution and graceful shutdown.
  • Refactor

    • Switched object size and buffer thresholds to a wider integer type (u64) to better handle large objects.
    • Introduced a public S3 client wrapper and relaxed client lifetime bounds to improve client management.
  • Tests

    • Expanded tests for S3 scanning and ingestion flows, including noise-object scenarios and new test helpers.

✏️ Tip: You can customize this high-level summary in your review settings.

@LinZhihao-723 LinZhihao-723 requested a review from a team as a code owner November 19, 2025 04:40
@coderabbitai

coderabbitai Bot commented Nov 19, 2025

Copy link
Copy Markdown
Contributor

Warning

Rate limit exceeded

@LinZhihao-723 has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 9 minutes and 24 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between 8844cd9 and 036401d.

📒 Files selected for processing (1)
  • components/log-ingestor/src/ingestion_job/s3_scanner.rs (1 hunks)

Walkthrough

This PR changes several size-related fields from usize to u64, relaxes/adjusts AWS client trait bounds and adds an S3ClientWrapper, introduces a new cancellable S3 prefix scanner module with lifecycle control, and updates tests to cover the new scanner and size-type changes.

Changes

Cohort / File(s) Summary
Type conversions
components/clp-rust-utils/src/s3.rs, components/clp-rust-utils/src/sqs/event/s3.rs, components/log-ingestor/src/compression/buffer.rs, components/log-ingestor/tests/test_compression_listener.rs
Changed size-related fields from usize to u64: ObjectMetadata.size, S3 event Object.size, Buffer.total_size, Buffer.size_threshold, and test constant TEST_OBJECT_SIZE. Updated Buffer::new to accept u64.
AWS client traits & wrapper
components/log-ingestor/src/aws_client_manager.rs
Added Clone bound to AwsClientType, relaxed AwsClientManagerType generic bounds (removed Clone, added 'static), and introduced public S3ClientWrapper (holds S3Client, from ctor, implements AwsClientManagerType<S3Client> with async get() returning a cloned client).
S3 scanning module
components/log-ingestor/src/ingestion_job.rs, components/log-ingestor/src/ingestion_job/s3_scanner.rs
Added s3_scanner submodule and re-export; new public S3ScannerConfig and S3Scanner (spawn, shutdown_and_join, get_id); internal generic Task performing cancellable, paginated S3 prefix scans and emitting ObjectMetadata over an mpsc::Sender.
SQS listener lifetime relaxations
components/log-ingestor/src/ingestion_job/sqs_listener.rs
Removed 'static lifetime requirement from Task impl and SqsListener::spawn generic bounds.
Test infrastructure and validation
components/log-ingestor/tests/test_ingestion_job.rs
Added test helpers (upload_and_receive, upload_noise_objects, get_testing_prefix), use of S3ClientWrapper and S3ScannerConfig, reworked test_sqs_listener, and added test_s3_scanner to validate scanner behaviour.

Sequence Diagram(s)

sequenceDiagram
    participant Test
    participant S3Scanner
    participant Task
    participant S3ClientMgr
    participant AWS_S3
    participant Channel

    Test->>S3Scanner: spawn(id, s3_client_manager, config, sender)
    S3Scanner->>Task: construct Task
    S3Scanner->>+Task: tokio::spawn(Task::run(cancel_token))

    loop scan_loop
        Task->>Task: select(cancel, scan())
        alt cancel received
            Task-->>S3Scanner: exit Ok
        else proceed
            Task->>S3ClientMgr: get()
            S3ClientMgr-->>Task: S3Client (cloned)
            Task->>AWS_S3: list_objects_v2(prefix, start_after)
            AWS_S3-->>Task: contents, is_truncated

            loop each object
                alt key not ending '/'
                    Task->>Channel: send(ObjectMetadata {bucket,key,size})
                end
            end

            alt is_truncated
                Task->>Task: update start_after and continue
            else
                Task->>Task: sleep(scanning_interval)
            end
        end
    end

    Test->>S3Scanner: shutdown_and_join()
    S3Scanner->>Task: cancel_token.cancel()
    Task-->>S3Scanner: join handle result
    S3Scanner-->>Test: return Ok
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

  • Areas needing extra attention:
    • s3_scanner.rs: concurrency, cancellation, pagination, and error propagation.
    • Trait bound changes in aws_client_manager.rs and implications for generic usages.
    • All usizeu64 conversions and arithmetic (buffers, tests, event models).

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the main change: adding S3Scanner functionality to periodically scan S3 buckets for newly created objects with a given prefix.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8c539bb and faff7f2.

📒 Files selected for processing (8)
  • components/clp-rust-utils/src/s3.rs (1 hunks)
  • components/log-ingestor/src/aws_client_manager.rs (1 hunks)
  • components/log-ingestor/src/compression/buffer.rs (2 hunks)
  • components/log-ingestor/src/ingestion_job.rs (1 hunks)
  • components/log-ingestor/src/ingestion_job/s3_scanner.rs (1 hunks)
  • components/log-ingestor/src/ingestion_job/sqs_listener.rs (1 hunks)
  • components/log-ingestor/tests/test_compression_listener.rs (1 hunks)
  • components/log-ingestor/tests/test_ingestion_job.rs (4 hunks)
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
🧬 Code graph analysis (2)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (4)
  • run (52-76)
  • spawn (187-206)
  • shutdown_and_join (219-222)
  • get_id (228-230)
components/log-ingestor/tests/test_ingestion_job.rs (5)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1)
  • spawn (155-174)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
  • spawn (187-206)
components/log-ingestor/src/aws_client_manager.rs (2)
  • from (47-49)
  • from (66-68)
components/clp-rust-utils/src/sqs/client.rs (1)
  • create_new_client (15-36)
components/log-ingestor/tests/aws_config.rs (1)
  • from_env (47-74)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: package-image
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: rust-checks
🔇 Additional comments (11)
components/clp-rust-utils/src/s3.rs (1)

6-10: Switching ObjectMetadata.size to u64 is a good fit for S3 sizes

Using u64 here aligns with S3’s non-negative, potentially >4 GiB object sizes and with the rest of the ingestion/compression pipeline. Just ensure any on-disk/over-the-wire serialisation that depended on the previous usize layout has been audited for compatibility.

components/log-ingestor/src/aws_client_manager.rs (1)

52-69: S3ClientWrapper mirrors SqsClientWrapper cleanly

The S3 client wrapper is symmetric with SqsClientWrapper and satisfies AwsClientManagerType<S3Client> without adding complexity. This keeps the client-management API consistent across SQS and S3.

components/log-ingestor/tests/test_compression_listener.rs (1)

9-9: Adapting TEST_OBJECT_SIZE to u64 matches the new buffer sizing

Making TEST_OBJECT_SIZE a u64 and passing 120 * TEST_OBJECT_SIZE into Buffer::new keeps the test aligned with ObjectMetadata.size: u64 and Buffer’s u64-based size_threshold without introducing any type-casting surprises.

Also applies to: 69-69

components/log-ingestor/src/ingestion_job.rs (1)

1-5: Publicly re-exporting s3_scanner alongside sqs_listener looks good

Adding the s3_scanner module and re-exporting its items from ingestion_job keeps the public surface coherent—tests can depend on S3ScannerConfig/S3Scanner via the same entry point as the SQS listener.

components/log-ingestor/src/compression/buffer.rs (1)

27-31: Moving buffer sizing to u64 is consistent with S3 object sizes

Using u64 for total_size and size_threshold, and updating Buffer::new accordingly, aligns the buffer with ObjectMetadata.size and avoids architecture-dependent usize truncation. This is a reasonable public API change, assuming all external call sites have been updated as you did in the tests.

Also applies to: 39-45

components/log-ingestor/tests/test_ingestion_job.rs (3)

32-47: Checked conversion in create_s3_objects is appropriate for test-only paths

Using usize::try_from(object.size) with an expect makes the test fail loudly if someone ever feeds an unreasonably large size into the helper, while the current tests stick to tiny payloads (16 bytes). This is a sensible safety net for a test utility.


151-224: Reworked test_sqs_listener exercises end-to-end SQS→S3 flow effectively

Using a per-test UUID prefix, SqsClientWrapper, and the shared upload helpers makes this integration test much clearer and more robust to interference between runs. The channel capacity and 30‑second timeout are reasonable for LocalStack/AWS environments, and filtering via the prefix plus random “noise” keys validates that only relevant objects are surfaced before comparing sorted created vs. received metadata.


226-287: test_s3_scanner gives good coverage of the new scanner behaviour

This test mirrors the SQS case for the scanner: unique per-job prefix, shared helpers for uploads/noise, and a bounded receive timeout, all wired through S3ClientWrapper and S3ScannerConfig with a short scanning interval. Sorting and comparing the created vs. received ObjectMetadata vectors is a solid way to validate that prefix filtering and incremental scanning via start_after behave as expected.

components/log-ingestor/src/ingestion_job/s3_scanner.rs (3)

12-31: Config and task wiring look consistent with existing ingestion jobs

S3ScannerConfig and Task<S3Client> are minimal and map cleanly to the SQS listener pattern (config + client manager + sender). The start_after: Option<String> field plus Clone on the config give you enough flexibility for persisting and testing scanner state, and the channel-based sender keeps the ingestion pipeline decoupled.


33-60: Cancellation integration in run is sound

Wrapping the infinite scan loop in a single tokio::select! over cancel_token.cancelled() correctly ensures that, once cancellation is requested, the scan future (and any in‑flight sleep or S3 call) is dropped and the task exits cleanly. Error propagation from scan is straightforward via the result branch.


85-115: Core scan loop, start_after progression, and size handling look correct

  • Calling list_objects_v2 with bucket, prefix, and set_start_after(self.config.start_after.clone()) inside an infinite loop matches the intended “tailing” semantics, and the comments explicitly state the contract that new keys must be lexicographically larger than the last successfully ingested key.
  • Skipping entries with missing key/size and those ending in / is a reasonable filter for “real” objects; updating start_after only after a successful send prevents you from skipping objects if the channel is closed or back‑pressured enough to error.
  • Converting size via size.try_into()? is the right way to bridge the S3 i64 value into the u64 stored in ObjectMetadata, avoiding silent truncation for large objects while still failing fast if the SDK ever produced a negative size.

Overall, this loop gives you monotonic, at‑least‑once delivery of object metadata under the documented key‑ordering assumption.

Comment thread components/log-ingestor/src/ingestion_job/s3_scanner.rs Outdated
Comment thread components/log-ingestor/src/ingestion_job/s3_scanner.rs
Comment thread components/log-ingestor/src/ingestion_job/sqs_listener.rs
Comment thread components/log-ingestor/tests/test_ingestion_job.rs

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between faff7f2 and 69cbb04.

📒 Files selected for processing (1)
  • components/log-ingestor/src/ingestion_job/s3_scanner.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
🧬 Code graph analysis (1)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (4)
  • run (52-76)
  • spawn (187-206)
  • shutdown_and_join (219-222)
  • get_id (228-230)
🔇 Additional comments (9)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (9)

1-10: Imports look good.

All necessary dependencies are imported for the S3 scanner implementation.


12-19: Configuration structure is well-designed.

The start_after optional field appropriately supports resumable scanning.


21-33: Task structure follows established patterns.

The generic client manager design matches the SQS listener implementation and supports testability.


85-126: Scanning logic is sound with documented trade-offs.

The core scanning implementation correctly:

  • Paginates using start_after to track progress
  • Filters directory markers (keys ending with /)
  • Updates start_after after each successful send for fault tolerance
  • Handles truncated results to drain all pages

The immediate continuation on truncated results (lines 117-123) was previously discussed and will cause burst QPS on large prefixes; if this "drain as fast as possible when behind" behaviour is intentional, the current implementation is acceptable per the past review discussion.


128-131: Sleep helper is appropriate.

Simple and correct.


134-139: S3Scanner structure follows established patterns.

Consistent with the SQS listener design.


141-174: Spawn method correctly initializes the scanner.

The implementation properly sets up the task, cancellation token, and spawns the background task, following the established pattern.


176-190: Shutdown method is correctly implemented.

The cancellation and join logic matches the SQS listener pattern. Note that the effectiveness depends on the cancellation checking in Task::run (addressed in separate comment).


192-198: Doc comment wording issue previously flagged.

The implementation is correct. The documentation refers to "this scanner" on line 194, which is appropriate.

Note: A previous review comment flagged similar wording inconsistency for this method; that concern has been addressed in the current version.

Comment thread components/log-ingestor/src/ingestion_job/s3_scanner.rs
Comment thread components/log-ingestor/tests/test_ingestion_job.rs
Comment thread components/log-ingestor/tests/test_ingestion_job.rs Outdated
Comment thread components/log-ingestor/tests/test_ingestion_job.rs Outdated
Comment thread components/log-ingestor/src/ingestion_job/s3_scanner.rs Outdated
Comment thread components/log-ingestor/src/ingestion_job/s3_scanner.rs Outdated

@hoophalab hoophalab left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I forgot those two comments.

Comment thread components/log-ingestor/src/ingestion_job/s3_scanner.rs Outdated
Comment thread components/clp-rust-utils/src/s3.rs
Comment thread components/log-ingestor/src/ingestion_job/sqs_listener.rs Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (2)

49-60: Cancellation is still only checked once; scanner may never stop after shutdown

Task::run races cancel_token.cancelled() against self.scan(), but scan contains its own infinite loop and never observes the token. Once the scan branch wins the first select!, later calls to S3Scanner::shutdown_and_join will cancel the token but the task will keep running until an error occurs, matching the earlier critical feedback.

You need to make the scan loop itself cancellation-aware. One way, keeping the current structure, is to pass the token into scan and use select! (and is_cancelled) around both the S3 call and the sleep, for example:

-    pub async fn run(mut self, cancel_token: CancellationToken) -> Result<()> {
-        select! {
-            // Cancellation requested.
-            () = cancel_token.cancelled() => {
-                Ok(())
-            }
-
-            // Scanner execution
-            result = self.scan() => {
-                result
-            }
-        }
-    }
+    pub async fn run(mut self, cancel_token: CancellationToken) -> Result<()> {
+        self.scan(&cancel_token).await
+    }
@@
-    /// # Returns
-    ///
-    /// This function never returns unless an error occurs.
-    ///
-    /// # Errors
-    ///
-    /// Returns an error if:
-    ///
-    /// * Forwards [`AwsClientManagerType::get`]'s return values on failure.
-    /// * Forwards
-    ///   [`aws_sdk_s3::operation::list_objects_v2::builders::ListObjectsV2FluentBuilder::send`]'s
-    ///   return values on failure.
-    pub async fn scan(&mut self) -> Result<()> {
-        loop {
-            let client = self.s3_client_manager.get().await?;
-            let response = client
-                .list_objects_v2()
-                .bucket(self.config.bucket_name.as_str())
-                .prefix(self.config.prefix.as_str())
-                .set_start_after(self.config.start_after.clone())
-                .send()
-                .await?;
-            let Some(contents) = response.contents else {
-                self.sleep().await;
-                continue;
-            };
-
-            for content in contents {
-                let (Some(key), Some(size)) = (content.key, content.size) else {
-                    continue;
-                };
-                if key.ends_with('/') {
-                    continue;
-                }
-                self.sender
-                    .send(ObjectMetadata {
-                        bucket: self.config.bucket_name.clone(),
-                        key: key.clone(),
-                        size: size.try_into()?,
-                    })
-                    .await?;
-                self.config.start_after = Some(key);
-            }
-
-            if response.is_truncated.unwrap_or(false) {
-                // The results are truncated. Keep going until all objects are listed.
-                // Ideally, we can use the continuation token to continue listing objects, but since
-                // we may refresh the client in the next scan cycle, we will use `start_after` to
-                // send a new request for simplicity.
-                continue;
-            }
-            self.sleep().await;
-        }
-    }
+    /// # Returns
+    ///
+    /// This function only returns when cancellation is requested or an error occurs.
+    pub async fn scan(&mut self, cancel_token: &CancellationToken) -> Result<()> {
+        loop {
+            if cancel_token.is_cancelled() {
+                return Ok(());
+            }
+
+            let client = self.s3_client_manager.get().await?;
+            let list_future = client
+                .list_objects_v2()
+                .bucket(self.config.bucket_name.as_str())
+                .prefix(self.config.prefix.as_str())
+                .set_start_after(self.config.start_after.clone())
+                .send();
+
+            let response = select! {
+                () = cancel_token.cancelled() => {
+                    return Ok(());
+                }
+                result = list_future => {
+                    result?
+                }
+            };
+
+            let Some(contents) = response.contents else {
+                // No objects; wait for the next scan or cancellation.
+                select! {
+                    () = cancel_token.cancelled() => {
+                        return Ok(());
+                    }
+                    _ = self.sleep() => {}
+                }
+                continue;
+            };
+
+            for content in contents {
+                let (Some(key), Some(size)) = (content.key, content.size) else {
+                    continue;
+                };
+                if key.ends_with('/') {
+                    continue;
+                }
+                self.sender
+                    .send(ObjectMetadata {
+                        bucket: self.config.bucket_name.clone(),
+                        key: key.clone(),
+                        size: size.try_into()?,
+                    })
+                    .await?;
+                self.config.start_after = Some(key);
+            }
+
+            if response.is_truncated.unwrap_or(false) {
+                // More pages available; continue immediately, but still honour cancellation at the
+                // top of the next loop and around the next S3 call.
+                continue;
+            }
+
+            // Finished a full scan; sleep unless cancelled.
+            select! {
+                () = cancel_token.cancelled() => {
+                    return Ok(());
+                }
+                _ = self.sleep() => {}
+            }
+        }
+    }

This keeps the existing semantics (including start_after and truncated handling) but ensures the task can exit promptly on cancellation during listing and sleeping, so shutdown_and_join will complete reliably.

Also applies to: 85-125


62-71: Docstring vs behaviour: truncated listings bypass the configured interval

The docs for scan say the scanner “runs in a loop, sleeping for the configured scanning interval between each scan”, but when response.is_truncated.unwrap_or(false) is true you immediately loop and issue another list_objects_v2 without any delay. That “drain backlog as fast as possible” behaviour is reasonable, but it can generate tight back‑to‑back S3 calls and doesn’t match the current wording.

Either (a) explicitly document that truncated pages are processed without delay until the listing is complete, or (b) introduce a small per‑page delay / cap on consecutive truncated pages if you want stricter control over S3 QPS and cost.

Also applies to: 117-124

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5b7a5d5 and 8844cd9.

📒 Files selected for processing (4)
  • components/clp-rust-utils/src/sqs/event/s3.rs (1 hunks)
  • components/log-ingestor/src/aws_client_manager.rs (3 hunks)
  • components/log-ingestor/src/ingestion_job/s3_scanner.rs (1 hunks)
  • components/log-ingestor/src/ingestion_job/sqs_listener.rs (2 hunks)
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
🧬 Code graph analysis (2)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1)
  • spawn (155-174)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (2)
components/log-ingestor/tests/test_ingestion_job.rs (2)
  • mpsc (180-180)
  • mpsc (252-252)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (4)
  • run (52-76)
  • spawn (187-206)
  • shutdown_and_join (219-222)
  • get_id (228-230)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: package-image
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: rust-checks
🔇 Additional comments (5)
components/clp-rust-utils/src/sqs/event/s3.rs (1)

37-37: Verify AWS S3 event notification JSON compatibility—no issues found.

The type change from usize to u64 is correct and fully supported:

  • AWS S3 event notifications include size as a JSON numeric field (e.g., 1024)
  • AWS S3 event examples show size values like 493812 as plain numbers, which JSON deserialization libraries parse directly into u64
  • Tests confirm successful deserialization: record.s3.object.size correctly deserializes to 1024
  • On 32-bit platforms, usize (32 bits) cannot represent S3 objects >4GB; u64 ensures universal compatibility
  • The change aligns with S3's cross-regional consistency and supports objects up to 5TB
components/log-ingestor/src/aws_client_manager.rs (2)

7-7: Centralizing Clone/'static bounds in the core traits looks good

Moving the Clone requirement onto AwsClientType and the 'static bound onto AwsClientManagerType simplifies downstream generics (e.g., Task/spawn implementations) without changing behaviour. This matches the intent from the earlier review while still ensuring client managers are safe to move into spawned tasks.

Also applies to: 20-20


52-69: S3ClientWrapper cleanly mirrors SqsClientWrapper

The new S3ClientWrapper is symmetric with SqsClientWrapper, returns cloned clients, and provides a straightforward injection point for tests and other components; no issues spotted.

components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)

35-36: Removing redundant 'static bounds on Task/spawn keeps the API tidy

Now that 'static is enforced on AwsClientManagerType, dropping the extra 'static on Task and SqsListener::spawn keeps the signatures cleaner without changing the safety or behaviour of the spawned task.

Also applies to: 187-187

components/log-ingestor/src/ingestion_job/s3_scanner.rs (1)

100-115: Checked conversion to u64 for object size is a solid improvement

Using size.try_into()? when building ObjectMetadata aligns with the new u64 size type and avoids silent wrap‑around that a plain cast would introduce; this is a good, defensive choice given the S3 API reports sizes as signed integers.

Comment thread components/log-ingestor/src/ingestion_job/s3_scanner.rs
@LinZhihao-723 LinZhihao-723 merged commit b27d77d into y-scope:main Nov 20, 2025
21 checks passed
junhaoliao pushed a commit to junhaoliao/clp that referenced this pull request May 17, 2026
…to find and ingest newly created objects with a given prefix. (y-scope#1629)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants