Skip to content

feat(log-ingestor): Add DB operations for compression job submission, result retrieval, and ingestion metadata status updates.#2017

Merged
LinZhihao-723 merged 4 commits into
y-scope:mainfrom
LinZhihao-723:compression-db-operation
Feb 25, 2026
Merged

Conversation

@LinZhihao-723

@LinZhihao-723 LinZhihao-723 commented Feb 24, 2026

Copy link
Copy Markdown
Member

Description

This PR is a part of the implementation for #1978.

This PR implements DB operations for compression job submission, result retrieval, and ingestion metadata status updates. Essentially, it's an upgraded implementation for the logic in compression job submitter with the following improvements:

  • In a single transaction, a compression job is submitted, and its ID will be assigned to all submitted S3 object metadata.
  • When the compression job completes, its result will be updated to the submitted S3 object metadata. And on a successful compression, the ingestion job's num_files_compressed will be updated.

The plan for the next release is to update compression_job_submitter implementation to use ClpIngestionState for compression job submission and result retrieval.

The long-term plan for compression job submission is tracked in #2018.

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  • Ensure all workflows pass.
  • Run the following tests locally:
    • Step 1: Run this script to start a mariaDB instance on localhost:
      ./start.py --name "clp-db-test" --port 3306 --database "clp-db" --username "ROOT" --password "ROOT"
    • Step 2: Add the following code to the test section in clp_ingestion.rs:
      #[tokio::test]
      async fn test_table_creation() {
          use clp_rust_utils::{
              clp_config::{AwsAuthentication::Credentials, AwsCredentials, S3Config},
              job_config::{S3InputConfig, OutputConfig, ingestion::s3::S3ScannerConfig},
              types::non_empty_string::ExpectedNonEmpty,
          };
          use non_empty_string::NonEmptyString;
          use serde_json::json;
      
          let mysql_options = sqlx::mysql::MySqlConnectOptions::new()
              .host("localhost")
              .port(3306)
              .database("clp-db")
              .username("ROOT")
              .password("ROOT");
      
          let db_pool = sqlx::mysql::MySqlPoolOptions::new()
              .max_connections(100)
              .connect_with(mysql_options)
              .await
              .expect("Failed to connect to mysql");
      
          sqlx::query(
              r"
                  CREATE TABLE IF NOT EXISTS `compression_jobs` (
                      `id` INT NOT NULL AUTO_INCREMENT,
                      `status` INT NOT NULL DEFAULT '0',
                      `status_msg` VARCHAR(512) NOT NULL DEFAULT '',
                      `creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
                      `start_time` DATETIME(3) NULL DEFAULT NULL,
                      `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP(),
                      `duration` FLOAT NULL DEFAULT NULL,
                      `original_size` BIGINT NOT NULL DEFAULT '0',
                      `uncompressed_size` BIGINT NOT NULL DEFAULT '0',
                      `compressed_size` BIGINT NOT NULL DEFAULT '0',
                      `num_tasks` INT NOT NULL DEFAULT '0',
                      `num_tasks_completed` INT NOT NULL DEFAULT '0',
                      `clp_binary_version` INT NULL DEFAULT NULL,
                      `clp_config` VARBINARY(60000) NOT NULL,
                      PRIMARY KEY (`id`) USING BTREE,
                      INDEX `JOB_STATUS` (`status`) USING BTREE,
                      INDEX `JOB_UPDATE_TIME` (`update_time`) USING BTREE
                  ) ROW_FORMAT=DYNAMIC
          ",
          )
              .execute(&db_pool)
              .await
              .expect("Failed to create compression jobs table");
      
          let connector = match ClpDbIngestionConnector::new(db_pool.clone()).await {
              Ok(c) => c,
              Err(e) => {
                  panic!("Failed to create CLP DB connector: {e}");
              }
          };
      
          let json_config = json!(
              {
                  "bucket_name": "test",
                  "key_prefix": "test-",
                  "region": "us-east-2",
              }
          );
      
          let config: S3ScannerConfig = serde_json::from_str(json_config.to_string().as_str())
              .expect("Failed to deserialize S3 ingestion job config");
      
          let (sender, mut receiver) = mpsc::channel::<Vec<ObjectMetadata>>(100);
      
          let state = match connector
              .create_ingestion_job(S3IngestionJobConfig::S3Scanner(config), sender.clone())
              .await
          {
              Ok(s) => s,
              Err(e) => {
                  panic!("Failed to create ingestion job: {e}");
              }
          };
      
          state.start().await.expect("Failed to start ingestion job");
      
          let mut objects = Vec::new();
          for i in 0..12345 {
              objects.push(ObjectMetadata {
                  bucket: NonEmptyString::from_static_str("test"),
                  key: NonEmptyString::from_string(format!("test-{i}")),
                  size: 100 + i,
                  id: None,
              });
          }
      
          let last_ingested_key = objects.last().unwrap().key.clone();
          S3ScannerState::ingest(&state, objects, last_ingested_key.as_str())
              .await
              .expect("Failed to ingest");
      
          state.end().await.expect("Failed to end ingestion job");
      
          let objects = vec![ObjectMetadata {
              bucket: NonEmptyString::from_static_str("test"),
              key: NonEmptyString::from_static_str("test-10000"),
              size: 8,
              id: None,
          }];
          let last_ingested_key = objects.last().unwrap().key.clone();
          assert!(
              S3ScannerState::ingest(&state, objects, last_ingested_key.as_str())
                  .await
                  .is_err()
          );
      
          // Compression
          let objects_to_submit = receiver.recv().await.expect("sth");
          assert_eq!(objects_to_submit.len(), 12345);
      
          for chunk in objects_to_submit.chunks(3000) {
              let s3_input_config = S3InputConfig {
                  s3_config: S3Config {
                      bucket: NonEmptyString::from_static_str("test"),
                      region_code: Some(NonEmptyString::from_static_str("us-east-2")),
                      key_prefix: NonEmptyString::from_static_str("test-"),
                      endpoint_url: None,
                      aws_authentication: Credentials {
                          credentials: AwsCredentials {
                              access_key_id: "ROOT".to_owned(),
                              secret_access_key: "ROOT".to_owned(),
                          },
                      },
                  },
                  keys: None,
                  dataset: Some(NonEmptyString::from_static_str("default")),
                  timestamp_key: None,
                  unstructured: false,
              };
              let output_config = OutputConfig {
                  target_archive_size: 1000,
                  target_dictionaries_size: 1000,
                  target_encoded_file_size: 1000,
                  target_segment_size: 1000,
                  compression_level: 3,
              };
              let io_config_template = ClpIoConfig {
                  input: InputConfig::S3InputConfig {
                      config: s3_input_config,
                  },
                  output: output_config,
              };
      
              let compression_job_id = state
                  .submit_for_compression(io_config_template, &chunk)
                  .await
                  .expect("Failed to submit for compression");
              let state_copy = state.clone();
              let num_objects_submitted = chunk.len();
              let handle = tokio::spawn(async move {
                  state_copy
                      .wait_for_compression_and_update_submitted_metadata(
                          compression_job_id,
                          num_objects_submitted,
                      )
                      .await
                      .expect("Failed to wait for compression result")
              });
      
              // Manually mark the compression as successful
              sqlx::query(
                  format!(
                      r#"UPDATE `{table}` SET `status` = ? WHERE `id` = ?"#,
                      table = CLP_COMPRESSION_JOB_TABLE_NAME
                  )
                      .as_str(),
              )
                  .bind(2)
                  .bind(compression_job_id)
                  .execute(&db_pool)
                  .await
                  .expect("Failed to mark compression job as successful");
      
              let (status, status_msg) = handle.await.expect("Failed to join handle");
              assert_eq!(status, CompressionJobStatus::Succeeded);
          }
      }
    • Step 3: Run the tester (inside the IDE or through the terminal) and make sure:
      • The DB contains the expected tables with the correct schema.
      • One ingestion job is created.
      • 37 files are ingested into the created ingestion job (on both the ingestion job table and S3 object metadata table).
      • The ingestion job ends with a finished state.
      • Five compression jobs are submitted and finished successfully.
      • All metadata rows are marked as compressed across 5 compression jobs.
      • All files are compressed in the ingestion job table while the job status is "finished".

Summary by CodeRabbit

  • New Features

    • Compression job submission and status tracking for ingested S3 objects.
    • Asynchronous metadata processing with channel-based forwarding.
    • Object metadata now includes an optional identifier.
  • Refactor

    • Database indexes adjusted to support ingestion and compression tracking.
    • Compression status polling enhanced with backoff-based wait logic.
  • Tests

    • Test fixtures updated to include the new optional metadata id.

@LinZhihao-723 LinZhihao-723 requested a review from a team as a code owner February 24, 2026 04:48
@coderabbitai

coderabbitai Bot commented Feb 24, 2026

Copy link
Copy Markdown
Contributor

Walkthrough

Adds optional S3 object metadata IDs and a CompressionJobId type; wires an mpsc channel into ingestion state; centralizes ingestion to assign IDs and forward metadata; adds compression submission and polling methods with DB updates and sqlx type mappings; updates call sites and tests to include the new optional id field.

Changes

Cohort / File(s) Summary
Type Definitions
components/clp-rust-utils/src/job_config/compression.rs, components/clp-rust-utils/src/s3.rs
Added pub type CompressionJobId = i32 and pub type S3ObjectMetadataId = u64. Extended ObjectMetadata with pub id: Option<S3ObjectMetadataId>.
Ingestion Call Sites
components/log-ingestor/src/ingestion_job/s3_scanner.rs, components/log-ingestor/src/ingestion_job/sqs_listener.rs
Initialize new ObjectMetadata.id as None when constructing metadata in scanner and SQS parsing paths.
Core Ingestion & Compression Flow
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
Added sender: mpsc::Sender<Vec<ObjectMetadata>> to ingestion creation/state. Introduced submit_for_compression, wait_for_compression_and_update_submitted_metadata, and private ingest_and_send. Changed ingest_s3_object_metadata to return (usize, Vec<S3ObjectMetadataId>). Added sqlx type impl macro usages and updated DB index names; switched compression submission serialization to BrotliMsgpack.
Database Helpers
components/clp-rust-utils/src/database/mysql.rs
Added impl_sqlx_type! macro and extended MySqlEnumFormat trait with an iterator bound and format_as_sql_enum() default method.
Tests
components/log-ingestor/tests/test_compression_listener.rs, components/log-ingestor/tests/test_ingestion_job.rs
Updated test object construction to include id: None for ObjectMetadata instances.

Sequence Diagram

sequenceDiagram
    actor Client
    participant ClpState as ClpIngestionState
    participant DB as Database
    participant Compressor as CompressionService

    Client->>ClpState: ingest_and_send(objects)
    ClpState->>DB: ingest_s3_object_metadata(objects)
    DB-->>ClpState: (chunk_size, object_ids)
    ClpState->>ClpState: assign ids to ObjectMetadata
    ClpState-->>Client: send objects via mpsc channel

    Client->>ClpState: submit_for_compression(io_config, objects)
    ClpState->>DB: BEGIN / INSERT compression_job
    DB-->>ClpState: compression_job_id
    ClpState->>DB: UPDATE ingested_s3_object_metadata (compression_job_id, status=Submitted)
    ClpState->>DB: COMMIT
    ClpState-->>Client: compression_job_id

    Client->>ClpState: wait_for_compression_and_update_submitted_metadata(job_id)
    loop poll with exponential backoff
        ClpState->>Compressor: query_job_status(job_id)
        Compressor-->>ClpState: status (Pending|Succeeded|Failed)
    end
    ClpState->>DB: BEGIN / UPDATE ingested_s3_object_metadata (set status)
    ClpState->>DB: UPDATE ingestion_job (maybe increment num_files_compressed)
    ClpState->>DB: COMMIT
    ClpState-->>Client: (final_status, message)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related issues

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately captures the main purpose of the PR: adding database operations for compression job submission, result retrieval, and ingestion metadata status updates.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 126-225: The function submit_for_compression currently allows an
empty objects slice which will insert a compression job but never update any
metadata; add an early guard at the start of submit_for_compression (before
mutating io_config or beginning the transaction) to check if objects.is_empty()
and return a clear anyhow::Error (e.g. anyhow::anyhow!("No objects provided for
compression")) so no compression job is created for empty submissions; update
callers/tests if they rely on panicking behavior.
- Around line 525-529: The loop that sets object.id using next_metadata_id
(inside the objects.chunks_mut(chunk_size).enumerate() block, using
last_inserted_ids and next_metadata_id) assumes MySQL @@auto_increment_increment
== 1; update this by either fetching the server increment (SELECT
@@auto_increment_increment) and using that value to advance next_metadata_id, or
assert/validate that the fetched increment == 1 and add a comment describing the
requirement; place the fetch/validation near the start of the ingestion flow and
use the retrieved increment when doing next_metadata_id += <increment> so the
logic works under non-default auto-increment settings.

ℹ️ Review info

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7fb76c3 and 49a97ae.

📒 Files selected for processing (7)
  • components/clp-rust-utils/src/job_config/compression.rs
  • components/clp-rust-utils/src/s3.rs
  • components/log-ingestor/src/ingestion_job/s3_scanner.rs
  • components/log-ingestor/src/ingestion_job/sqs_listener.rs
  • components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
  • components/log-ingestor/tests/test_compression_listener.rs
  • components/log-ingestor/tests/test_ingestion_job.rs

Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
Comment on lines +525 to +529
for (chunk_id, chunk) in objects.chunks_mut(chunk_size).enumerate() {
let mut next_metadata_id = *last_inserted_ids.get(chunk_id).expect("invalid chunk ID");
for object in chunk {
object.id = Some(next_metadata_id);
next_metadata_id += 1;

@coderabbitai coderabbitai Bot Feb 24, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, let's see the file structure and check the specific lines
wc -l "components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs"

Repository: y-scope/clp

Length of output: 125


🏁 Script executed:

#!/bin/bash
# Read the context around lines 525-529 to understand the code
sed -n '500,550p' "components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs"

Repository: y-scope/clp

Length of output: 1781


🏁 Script executed:

#!/bin/bash
# Search for where last_inserted_ids is defined/populated
rg "last_inserted_ids" "components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs" -B 5 -A 5

Repository: y-scope/clp

Length of output: 1758


🏁 Script executed:

#!/bin/bash
# Search for database setup, schema, and configuration files
find . -type f \( -name "*.sql" -o -name "Dockerfile*" -o -name "docker-compose*" -o -name "*mysql*" -o -name "*database*" \) | head -20

Repository: y-scope/clp

Length of output: 1316


🏁 Script executed:

#!/bin/bash
# Search for any auto_increment related code or comments
rg "auto_increment|AUTO_INCREMENT" --type-list | head -20

Repository: y-scope/clp

Length of output: 444


🏁 Script executed:

#!/bin/bash
# Search for any database initialization or configuration in the codebase
rg "auto_increment|@@auto_increment|replication|multi.?primary" -i

Repository: y-scope/clp

Length of output: 1255


🏁 Script executed:

#!/bin/bash
# Check the broader context - what type of metadata is being stored and if ID uniqueness is critical
sed -n '480,530p' "components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs"

Repository: y-scope/clp

Length of output: 1979


🏁 Script executed:

#!/bin/bash
# Check the database schema setup in the log-ingestor component
rg "CREATE TABLE.*id.*AUTO_INCREMENT" "components/log-ingestor/" -A 5

Repository: y-scope/clp

Length of output: 37


🏁 Script executed:

#!/bin/bash
# Look at the database initialization script
find . -name "*init-db*" -o -name "*schema*" | grep -E "(log-ingestor|database)" | head -10

Repository: y-scope/clp

Length of output: 37


🏁 Script executed:

#!/bin/bash
# Check the full context of the ingest_s3_object_metadata method to understand the chunk logic
sed -n '450,525p' "components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs"

Repository: y-scope/clp

Length of output: 2656


🏁 Script executed:

#!/bin/bash
# Search for any documentation about MySQL configuration in this project
find . -name "*.md" -type f | xargs grep -l "auto_increment\|replication\|multi.?primary" 2>/dev/null | head -5

Repository: y-scope/clp

Length of output: 37


🏁 Script executed:

#!/bin/bash
# Get the complete ingest_s3_object_metadata method
sed -n '380,450p' "components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs"

Repository: y-scope/clp

Length of output: 2815


🏁 Script executed:

#!/bin/bash
# Look for the method signature and beginning
rg "fn ingest_s3_object_metadata" "components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs" -A 80 | head -100

Repository: y-scope/clp

Length of output: 3138


🏁 Script executed:

#!/bin/bash
# Check if there's any multi-primary or replication setup in the deployment configs
rg "auto_increment_increment|server.?id|replication" tools/deployment/ -i

Repository: y-scope/clp

Length of output: 126


🏁 Script executed:

#!/bin/bash
# Look at the actual TABLE schema definition in the file
rg "CREATE TABLE.*s3_object_metadata" "components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs" -A 10

Repository: y-scope/clp

Length of output: 37


🏁 Script executed:

#!/bin/bash
# Check the MySQL initialization in the Helm templates
cat tools/deployment/package-helm/templates/database-statefulset.yaml | grep -A 20 -B 5 "auto_increment\|MYSQL" || echo "No auto_increment config found"

Repository: y-scope/clp

Length of output: 2349


🏁 Script executed:

#!/bin/bash
# Check for any MySQL configuration environment variables or init scripts
find tools/deployment/ -type f -name "*.yaml" -o -name "*.sh" | xargs grep -l "MYSQL\|auto_increment" 2>/dev/null

Repository: y-scope/clp

Length of output: 169


🏁 Script executed:

#!/bin/bash
# Search for how MySQL is actually initialized in the project
rg "last_insert_id" --type rust -B 3 -A 3

Repository: y-scope/clp

Length of output: 3410


🏁 Script executed:

#!/bin/bash
# Check if there are any comments or documentation about the ID assignment assumption
rg "last_insert_id|auto_increment_increment|sequential.*id|ID assignment" "components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs" -B 2 -A 2

Repository: y-scope/clp

Length of output: 576


🏁 Script executed:

#!/bin/bash
# Verify the MySQL default for auto_increment_increment
web_search "MySQL auto_increment_increment default value"

Repository: y-scope/clp

Length of output: 105


🌐 Web query:

MySQL auto_increment_increment default value single primary

💡 Result:

auto_increment_increment is a MySQL system variable that controls the step size between generated AUTO_INCREMENT values.

  • Default value: 1 (so values go 1, 2, 3, ...) (dev.mysql.com)
  • This is true for a normal table with a single AUTO_INCREMENT primary key unless you (or replication/cluster features) change the variable. (dev.mysql.com)
  • In Group Replication single-primary mode, MySQL does not modify auto_increment_increment (it remains at its default unless you changed it). (dev.mysql.com)

Check it:

SELECT @@auto_increment_increment;

Set it back to the default for your current session:

SET @@SESSION.auto_increment_increment = 1;

Citations:


🏁 Script executed:

#!/bin/bash
# Check the deployment context - is this single or multi-primary setup?
cat tools/deployment/package/docker-compose-all.yaml | grep -A 30 "mysql\|mariadb" | head -50

Repository: y-scope/clp

Length of output: 1901


Document or validate the MySQL auto-increment assumption.

The code assumes @@auto_increment_increment = 1 (default for single-primary) when incrementing next_metadata_id += 1 across chunks. While this works for standard MySQL deployments, it's a fragile assumption that silently breaks if the increment setting changes (e.g., multi-primary replication). Consider either:

  • Adding a validation check or assertion that @@auto_increment_increment = 1
  • Documenting this requirement in a comment
  • Using the server's actual increment value (the suggested approach above)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs` around
lines 525 - 529, The loop that sets object.id using next_metadata_id (inside the
objects.chunks_mut(chunk_size).enumerate() block, using last_inserted_ids and
next_metadata_id) assumes MySQL @@auto_increment_increment == 1; update this by
either fetching the server increment (SELECT @@auto_increment_increment) and
using that value to advance next_metadata_id, or assert/validate that the
fetched increment == 1 and add a comment describing the requirement; place the
fetch/validation near the start of the ingestion flow and use the retrieved
increment when doing next_metadata_id += <increment> so the logic works under
non-default auto-increment settings.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've explained this assumption in ingest_s3_object_metadata's documentation.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@LinZhihao-723, thank you for clarifying! I can see that the documentation for ingest_s3_object_metadata states "IDs within a single batch are guaranteed to form a consecutive sequence," which captures this assumption. That's a reasonable approach for documenting the expected behavior in a single-primary MySQL deployment. 👍


🧠 Learnings used
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 2014
File: components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs:173-176
Timestamp: 2026-02-23T19:05:17.081Z
Learning: In components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs, ingest_s3_object_metadata uses assert!(!objects.is_empty()) to panic on empty input. This encodes the assumption that empty input represents a programming error that should never occur in practice. During reviews, verify that this invariant is well-documented and that callers should never pass empty collections. If empty input could be a normal, recoverable case, consider returning a Result/Option or using a less drastic check. If the invariant is guaranteed by callers, ensure unit tests cover non-empty input and that panics are appropriate for development/runtime expectations.

Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 2001
File: components/log-ingestor/src/ingestion_job/state.rs:76-81
Timestamp: 2026-02-17T18:54:09.665Z
Learning: In `components/log-ingestor/src/ingestion_job/state.rs`, the `S3ScannerState::ingest` method uses an explicit lifetime `'object_metadata_lifetime` for both the `objects: &[ObjectMetadata]` and `last_ingested_key: &str` parameters. This is intentional for readability, as `last_ingested_key` is a reference to data within one of the `ObjectMetadata` items in the `objects` slice.

Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 2001
File: components/log-ingestor/src/ingestion_job/s3_scanner.rs:132-148
Timestamp: 2026-02-17T03:03:52.020Z
Learning: In the log-ingestor crate, for ingestion flow in the s3_scanner, commit state changes (e.g., call state.ingest() and update start_after) before awaiting sender.send(). This ordering is intentional to align with the system's failure model, ensuring state progress is recorded even if the send fails or is delayed. Apply this pattern to similar ingestion paths in this module to maintain consistency and recoverability.

Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.

Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.

Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 508-543: In ingest_and_send, the call to
self.sender.send(objects).await currently returns an error without any logging;
wrap or handle the Result from self.sender.send so send failures are logged (use
tracing::error) with context such as error = ?err and job_id = ?self.job_id (and
optionally objects.len() or other identifying info) before propagating the
error; update the call site that currently does self.sender.send(objects).await?
to instead inspect_err or match the Result and log then return the error.

ℹ️ Review info

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 49a97ae and a2897a2.

📒 Files selected for processing (1)
  • components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs

Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
Comment on lines +199 to +201
CompressionJobId::try_from(result.last_insert_id()).map_err(|_| {
anyhow::anyhow!("The retrieved ID overflows: {}", result.last_insert_id())
})?;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the id is auto increment, if a previous id overflows, all later ids are bound to overflow. Maybe we should panic instead?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only use panic to catch internal errors (for example, all object metadata passed to the compression should have the ingestion metadata ID set). And when the panic happens, it means something wrong inside the log-ingestor implementation.
However, the overflow here is not a part of log-ingestor's responsibility, so we just return an error and rely on the top-level caller to log the error (and no retry logic).
To fix this in a legit way, I think we should improve the compression job side to use BIGINT unsigned IDs (not INT). And in that case, we should be good to panic here since that overflow will become a really "unrecoverable" issue.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@CodeRabbit Please open an issue about using BIGINT UNSIGNED for all ids.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧩 Analysis chain

I'll create a GitHub issue to track using BIGINT UNSIGNED for all database IDs.


🏁 Script executed:

#!/bin/bash
# Create GitHub issue for standardizing database ID types to BIGINT UNSIGNED
gh issue create \
  --title "Use BIGINT UNSIGNED for all database IDs" \
  --body "## Context

As discussed in PR `#2017` (https://github.com/y-scope/clp/pull/2017#discussion_r2844641640), we should standardize all database ID types to use \`BIGINT UNSIGNED\` instead of \`INT\` to avoid potential overflow issues.

## Current State

Currently, some database IDs use \`INT\` (e.g., \`CompressionJobId\` which is aliased to \`i32\`), which can lead to overflow issues with auto-increment IDs.

## Proposed Improvement

Update all database ID columns and corresponding Rust type aliases to use \`BIGINT UNSIGNED\` consistently across the codebase. This includes but is not limited to:
- Compression job IDs
- Other auto-incrementing ID columns

## Benefits

- Prevents overflow issues with auto-incrementing IDs
- Allows using panic for overflow cases since they would become truly unrecoverable
- Provides consistency across the database schema

Requested by: `@sitaowang1998`" \
  --assignee sitaowang1998

Length of output: 42


@sitaowang1998, I've created issue #2025 to track standardizing all database IDs to use BIGINT UNSIGNED. The issue includes context from this discussion and has been assigned to you.


🧠 Learnings used
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 2014
File: components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs:173-176
Timestamp: 2026-02-23T19:05:17.081Z
Learning: In components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs, ingest_s3_object_metadata uses assert!(!objects.is_empty()) to panic on empty input. This encodes the assumption that empty input represents a programming error that should never occur in practice. During reviews, verify that this invariant is well-documented and that callers should never pass empty collections. If empty input could be a normal, recoverable case, consider returning a Result/Option or using a less drastic check. If the invariant is guaranteed by callers, ensure unit tests cover non-empty input and that panics are appropriate for development/runtime expectations.

Learnt from: haiqi96
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-07-29T14:04:13.769Z
Learning: User haiqi96 requested creating a GitHub issue to document a bug fix from PR `#1136`, which addressed MySQL compatibility issues with invalid SQL CAST operations in the WebUI component.

Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.

Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 2001
File: components/log-ingestor/src/ingestion_job/s3_scanner.rs:132-148
Timestamp: 2026-02-17T03:03:52.020Z
Learning: In the log-ingestor crate, for ingestion flow in the s3_scanner, commit state changes (e.g., call state.ingest() and update start_after) before awaiting sender.send(). This ordering is intentional to align with the system's failure model, ensuring state progress is recorded even if the send fails or is delayed. Apply this pattern to similar ingestion paths in this module to maintain consistency and recoverability.

})?;

// Update compression job ID for ingested objects
for chunk in objects.chunks(1000) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Why this chunk size is just 1000?
  2. Since we need chunk size more than once, should we create a const function to compute chunk size given the number of query parameters per entry?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I chose 1000 'cuz I thought the IDs are in-place values, not placeholders. But I just realized we're actually using placeholders here since IDs are uploaded through bind. I elevate this number to 10000 with a similar comment. This value can be smaller than the one in ingestion to be conservative, since it's not in a critical path, so no aggressive batching would be needed.
  2. I think it's meaningless to create such a helper if the number of placeholders can't be derived directly from the statement. Also, depends on the use case we may choose the chunk size accordingly like explained in (1). Therefore, I think a hardcoded value here might be ok for now, as long as we clearly document the limitation in the comment.

Comment on lines +737 to +746
impl sqlx::Type<sqlx::MySql> for IngestedS3ObjectMetadataStatus {
fn type_info() -> <sqlx::MySql as sqlx::Database>::TypeInfo {
<str as sqlx::Type<sqlx::MySql>>::type_info()
}

fn compatible(ty: &<sqlx::MySql as sqlx::Database>::TypeInfo) -> bool {
<str as sqlx::Type<sqlx::MySql>>::compatible(ty)
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have seen this code twice in this file. Is there another way to do it with less boilerplate code?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be fine, considering it's implementing sqlx::Type trait, where we may need different underlying types (in the enum case it's str) accordingly for sqlx to interpret.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we write a macro for this, where the underlying type is a parameter?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in the latest commit.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 550-603: The wait_for_compression_result loop in
wait_for_compression_result currently can poll forever; add a bounded total wait
and return an error if exceeded. Implement by recording a start time (e.g.,
using std::time::Instant::now()), introduce a MAX_TOTAL_WAIT_SEC constant (or
parameter) and after each sleep/check compare elapsed to the max and return an
anyhow::Error like "compression job timed out" when exceeded; alternatively wrap
the polling loop with tokio::time::timeout. Update references in the function
(sleep_duration_sec, MAX_SLEEP_DURATION_SEC, POLLING_QUERY, the sqlx fetch) to
respect the new timeout behavior so the function returns Err when the total wait
is exceeded instead of looping indefinitely.

ℹ️ Review info

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a2897a2 and a560693.

📒 Files selected for processing (2)
  • components/clp-rust-utils/src/database/mysql.rs
  • components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs

Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
@LinZhihao-723 LinZhihao-723 merged commit d6aad37 into y-scope:main Feb 25, 2026
24 checks passed
@junhaoliao junhaoliao added this to the February 2026 milestone Feb 26, 2026
junhaoliao pushed a commit to junhaoliao/clp that referenced this pull request May 17, 2026
… result retrieval, and ingestion metadata status updates. (y-scope#2017)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants