feat(log-ingestor): Add DB operations for compression job submission, result retrieval, and ingestion metadata status updates.#2017
Walkthrough
Adds optional S3 object metadata IDs and a CompressionJobId type; wires an mpsc channel into ingestion state; centralizes ingestion to assign IDs and forward metadata; adds compression submission and polling methods with DB updates and sqlx type mappings; updates call sites and tests to include the new optional id field.
Sequence Diagram
sequenceDiagram
actor Client
participant ClpState as ClpIngestionState
participant DB as Database
participant Compressor as CompressionService
Client->>ClpState: ingest_and_send(objects)
ClpState->>DB: ingest_s3_object_metadata(objects)
DB-->>ClpState: (chunk_size, object_ids)
ClpState->>ClpState: assign ids to ObjectMetadata
ClpState-->>Client: send objects via mpsc channel
Client->>ClpState: submit_for_compression(io_config, objects)
ClpState->>DB: BEGIN / INSERT compression_job
DB-->>ClpState: compression_job_id
ClpState->>DB: UPDATE ingested_s3_object_metadata (compression_job_id, status=Submitted)
ClpState->>DB: COMMIT
ClpState-->>Client: compression_job_id
Client->>ClpState: wait_for_compression_and_update_submitted_metadata(job_id)
loop poll with exponential backoff
ClpState->>Compressor: query_job_status(job_id)
Compressor-->>ClpState: status (Pending|Succeeded|Failed)
end
ClpState->>DB: BEGIN / UPDATE ingested_s3_object_metadata (set status)
ClpState->>DB: UPDATE ingestion_job (maybe increment num_files_compressed)
ClpState->>DB: COMMIT
ClpState-->>Client: (final_status, message)
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 126-225: The function submit_for_compression currently allows an
empty objects slice which will insert a compression job but never update any
metadata; add an early guard at the start of submit_for_compression (before
mutating io_config or beginning the transaction) to check if objects.is_empty()
and return a clear anyhow::Error (e.g. anyhow::anyhow!("No objects provided for
compression")) so no compression job is created for empty submissions; update
callers/tests if they rely on panicking behavior.
- Around line 525-529: The loop that sets object.id using next_metadata_id
(inside the objects.chunks_mut(chunk_size).enumerate() block, using
last_inserted_ids and next_metadata_id) assumes MySQL @@auto_increment_increment
== 1; update this by either fetching the server increment (SELECT
@@auto_increment_increment) and using that value to advance next_metadata_id, or
assert/validate that the fetched increment == 1 and add a comment describing the
requirement; place the fetch/validation near the start of the ingestion flow and
use the retrieved increment when doing next_metadata_id += <increment> so the
logic works under non-default auto-increment settings.
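The first finding above (rejecting an empty `objects` slice before any job row is created) could be sketched roughly as follows. This is only an illustration: `ObjectMetadata` here is a simplified stand-in, `validate_submission` is a hypothetical helper name, and a plain `String` error replaces the `anyhow::Error` the comment suggests so the snippet needs no external crates.

```rust
/// Simplified, hypothetical stand-in for the PR's object metadata type.
#[derive(Debug, Clone)]
pub struct ObjectMetadata {
    pub key: String,
    pub id: Option<u64>,
}

/// Hypothetical guard meant to run before the transaction begins.
/// Returns the number of objects that would be submitted, or an error
/// for an empty submission so that no compression job is created.
pub fn validate_submission(objects: &[ObjectMetadata]) -> Result<usize, String> {
    if objects.is_empty() {
        return Err("No objects provided for compression".to_owned());
    }
    Ok(objects.len())
}
```

Whether an error or a panic is the right response depends on whether empty input is treated as a recoverable case or as a caller bug.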
ℹ️ Review info
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (7)
- components/clp-rust-utils/src/job_config/compression.rs
- components/clp-rust-utils/src/s3.rs
- components/log-ingestor/src/ingestion_job/s3_scanner.rs
- components/log-ingestor/src/ingestion_job/sqs_listener.rs
- components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
- components/log-ingestor/tests/test_compression_listener.rs
- components/log-ingestor/tests/test_ingestion_job.rs
for (chunk_id, chunk) in objects.chunks_mut(chunk_size).enumerate() {
    let mut next_metadata_id = *last_inserted_ids.get(chunk_id).expect("invalid chunk ID");
    for object in chunk {
        object.id = Some(next_metadata_id);
        next_metadata_id += 1;
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# First, let's see the file structure and check the specific lines
wc -l "components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs"
Repository: y-scope/clp
Length of output: 125
🏁 Script executed:
#!/bin/bash
# Read the context around lines 525-529 to understand the code
sed -n '500,550p' "components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs"
Repository: y-scope/clp
Length of output: 1781
🏁 Script executed:
#!/bin/bash
# Search for where last_inserted_ids is defined/populated
rg "last_inserted_ids" "components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs" -B 5 -A 5
Repository: y-scope/clp
Length of output: 1758
🏁 Script executed:
#!/bin/bash
# Search for database setup, schema, and configuration files
find . -type f \( -name "*.sql" -o -name "Dockerfile*" -o -name "docker-compose*" -o -name "*mysql*" -o -name "*database*" \) | head -20
Repository: y-scope/clp
Length of output: 1316
🏁 Script executed:
#!/bin/bash
# Search for any auto_increment related code or comments
rg "auto_increment|AUTO_INCREMENT" --type-list | head -20
Repository: y-scope/clp
Length of output: 444
🏁 Script executed:
#!/bin/bash
# Search for any database initialization or configuration in the codebase
rg "auto_increment|@@auto_increment|replication|multi.?primary" -i
Repository: y-scope/clp
Length of output: 1255
🏁 Script executed:
#!/bin/bash
# Check the broader context - what type of metadata is being stored and if ID uniqueness is critical
sed -n '480,530p' "components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs"
Repository: y-scope/clp
Length of output: 1979
🏁 Script executed:
#!/bin/bash
# Check the database schema setup in the log-ingestor component
rg "CREATE TABLE.*id.*AUTO_INCREMENT" "components/log-ingestor/" -A 5
Repository: y-scope/clp
Length of output: 37
🏁 Script executed:
#!/bin/bash
# Look at the database initialization script
find . -name "*init-db*" -o -name "*schema*" | grep -E "(log-ingestor|database)" | head -10
Repository: y-scope/clp
Length of output: 37
🏁 Script executed:
#!/bin/bash
# Check the full context of the ingest_s3_object_metadata method to understand the chunk logic
sed -n '450,525p' "components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs"
Repository: y-scope/clp
Length of output: 2656
🏁 Script executed:
#!/bin/bash
# Search for any documentation about MySQL configuration in this project
find . -name "*.md" -type f | xargs grep -l "auto_increment\|replication\|multi.?primary" 2>/dev/null | head -5
Repository: y-scope/clp
Length of output: 37
🏁 Script executed:
#!/bin/bash
# Get the complete ingest_s3_object_metadata method
sed -n '380,450p' "components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs"
Repository: y-scope/clp
Length of output: 2815
🏁 Script executed:
#!/bin/bash
# Look for the method signature and beginning
rg "fn ingest_s3_object_metadata" "components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs" -A 80 | head -100
Repository: y-scope/clp
Length of output: 3138
🏁 Script executed:
#!/bin/bash
# Check if there's any multi-primary or replication setup in the deployment configs
rg "auto_increment_increment|server.?id|replication" tools/deployment/ -i
Repository: y-scope/clp
Length of output: 126
🏁 Script executed:
#!/bin/bash
# Look at the actual TABLE schema definition in the file
rg "CREATE TABLE.*s3_object_metadata" "components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs" -A 10
Repository: y-scope/clp
Length of output: 37
🏁 Script executed:
#!/bin/bash
# Check the MySQL initialization in the Helm templates
cat tools/deployment/package-helm/templates/database-statefulset.yaml | grep -A 20 -B 5 "auto_increment\|MYSQL" || echo "No auto_increment config found"
Repository: y-scope/clp
Length of output: 2349
🏁 Script executed:
#!/bin/bash
# Check for any MySQL configuration environment variables or init scripts
find tools/deployment/ -type f -name "*.yaml" -o -name "*.sh" | xargs grep -l "MYSQL\|auto_increment" 2>/dev/null
Repository: y-scope/clp
Length of output: 169
🏁 Script executed:
#!/bin/bash
# Search for how MySQL is actually initialized in the project
rg "last_insert_id" --type rust -B 3 -A 3
Repository: y-scope/clp
Length of output: 3410
🏁 Script executed:
#!/bin/bash
# Check if there are any comments or documentation about the ID assignment assumption
rg "last_insert_id|auto_increment_increment|sequential.*id|ID assignment" "components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs" -B 2 -A 2
Repository: y-scope/clp
Length of output: 576
🏁 Script executed:
#!/bin/bash
# Verify the MySQL default for auto_increment_increment
web_search "MySQL auto_increment_increment default value"
Repository: y-scope/clp
Length of output: 105
🌐 Web query:
MySQL auto_increment_increment default value single primary
💡 Result:
`auto_increment_increment` is a MySQL system variable that controls the step size between generated `AUTO_INCREMENT` values.
- Default value: `1` (so values go 1, 2, 3, ...) (dev.mysql.com)
- This is true for a normal table with a single `AUTO_INCREMENT` primary key unless you (or replication/cluster features) change the variable. (dev.mysql.com)
- In Group Replication single-primary mode, MySQL does not modify `auto_increment_increment` (it remains at its default unless you changed it). (dev.mysql.com)
Check it:
SELECT @@auto_increment_increment;
Set it back to the default for your current session:
SET @@SESSION.auto_increment_increment = 1;
Citations:
- 1: https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html?utm_source=openai
- 2: https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html?utm_source=openai
- 3: https://dev.mysql.com/doc/refman/8.0/en/group-replication-system-variables.html?utm_source=openai
🏁 Script executed:
#!/bin/bash
# Check the deployment context - is this single or multi-primary setup?
cat tools/deployment/package/docker-compose-all.yaml | grep -A 30 "mysql\|mariadb" | head -50
Repository: y-scope/clp
Length of output: 1901
Document or validate the MySQL auto-increment assumption.
The code assumes @@auto_increment_increment = 1 (default for single-primary) when incrementing next_metadata_id += 1 across chunks. While this works for standard MySQL deployments, it's a fragile assumption that silently breaks if the increment setting changes (e.g., multi-primary replication). Consider either:
- Adding a validation check or assertion that `@@auto_increment_increment = 1`
- Documenting this requirement in a comment
- Using the server's actual increment value (the suggested approach above)
We've explained this assumption in ingest_s3_object_metadata's documentation.
@LinZhihao-723, thank you for clarifying! I can see that the documentation for ingest_s3_object_metadata states "IDs within a single batch are guaranteed to form a consecutive sequence," which captures this assumption. That's a reasonable approach for documenting the expected behavior in a single-primary MySQL deployment. 👍
🧠 Learnings used
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 2014
File: components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs:173-176
Timestamp: 2026-02-23T19:05:17.081Z
Learning: In components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs, ingest_s3_object_metadata uses assert!(!objects.is_empty()) to panic on empty input. This encodes the assumption that empty input represents a programming error that should never occur in practice. During reviews, verify that this invariant is well-documented and that callers should never pass empty collections. If empty input could be a normal, recoverable case, consider returning a Result/Option or using a less drastic check. If the invariant is guaranteed by callers, ensure unit tests cover non-empty input and that panics are appropriate for development/runtime expectations.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 2001
File: components/log-ingestor/src/ingestion_job/state.rs:76-81
Timestamp: 2026-02-17T18:54:09.665Z
Learning: In `components/log-ingestor/src/ingestion_job/state.rs`, the `S3ScannerState::ingest` method uses an explicit lifetime `'object_metadata_lifetime` for both the `objects: &[ObjectMetadata]` and `last_ingested_key: &str` parameters. This is intentional for readability, as `last_ingested_key` is a reference to data within one of the `ObjectMetadata` items in the `objects` slice.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 2001
File: components/log-ingestor/src/ingestion_job/s3_scanner.rs:132-148
Timestamp: 2026-02-17T03:03:52.020Z
Learning: In the log-ingestor crate, for ingestion flow in the s3_scanner, commit state changes (e.g., call state.ingest() and update start_after) before awaiting sender.send(). This ordering is intentional to align with the system's failure model, ensuring state progress is recorded even if the send fails or is delayed. Apply this pattern to similar ingestion paths in this module to maintain consistency and recoverability.
Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 508-543: In ingest_and_send, the call to
self.sender.send(objects).await currently returns an error without any logging;
wrap or handle the Result from self.sender.send so send failures are logged (use
tracing::error) with context such as error = ?err and job_id = ?self.job_id (and
optionally objects.len() or other identifying info) before propagating the
error; update the call site that currently does self.sender.send(objects).await?
to instead inspect_err or match the Result and log then return the error.
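A rough sketch of the "log, then propagate" pattern this finding asks for is shown below. It is not the PR's code: it uses the synchronous `std::sync::mpsc` channel and `eprintln!` in place of the async channel and `tracing::error` mentioned above, and `send_with_logging` is a hypothetical helper name.

```rust
use std::sync::mpsc::{SendError, Sender};

/// Sends `objects` and logs context if the receiver has been dropped.
/// The error is returned unchanged so the caller can still propagate it.
pub fn send_with_logging<T>(
    sender: &Sender<Vec<T>>,
    job_id: u64,
    objects: Vec<T>,
) -> Result<(), SendError<Vec<T>>> {
    // Capture the length first because `send` takes ownership of `objects`.
    let num_objects = objects.len();
    let result = sender.send(objects);
    if let Err(err) = &result {
        eprintln!(
            "failed to forward object metadata: job_id={}, num_objects={}, error={}",
            job_id, num_objects, err
        );
    }
    result
}
```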
CompressionJobId::try_from(result.last_insert_id()).map_err(|_| {
    anyhow::anyhow!("The retrieved ID overflows: {}", result.last_insert_id())
})?;
Since the id is auto increment, if a previous id overflows, all later ids are bound to overflow. Maybe we should panic instead?
We only use panic to catch internal errors (for example, all object metadata passed to the compression should have the ingestion metadata ID set). When a panic happens, it means something is wrong inside the log-ingestor implementation.
However, the overflow here is not part of log-ingestor's responsibility, so we just return an error and rely on the top-level caller to log it (with no retry logic).
To fix this properly, I think we should improve the compression job side to use BIGINT UNSIGNED IDs (not INT). In that case, it would be fine to panic here, since the overflow would become a truly "unrecoverable" issue.
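To make the trade-off concrete, here is a minimal sketch of the checked conversion being discussed, assuming `CompressionJobId` is an alias for `i32` and the database driver reports the last insert ID as a `u64`. `to_compression_job_id` is a hypothetical helper, and a `String` error stands in for `anyhow::Error`.

```rust
use std::convert::TryFrom;

/// Assumed alias: the compression job ID column is a signed 32-bit INT.
pub type CompressionJobId = i32;

/// Converts the driver-reported insert ID into a job ID, returning an
/// error (rather than panicking) when the value does not fit in `i32`.
pub fn to_compression_job_id(last_insert_id: u64) -> Result<CompressionJobId, String> {
    CompressionJobId::try_from(last_insert_id)
        .map_err(|_| format!("The retrieved ID overflows: {}", last_insert_id))
}
```

If the column and alias moved to an unsigned 64-bit type, the conversion would become infallible and this error path would disappear.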
@CodeRabbit Please open an issue about using BIGINT UNSIGNED for all ids.
🧩 Analysis chain
I'll create a GitHub issue to track using BIGINT UNSIGNED for all database IDs.
🏁 Script executed:
#!/bin/bash
# Create GitHub issue for standardizing database ID types to BIGINT UNSIGNED
gh issue create \
--title "Use BIGINT UNSIGNED for all database IDs" \
--body "## Context
As discussed in PR `#2017` (https://github.com/y-scope/clp/pull/2017#discussion_r2844641640), we should standardize all database ID types to use \`BIGINT UNSIGNED\` instead of \`INT\` to avoid potential overflow issues.
## Current State
Currently, some database IDs use \`INT\` (e.g., \`CompressionJobId\` which is aliased to \`i32\`), which can lead to overflow issues with auto-increment IDs.
## Proposed Improvement
Update all database ID columns and corresponding Rust type aliases to use \`BIGINT UNSIGNED\` consistently across the codebase. This includes but is not limited to:
- Compression job IDs
- Other auto-incrementing ID columns
## Benefits
- Prevents overflow issues with auto-incrementing IDs
- Allows using panic for overflow cases since they would become truly unrecoverable
- Provides consistency across the database schema
Requested by: `@sitaowang1998`" \
--assignee sitaowang1998
Length of output: 42
@sitaowang1998, I've created issue #2025 to track standardizing all database IDs to use BIGINT UNSIGNED. The issue includes context from this discussion and has been assigned to you.
🧠 Learnings used
Learnt from: haiqi96
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-07-29T14:04:13.769Z
Learning: User haiqi96 requested creating a GitHub issue to document a bug fix from PR `#1136`, which addressed MySQL compatibility issues with invalid SQL CAST operations in the WebUI component.
})?;

// Update compression job ID for ingested objects
for chunk in objects.chunks(1000) {
1. Why is this chunk size just 1000?
2. Since we need the chunk size more than once, should we create a const function to compute it from the number of query parameters per entry?
1. I chose 1000 because I thought the IDs were in-place values, not placeholders. But I just realized we're actually using placeholders here, since the IDs are passed through `bind`. I've raised this number to 10000 with a similar comment. This value can be smaller than the one in ingestion to be conservative: it's not on a critical path, so no aggressive batching is needed.
2. I think it's meaningless to create such a helper if the number of placeholders can't be derived directly from the statement. Also, depending on the use case, we may choose the chunk size accordingly, as explained in (1). Therefore, I think a hardcoded value here might be OK for now, as long as we clearly document the limitation in the comment.
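For reference, the const helper the reviewer proposed could look something like this sketch. It assumes MySQL's limit of 65,535 placeholders per prepared statement. `max_chunk_size` and its parameters are hypothetical, and the thread above settles on a documented hardcoded value instead.

```rust
/// MySQL prepared statements accept at most 65,535 placeholders.
pub const MAX_PLACEHOLDERS_PER_STATEMENT: usize = 65_535;

/// Largest number of entries that fit in one statement when every entry
/// binds `placeholders_per_entry` values and the statement binds
/// `fixed_placeholders` additional values (for example, a job ID).
pub const fn max_chunk_size(placeholders_per_entry: usize, fixed_placeholders: usize) -> usize {
    assert!(placeholders_per_entry > 0);
    assert!(fixed_placeholders < MAX_PLACEHOLDERS_PER_STATEMENT);
    (MAX_PLACEHOLDERS_PER_STATEMENT - fixed_placeholders) / placeholders_per_entry
}

/// Example: one bound ID per object plus one bound job ID per statement.
pub const UPDATE_CHUNK_SIZE_UPPER_BOUND: usize = max_chunk_size(1, 1);
```

A value such as 10000 sits well below this bound, which matches the conservative choice described in the reply.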
impl sqlx::Type<sqlx::MySql> for IngestedS3ObjectMetadataStatus {
    fn type_info() -> <sqlx::MySql as sqlx::Database>::TypeInfo {
        <str as sqlx::Type<sqlx::MySql>>::type_info()
    }

    fn compatible(ty: &<sqlx::MySql as sqlx::Database>::TypeInfo) -> bool {
        <str as sqlx::Type<sqlx::MySql>>::compatible(ty)
    }
}
I have seen this code twice in this file. Is there another way to do it with less boilerplate code?
I think this should be fine, since it implements the sqlx::Type trait, where the underlying type that sqlx interprets (str in the enum case) may differ from one implementation to another.
Can we write a macro for this, where the underlying type is a parameter?
Added in the latest commit.
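The macro actually added in the commit is not shown in this thread. As a rough idea of the shape such a macro can take, the sketch below delegates a trait implementation to a parameterized underlying type. To stay self-contained it uses a local stand-in trait (`SqlType`) rather than `sqlx::Type`; `impl_sql_type_via` and `DemoStatus` are hypothetical names.

```rust
/// Local stand-in for `sqlx::Type`, reduced to what the sketch needs.
pub trait SqlType {
    fn type_name() -> &'static str;
    fn compatible(ty: &str) -> bool;
}

/// Stand-in implementation for the underlying string type.
impl SqlType for str {
    fn type_name() -> &'static str {
        "VARCHAR"
    }

    fn compatible(ty: &str) -> bool {
        matches!(ty, "VARCHAR" | "TEXT" | "ENUM")
    }
}

/// Implements `SqlType` for `$target` by delegating to `$underlying`.
macro_rules! impl_sql_type_via {
    ($target:ty, $underlying:ty) => {
        impl SqlType for $target {
            fn type_name() -> &'static str {
                <$underlying as SqlType>::type_name()
            }

            fn compatible(ty: &str) -> bool {
                <$underlying as SqlType>::compatible(ty)
            }
        }
    };
}

/// Hypothetical enum that is stored as a string column.
pub enum DemoStatus {
    Submitted,
}

impl_sql_type_via!(DemoStatus, str);
```

Each additional enum then needs a single `impl_sql_type_via!` line instead of a repeated `impl` block.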
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 550-603: The wait_for_compression_result loop in
wait_for_compression_result currently can poll forever; add a bounded total wait
and return an error if exceeded. Implement by recording a start time (e.g.,
using std::time::Instant::now()), introduce a MAX_TOTAL_WAIT_SEC constant (or
parameter) and after each sleep/check compare elapsed to the max and return an
anyhow::Error like "compression job timed out" when exceeded; alternatively wrap
the polling loop with tokio::time::timeout. Update references in the function
(sleep_duration_sec, MAX_SLEEP_DURATION_SEC, POLLING_QUERY, the sqlx fetch) to
respect the new timeout behavior so the function returns Err when the total wait
is exceeded instead of looping indefinitely.
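A minimal sketch of a bounded polling loop with exponential backoff is given below. It is synchronous (`std::thread::sleep`) to stay dependency-free, whereas the function under review is async; `poll_until_done` and its parameters are hypothetical, and the closure stands in for the DB status query.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Calls `poll` until it yields a result or `max_total_wait` has elapsed.
/// The sleep between attempts doubles each time, capped at `max_sleep`.
pub fn poll_until_done<T>(
    mut poll: impl FnMut() -> Option<T>,
    initial_sleep: Duration,
    max_sleep: Duration,
    max_total_wait: Duration,
) -> Result<T, String> {
    let start = Instant::now();
    let mut sleep_duration = initial_sleep;
    loop {
        if let Some(result) = poll() {
            return Ok(result);
        }
        // Give up once the overall budget is spent instead of looping forever.
        if start.elapsed() >= max_total_wait {
            return Err("compression job timed out".to_owned());
        }
        sleep(sleep_duration);
        sleep_duration = (sleep_duration * 2).min(max_sleep);
    }
}
```

In an async context, wrapping the existing loop in `tokio::time::timeout`, as the comment mentions, would achieve the same bound without tracking elapsed time by hand.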
📒 Files selected for processing (2)
- components/clp-rust-utils/src/database/mysql.rs
- components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
… result retrieval, and ingestion metadata status updates. (y-scope#2017)
Description
This PR is a part of the implementation for #1978.
This PR implements DB operations for compression job submission, result retrieval, and ingestion metadata status updates. Essentially, it's an upgraded implementation for the logic in compression job submitter with the following improvements:
- `num_files_compressed` will be updated.
The plan for the next release is to update `compression_job_submitter` implementation to use `ClpIngestionState` for compression job submission and result retrieval.
The long-term plan for compression job submission is tracked in #2018.
Checklist
breaking change.
Validation performed
`clp_ingestion.rs`: `finished` state. `compressed` across 5 compression jobs.
Summary by CodeRabbit
New Features
Refactor
Tests