Skip to content

feat(log-ingestor): Add DB operations for ingestion job orchestration:#2014

Merged
LinZhihao-723 merged 4 commits into
y-scope:mainfrom
LinZhihao-723:clp-db-connector
Feb 23, 2026
Merged

feat(log-ingestor): Add DB operations for ingestion job orchestration:#2014
LinZhihao-723 merged 4 commits into
y-scope:mainfrom
LinZhihao-723:clp-db-connector

Conversation

@LinZhihao-723

@LinZhihao-723 LinZhihao-723 commented Feb 22, 2026

Copy link
Copy Markdown
Member
  • Add SQL statements to create necessary tables for log-ingestor status tracking in CLP DB.
  • Add support for registering new ingestion jobs in CLP DB.
  • Add support for ingesting S3 object metadata into CLP DB through implementing ingestion job state traits.

Description

This PR is a part of the implementation for #1978.

This PR implements DB operations for:

At the top level, we introduce ClpDbIngestionConnector for job creation. It returns an ingestion job state implementation, ClpIngestionState, which is a per-job state that enables ingesting S3 object metadata into the CLP DB.

The ingestion operation uses transactions to update both the object metadata table and the ingestion job table for consistency.

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  • Ensure all workflows pass.
  • Run the following tests.
    • Step 1: Run this script to start a mariaDB instance on localhost:
      ./start.py --name "clp-db-test" --port 3306 --database "clp-db" --username "ROOT" --password "ROOT"
    • Step 2: Add the following code to the test section in clp_ingestion.rs:
      #[tokio::test]
      async fn test_table_creation() {
          use clp_rust_utils::{
              job_config::ingestion::s3::S3ScannerConfig,
              types::non_empty_string::ExpectedNonEmpty,
          };
          use non_empty_string::NonEmptyString;
          use serde_json::json;
      
          let mysql_options = sqlx::mysql::MySqlConnectOptions::new()
              .host("localhost")
              .port(3306)
              .database("clp-db")
              .username("ROOT")
              .password("ROOT");
      
          let db_pool = sqlx::mysql::MySqlPoolOptions::new()
              .max_connections(100)
              .connect_with(mysql_options)
              .await
              .expect("Failed to connect to mysql");
      
          sqlx::query(
              r"
                  CREATE TABLE IF NOT EXISTS `compression_jobs` (
                      `id` INT NOT NULL AUTO_INCREMENT,
                      `status` INT NOT NULL DEFAULT '0',
                      `status_msg` VARCHAR(512) NOT NULL DEFAULT '',
                      `creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
                      `start_time` DATETIME(3) NULL DEFAULT NULL,
                      `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP(),
                      `duration` FLOAT NULL DEFAULT NULL,
                      `original_size` BIGINT NOT NULL DEFAULT '0',
                      `uncompressed_size` BIGINT NOT NULL DEFAULT '0',
                      `compressed_size` BIGINT NOT NULL DEFAULT '0',
                      `num_tasks` INT NOT NULL DEFAULT '0',
                      `num_tasks_completed` INT NOT NULL DEFAULT '0',
                      `clp_binary_version` INT NULL DEFAULT NULL,
                      `clp_config` VARBINARY(60000) NOT NULL,
                      PRIMARY KEY (`id`) USING BTREE,
                      INDEX `JOB_STATUS` (`status`) USING BTREE,
                      INDEX `JOB_UPDATE_TIME` (`update_time`) USING BTREE
                  ) ROW_FORMAT=DYNAMIC
          ",
          )
              .execute(&db_pool)
              .await
              .expect("Failed to create compression jobs table");
      
          let connector = match ClpDbIngestionConnector::new(db_pool).await {
              Ok(c) => c,
              Err(e) => {
                  panic!("Failed to create CLP DB connector: {e}");
              }
          };
      
          let json_config = json!(
              {
                  "bucket_name": "test",
                  "key_prefix": "test-",
                  "region": "us-east-2",
              }
          );
      
          let config: S3ScannerConfig = serde_json::from_str(json_config.to_string().as_str())
              .expect("Failed to deserialize S3 ingestion job config");
      
          let state = match connector
              .create_ingestion_job(S3IngestionJobConfig::S3Scanner(config))
              .await
          {
              Ok(s) => s,
              Err(e) => {
                  panic!("Failed to create ingestion job: {e}");
              }
          };
      
          state.start().await.expect("Failed to start ingestion job");
      
          let mut objects = Vec::new();
          for i in 0..15000 {
              objects.push(ObjectMetadata {
                  bucket: NonEmptyString::from_static_str("test"),
                  key: NonEmptyString::from_string(format!("test-{i}")),
                  size: 100 + i,
              });
          }
      
          let last_ingested_key = objects.last().unwrap().key.clone();
          S3ScannerState::ingest(&state, objects, last_ingested_key.as_str())
              .await
              .expect("Failed to ingest");
      
          state.end().await.expect("Failed to end ingestion job");
      
          let objects = vec![ObjectMetadata {
              bucket: NonEmptyString::from_static_str("test"),
              key: NonEmptyString::from_static_str("test-10000"),
              size: 8,
          }];
          let last_ingested_key = objects.last().unwrap().key.clone();
          assert!(
              S3ScannerState::ingest(&state, objects, last_ingested_key.as_str())
                  .await
                  .is_err()
          );
      }
    • Step 3: Run the tester (inside the IDE or through the terminal) and make sure:
      • The DB contains the expected tables with the correct schema.
      • One ingestion job is created.
      • 37 files are ingested into the created ingestion job (on both the ingestion job table and S3 object metadata table).
      • The ingestion job ends with a finished state.
      • All above indicates the last file ingestion (test-10000) didn't go through since the job has finished.

Summary by CodeRabbit

  • New Features

    • Enhanced S3 ingestion configuration to support multiple ingestion strategies.
    • Added database-backed ingestion job management for persistent job state, lifecycle control, transactional metadata ingest and status tracking.
  • Chores

    • Updated build dependencies to enable additional derive/macros support.

@LinZhihao-723 LinZhihao-723 requested a review from a team as a code owner February 22, 2026 23:00
@coderabbitai

coderabbitai Bot commented Feb 22, 2026

Copy link
Copy Markdown
Contributor

Walkthrough

Adds a CLP MySQL-backed S3 ingestion subsystem: new S3IngestionJobConfig, MySQL enum formatting trait, a DB-backed ingestion connector and state types, public ingestion IDs/exports, dependency additions, and visibility change for a compression table constant.

Changes

Cohort / File(s) Summary
S3 config
components/clp-rust-utils/src/job_config/ingestion.rs
Added pub enum S3IngestionJobConfig { SqsListener(...), S3Scanner(...) } (Serialize/Deserialize, Clone, Debug).
Ingestion manager public API
components/log-ingestor/src/ingestion_job_manager.rs
Added mod clp_ingestion;, pub type IngestionJobId = u64;, and pub use clp_ingestion::*; to expose new ingestion module types.
DB-backed ingestion implementation
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
Large new module: ClpDbIngestionConnector, ClpIngestionState, enums ClpIngestionJobStatus, IngestedS3ObjectMetadataStatus, SQL table-creation queries, transactional metadata ingestion, status/stats updates, trait impls for scanner/listener, and unit tests.
MySQL enum helper
components/clp-rust-utils/src/database/mysql.rs
Added pub trait MySqlEnumFormat (uses IntoEnumIterator) with format_as_sql_enum() default implementation for producing SQL ENUM literals.
Dependencies
components/clp-rust-utils/Cargo.toml, components/log-ingestor/Cargo.toml
Added strum = "0.28.0" to clp-rust-utils; added strum = "0.28.0" and strum_macros = "0.28.0" to log-ingestor for enum iteration/derives.
Compression constant visibility
components/log-ingestor/src/compression/compression_job_submitter.rs
Changed CLP_COMPRESSION_JOB_TABLE_NAME from private to pub for cross-module access.

Sequence Diagram

sequenceDiagram
    participant Client
    participant Connector as ClpDbIngestionConnector
    participant DB as MySQL Database
    participant State as ClpIngestionState
    participant S3SQS as S3/SQS

    Client->>Connector: new(db_pool)
    activate Connector
    Connector->>DB: Initialize tables
    DB-->>Connector: OK
    Connector-->>Client: Ready
    deactivate Connector

    Client->>Connector: create_ingestion_job(config: S3IngestionJobConfig)
    activate Connector
    Connector->>DB: Insert ingestion job (status=Requested)
    DB-->>Connector: job_id
    alt config == S3Scanner
        Connector->>DB: Insert scanner state row
    else config == SqsListener
        Connector->>DB: Insert listener state row
    end
    Connector->>State: Construct ClpIngestionState
    Connector-->>Client: ClpIngestionState
    deactivate Connector

    Client->>State: start()
    activate State
    State->>DB: Update job status -> Running
    DB-->>State: OK
    State-->>Client: Acknowledged
    deactivate State

    Client->>State: ingest_s3_object(metadata)
    activate State
    State->>S3SQS: Acknowledge/process message
    State->>DB: Begin transaction
    DB-->>State: TX started
    State->>DB: Insert metadata row / Update stats
    State->>DB: Commit
    DB-->>State: TX committed
    State-->>Client: Ingestion complete
    deactivate State

    Client->>State: end()
    activate State
    State->>DB: Update job status -> Finished
    DB-->>State: OK
    State-->>Client: Acknowledged
    deactivate State
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main change: adding database operations for ingestion job orchestration in the log-ingestor component, which aligns with the changeset that introduces ClpDbIngestionConnector and database-backed ingestion state management.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Tip

Issue Planner is now in beta. Read the docs and try it out! Share your feedback on Discord.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/log-ingestor/Cargo.toml`:
- Around line 32-33: Update the strum dependencies in Cargo.toml: change the
version from 0.28.0 to 0.27.2, remove the separate strum_macros entry and
instead enable the derive feature on the strum crate (features = ["derive"]) so
you only have a single strum = "0.27.2" dependency; then update imports in
clp_ingestion.rs to import derive-provided items from strum (e.g., use
strum::EnumString/EnumVariantNames/Display, etc.) rather than referencing
strum_macros.

In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 432-451: The `config` column in
ingestion_job_table_creation_query() is currently defined as VARCHAR(1024) which
can truncate or reject serialized S3IngestionJobConfig JSON; change the column
type to a variable-length text type (e.g., TEXT or VARCHAR(65535)) in the CREATE
TABLE statement for INGESTION_JOB_TABLE_NAME to safely hold longer serialized
configs, and update any related code or migrations that assume a 1024-byte limit
so inserts/updates for S3IngestionJobConfig and other large configs succeed
without truncation.
- Around line 151-186: ingest_s3_object_metadata currently builds one giant
INSERT with 4 × objects.len() placeholders which can exceed MySQL's
65,535-parameter limit; change it to chunk the objects slice into safe-sized
batches (e.g. MAX_PARAMS = 65_535, MAX_PER_BATCH = MAX_PARAMS / 4) and run the
INSERT-building/binding/executing logic inside a loop over chunks so each chunk
produces at most MAX_PER_BATCH rows; keep using the same transaction (tx) and
the same table name constant (INGESTED_S3_OBJECT_METADATA_TABLE_NAME) and ensure
the existing assertion for non-empty input still holds before chunking.
- Around line 119-132: The update_job_status function currently executes an
UPDATE and returns Ok(()) without verifying the result; update it to check the
executed query's rows_affected() and return an error (anyhow::anyhow or a domain
error) when 0 rows were modified (job not found), mirroring the behavior in
update_ingestion_stats_and_commit, and additionally validate the requested
ClpIngestionJobStatus transition (reject invalid transitions such as Finished ->
Running) before executing the SQL; locate update_job_status and add the
post-exec rows_affected() check and a pre-exec transition guard using the job's
current status.
- Around line 66-96: The two INSERTs in create_ingestion_job (the initial insert
that produces job_id and the conditional insert for S3Scanner into
INGESTION_JOB_S3_SCANNER_STATE_TABLE_NAME) must be executed inside a single
database transaction to avoid orphaned ingestion_job rows if the second insert
fails; change create_ingestion_job to begin a transaction from self.db_pool
(sqlx::Transaction), perform the first INSERT via that transaction to obtain
last_insert_id, then if config matches S3IngestionJobConfig::S3Scanner(_) run
the second INSERT on the same transaction, and finally commit the transaction
(propagating errors with ? so rollback happens on failure); keep returning
ClpIngestionState { job_id, db_pool: self.db_pool.clone() } after a successful
commit.
- Around line 456-467: The CREATE TABLE for ingestion_job_s3_scanner_state
defines a foreign key constraint named `ingestion_job_id_ref`, which conflicts
with the identically named constraint in `ingested_s3_object_metadata` and
causes ER_FK_DUP_NAME; update the constraint name in the
`ingestion_job_s3_scanner_state` table (or the other table) to a unique
identifier (e.g., `ingestion_job_s3_scanner_state_job_id_fk`) in the SQL string
built in the function that returns this CREATE TABLE (the block containing
`INGESTION_JOB_S3_SCANNER_STATE_TABLE_NAME`, `last_ingested_key`, and the
`CONSTRAINT` clause) so each table uses a unique foreign key constraint name
while preserving the same referenced table/columns and ON DELETE/ON UPDATE
behavior.

Comment thread components/log-ingestor/Cargo.toml
Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
Comment on lines +432 to +451
fn ingestion_job_table_creation_query() -> String {
format!(
r"
CREATE TABLE IF NOT EXISTS `{table}` (
`id` BIGINT unsigned NOT NULL AUTO_INCREMENT,
`config` VARCHAR(1024) NOT NULL,
`status` {status_enum} NOT NULL DEFAULT '{default_status}',
`num_files_ingested` BIGINT unsigned NOT NULL DEFAULT '0',
`num_files_compressed` BIGINT unsigned NOT NULL DEFAULT '0',
`error_msg` TEXT NULL DEFAULT NULL,
`creation_ts` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
`last_update_ts` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3)
ON UPDATE CURRENT_TIMESTAMP(3),
PRIMARY KEY (`id`)
) ROW_FORMAT=DYNAMIC;",
table = INGESTION_JOB_TABLE_NAME,
status_enum = ClpIngestionJobStatus::format_as_sql_enum(),
default_status = ClpIngestionJobStatus::Requested,
)
}

@coderabbitai coderabbitai Bot Feb 22, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

config column as VARCHAR(1024) may be too small for some job configurations.

The S3IngestionJobConfig is serialized as JSON (with serde's externally-tagged enum wrapping) into this column. Configs with long bucket names, key prefixes, queue URLs, and endpoint URLs could approach or exceed 1,024 characters. Consider using TEXT or at least a larger VARCHAR to avoid silent truncation or insert failures.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs` around
lines 432 - 451, The `config` column in ingestion_job_table_creation_query() is
currently defined as VARCHAR(1024) which can truncate or reject serialized
S3IngestionJobConfig JSON; change the column type to a variable-length text type
(e.g., TEXT or VARCHAR(65535)) in the CREATE TABLE statement for
INGESTION_JOB_TABLE_NAME to safely hold longer serialized configs, and update
any related code or migrations that assume a 1024-byte limit so inserts/updates
for S3IngestionJobConfig and other large configs succeed without truncation.

✅ Addressed in commit cb80fc9

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be fine for now.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@LinZhihao-723, understood! I've noted your decision to keep the current VARCHAR(1024) size for now.


🧠 Learnings used
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.

Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.

Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 403-426: The enum IngestedS3ObjectMetadataStatus needs the same
manual sqlx::Type<sqlx::MySql> implementation as ClpIngestionJobStatus to map it
to the MySQL string type info; add an impl sqlx::Type<sqlx::MySql> for
IngestedS3ObjectMetadataStatus that returns sqlx::mysql::MySqlTypeInfo::VARCHAR
and sets the string kind (mirroring the ClpIngestionJobStatus impl), keeping
MySqlEnumFormat and existing derives unchanged so future query bindings compile.
- Around line 488-516: The function
ingested_s3_object_metadata_table_creation_query lacks the #[must_use]
annotation present on sibling functions ingestion_job_table_creation_query and
ingestion_job_s3_scanner_state_table_creation_query; add #[must_use] above
ingested_s3_object_metadata_table_creation_query to make its return value
required to be used, keeping the behavior consistent with the other
table-creation helper functions.
- Around line 306-310: The UPDATE_S3_SCANNER_STATE_QUERY execution in
clp_ingestion.rs currently ignores the number of affected rows; modify the block
around
sqlx::query(UPDATE_S3_SCANNER_STATE_QUERY).bind(last_ingested_key).bind(self.job_id).execute(&mut
*tx).await to capture the result, call .rows_affected(), and if it is 0 return
an appropriate error (or propagate a failure) consistent with how
update_job_status and update_ingestion_stats_and_commit handle zero affected
rows so the transaction aborts instead of proceeding when the scanner state row
is missing.
- Around line 173-176: Replace the runtime panic caused by
assert!(!objects.is_empty(), ...) with a recoverable error by returning
anyhow::bail! when objects is empty; update the function that contains this
assertion to return Result (if it doesn't already) so callers like
S3ScannerState::ingest and SqsListenerState::ingest can propagate the error with
?; specifically change the assert on objects to an early check like if
objects.is_empty() { anyhow::bail!("Cannot build S3 object metadata ingestion
query with empty objects"); } and ensure the function signature and callers
handle the anyhow::Error return.
- Around line 497-499: The schema uses compression_job_id INT while
ingestion_job_id is BIGINT unsigned, causing a design inconsistency; change
compression_job_id to BIGINT unsigned (and update its foreign key to
compression_jobs.id) so both job ID columns share the same unsigned 64-bit type;
update any DB migration that creates/modifies compression_job_id, adjust related
Rust structs/types and any SQL bindings or diesel/ORM models that reference
compression_job_id, and run/verify migrations to ensure the FK and types match
compression_jobs.id and ingestion_job_id.

---

Duplicate comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 449-470: The CREATE TABLE SQL in
ingestion_job_table_creation_query currently defines `config` as VARCHAR(1024),
which may truncate serialized S3IngestionJobConfig JSON; change the column type
to TEXT (e.g., `config` TEXT NOT NULL) in the formatted SQL string returned by
ingestion_job_table_creation_query for INGESTION_JOB_TABLE_NAME, leaving the NOT
NULL constraint and other columns unchanged; regenerate any schema/migration
artifacts or unit tests that assert the exact DDL if present so they reflect the
TEXT type, and keep ClpIngestionJobStatus::format_as_sql_enum() and
default_status logic intact.

Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs Outdated
Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs

@sitaowang1998 sitaowang1998 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The transaction is not handled correctly. Also some comments on table schema.

Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs Outdated
`last_update_ts` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3)
ON UPDATE CURRENT_TIMESTAMP(3),
PRIMARY KEY (`id`)
) ROW_FORMAT=DYNAMIC;",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since dynamic is the default row format https://mariadb.com/docs/server/server-usage/storage-engines/innodb/innodb-row-formats/innodb-dynamic-row-format, I don't think need to explicitly specify this. If the user deploys a MariaDB instance with other settings, we should probably honor that.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

PRIMARY KEY (`id`),
CONSTRAINT `{INGESTION_JOB_S3_SCANNER_STATE_TABLE_NAME}_ingestion_job_id_ref`
FOREIGN KEY (`id`) REFERENCES `{INGESTION_JOB_TABLE_NAME}` (`id`)
ON DELETE RESTRICT ON UPDATE RESTRICT

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why restrict the deletion in the job table?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iiuc, adding "restrict" means the job can't be removed if there are still rows in other tables that reference it. That's probably the behavior we want: we don't want these ingested rows to be accidentally deleted because the job gets removed. Instead, they should be persisted in the DB until they have been ingested.
They plan (not in this release, probably) to clean the ingested metadata by having the GC periodically clean up the ones already cleaned up. And an ingestion job can only be removed if the referenced metadata has been removed by GC, or has been explicitly removed by users.
To achieve this behavior, do you have better suggestions other than using "restrict"?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as the intended behaviour is to make deleting job harder, RESTRICT is a good choice.

Comment on lines +271 to +281
async fn start(&self) -> anyhow::Result<()> {
self.update_job_status(ClpIngestionJobStatus::Running).await
}

/// # Errors
///
/// * Forwards [`Self::update_job_status`]'s return values on failure.
async fn end(&self) -> anyhow::Result<()> {
self.update_job_status(ClpIngestionJobStatus::Finished)
.await
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check if the job is in correct state before updating it?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine. We should be able to restart or stop a job from any state for now.

Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 182-193: The batching magic number 10000 in objects.chunks(10000)
should be derived from a named constant to keep it in sync with the number of
bound columns and DB placeholder limits; introduce a constant (e.g.,
PLACEHOLDERS_PER_ROW = 4 and/or CHUNK_SIZE computed from MAX_PLACEHOLDERS /
PLACEHOLDERS_PER_ROW) and replace the literal 10000 with that constant in the
loop that builds query_string (referencing objects.chunks, BASE_INGESTION_QUERY
and the "(?, ?, ?, ?)" placeholder pattern), so future changes to bound columns
or DB limits only require updating the constant calculation.

ℹ️ Review info

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 67dcb17 and cb80fc9.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
  • components/clp-rust-utils/Cargo.toml
  • components/clp-rust-utils/src/database/mysql.rs
  • components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs

Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
@LinZhihao-723 LinZhihao-723 merged commit 7fb76c3 into y-scope:main Feb 23, 2026
23 checks passed
@LinZhihao-723 LinZhihao-723 deleted the clp-db-connector branch February 23, 2026 20:47
@junhaoliao junhaoliao added this to the February 2026 milestone Feb 26, 2026
junhaoliao pushed a commit to junhaoliao/clp that referenced this pull request May 17, 2026
y-scope#2014)

* Add SQL statements to create necessary tables for log-ingestor status tracking in CLP DB.
* Add support for registering new ingestion jobs in CLP DB.
* Add support for ingesting S3 object metadata into CLP DB through implementing ingestion job state traits.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants