feat(log-ingestor): Add DB operations for ingestion job orchestration: #2014
Walkthrough
Adds a CLP MySQL-backed S3 ingestion subsystem: new
Changes
Sequence Diagram
sequenceDiagram
participant Client
participant Connector as ClpDbIngestionConnector
participant DB as MySQL Database
participant State as ClpIngestionState
participant S3SQS as S3/SQS
Client->>Connector: new(db_pool)
activate Connector
Connector->>DB: Initialize tables
DB-->>Connector: OK
Connector-->>Client: Ready
deactivate Connector
Client->>Connector: create_ingestion_job(config: S3IngestionJobConfig)
activate Connector
Connector->>DB: Insert ingestion job (status=Requested)
DB-->>Connector: job_id
alt config == S3Scanner
Connector->>DB: Insert scanner state row
else config == SqsListener
Connector->>DB: Insert listener state row
end
Connector->>State: Construct ClpIngestionState
Connector-->>Client: ClpIngestionState
deactivate Connector
Client->>State: start()
activate State
State->>DB: Update job status -> Running
DB-->>State: OK
State-->>Client: Acknowledged
deactivate State
Client->>State: ingest_s3_object(metadata)
activate State
State->>S3SQS: Acknowledge/process message
State->>DB: Begin transaction
DB-->>State: TX started
State->>DB: Insert metadata row / Update stats
State->>DB: Commit
DB-->>State: TX committed
State-->>Client: Ingestion complete
deactivate State
Client->>State: end()
activate State
State->>DB: Update job status -> Finished
DB-->>State: OK
State-->>Client: Acknowledged
deactivate State
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/log-ingestor/Cargo.toml`:
- Around line 32-33: Update the strum dependencies in Cargo.toml: change the
version from 0.28.0 to 0.27.2, remove the separate strum_macros entry and
instead enable the derive feature on the strum crate (features = ["derive"]) so
you only have a single strum = "0.27.2" dependency; then update imports in
clp_ingestion.rs to import derive-provided items from strum (e.g., use
strum::EnumString/EnumVariantNames/Display, etc.) rather than referencing
strum_macros.
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 432-451: The `config` column in
ingestion_job_table_creation_query() is currently defined as VARCHAR(1024) which
can truncate or reject serialized S3IngestionJobConfig JSON; change the column
type to a variable-length text type (e.g., TEXT or VARCHAR(65535)) in the CREATE
TABLE statement for INGESTION_JOB_TABLE_NAME to safely hold longer serialized
configs, and update any related code or migrations that assume a 1024-byte limit
so inserts/updates for S3IngestionJobConfig and other large configs succeed
without truncation.
- Around line 151-186: ingest_s3_object_metadata currently builds one giant
INSERT with 4 × objects.len() placeholders which can exceed MySQL's
65,535-parameter limit; change it to chunk the objects slice into safe-sized
batches (e.g. MAX_PARAMS = 65_535, MAX_PER_BATCH = MAX_PARAMS / 4) and run the
INSERT-building/binding/executing logic inside a loop over chunks so each chunk
produces at most MAX_PER_BATCH rows; keep using the same transaction (tx) and
the same table name constant (INGESTED_S3_OBJECT_METADATA_TABLE_NAME) and ensure
the existing assertion for non-empty input still holds before chunking.
- Around line 119-132: The update_job_status function currently executes an
UPDATE and returns Ok(()) without verifying the result; update it to check the
executed query's rows_affected() and return an error (anyhow::anyhow or a domain
error) when 0 rows were modified (job not found), mirroring the behavior in
update_ingestion_stats_and_commit, and additionally validate the requested
ClpIngestionJobStatus transition (reject invalid transitions such as Finished ->
Running) before executing the SQL; locate update_job_status and add the
post-exec rows_affected() check and a pre-exec transition guard using the job's
current status.
- Around line 66-96: The two INSERTs in create_ingestion_job (the initial insert
that produces job_id and the conditional insert for S3Scanner into
INGESTION_JOB_S3_SCANNER_STATE_TABLE_NAME) must be executed inside a single
database transaction to avoid orphaned ingestion_job rows if the second insert
fails; change create_ingestion_job to begin a transaction from self.db_pool
(sqlx::Transaction), perform the first INSERT via that transaction to obtain
last_insert_id, then if config matches S3IngestionJobConfig::S3Scanner(_) run
the second INSERT on the same transaction, and finally commit the transaction
(propagating errors with ? so rollback happens on failure); keep returning
ClpIngestionState { job_id, db_pool: self.db_pool.clone() } after a successful
commit.
- Around line 456-467: The CREATE TABLE for ingestion_job_s3_scanner_state
defines a foreign key constraint named `ingestion_job_id_ref`, which conflicts
with the identically named constraint in `ingested_s3_object_metadata` and
causes ER_FK_DUP_NAME; update the constraint name in the
`ingestion_job_s3_scanner_state` table (or the other table) to a unique
identifier (e.g., `ingestion_job_s3_scanner_state_job_id_fk`) in the SQL string
built in the function that returns this CREATE TABLE (the block containing
`INGESTION_JOB_S3_SCANNER_STATE_TABLE_NAME`, `last_ingested_key`, and the
`CONSTRAINT` clause) so each table uses a unique foreign key constraint name
while preserving the same referenced table/columns and ON DELETE/ON UPDATE
behavior.
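One way to avoid the constraint-name collision described in the last item is to derive each name from the table that owns it. The sketch below is illustrative only: `fk_constraint_name` and `job_fk_clause` are hypothetical helpers that do not appear in the PR, and the `_ref` suffix is an assumed naming convention.

```rust
/// Hypothetical helper (not from the PR): derives a foreign key constraint
/// name from the owning table so that no two tables share the same name.
fn fk_constraint_name(table: &str, column: &str) -> String {
    format!("{table}_{column}_ref")
}

/// Builds the `CONSTRAINT` clause for a table that references the job table.
/// The referenced column `id` and the RESTRICT actions mirror the review text.
fn job_fk_clause(table: &str, column: &str, job_table: &str) -> String {
    format!(
        "CONSTRAINT `{name}` FOREIGN KEY (`{column}`) REFERENCES `{job_table}` (`id`) \
         ON DELETE RESTRICT ON UPDATE RESTRICT",
        name = fk_constraint_name(table, column),
    )
}
```

Calling `fk_constraint_name` with `ingestion_job_s3_scanner_state` and with `ingested_s3_object_metadata` yields two distinct names even when the column name is the same. MySQL identifiers are limited to 64 characters, so long table names may need a shorter suffix.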
fn ingestion_job_table_creation_query() -> String {
    format!(
        r"
        CREATE TABLE IF NOT EXISTS `{table}` (
            `id` BIGINT unsigned NOT NULL AUTO_INCREMENT,
            `config` VARCHAR(1024) NOT NULL,
            `status` {status_enum} NOT NULL DEFAULT '{default_status}',
            `num_files_ingested` BIGINT unsigned NOT NULL DEFAULT '0',
            `num_files_compressed` BIGINT unsigned NOT NULL DEFAULT '0',
            `error_msg` TEXT NULL DEFAULT NULL,
            `creation_ts` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
            `last_update_ts` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3)
                ON UPDATE CURRENT_TIMESTAMP(3),
            PRIMARY KEY (`id`)
        ) ROW_FORMAT=DYNAMIC;",
        table = INGESTION_JOB_TABLE_NAME,
        status_enum = ClpIngestionJobStatus::format_as_sql_enum(),
        default_status = ClpIngestionJobStatus::Requested,
    )
}
config column as VARCHAR(1024) may be too small for some job configurations.
The S3IngestionJobConfig is serialized as JSON (with serde's externally-tagged enum wrapping) into this column. Configs with long bucket names, key prefixes, queue URLs, and endpoint URLs could approach or exceed 1,024 characters. Consider using TEXT or at least a larger VARCHAR to avoid silent truncation or insert failures.
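If the column stays at `VARCHAR(1024)`, the failure mode described here could be surfaced earlier by checking the serialized length before the insert. This is a minimal sketch under assumptions: `check_config_fits` and `CONFIG_MAX_CHARS` are hypothetical names, not part of the PR, and the function takes an already-serialized string so that it stays independent of the serializer.

```rust
/// Assumed column limit, taken from the `VARCHAR(1024)` definition above.
const CONFIG_MAX_CHARS: usize = 1024;

/// Hypothetical pre-insert check (not part of the PR): rejects a serialized
/// config that would not fit in the column instead of relying on the server
/// to truncate or reject it.
fn check_config_fits(serialized_config: &str) -> Result<(), String> {
    // MySQL measures VARCHAR(N) in characters, so count chars rather than bytes.
    let num_chars = serialized_config.chars().count();
    if num_chars > CONFIG_MAX_CHARS {
        return Err(format!(
            "serialized config is {num_chars} characters; the column holds at most {CONFIG_MAX_CHARS}"
        ));
    }
    Ok(())
}
```

A caller would run this on the JSON string right before binding it to the INSERT, turning a server-side failure into an explicit error message that names the limit.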
✅ Addressed in commit cb80fc9
It should be fine for now.
@LinZhihao-723, understood! I've noted your decision to keep the current VARCHAR(1024) size for now.
🧠 Learnings used
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 403-426: The enum IngestedS3ObjectMetadataStatus needs the same
manual sqlx::Type<sqlx::MySql> implementation as ClpIngestionJobStatus to map it
to the MySQL string type info; add an impl sqlx::Type<sqlx::MySql> for
IngestedS3ObjectMetadataStatus that returns sqlx::mysql::MySqlTypeInfo::VARCHAR
and sets the string kind (mirroring the ClpIngestionJobStatus impl), keeping
MySqlEnumFormat and existing derives unchanged so future query bindings compile.
- Around line 488-516: The function
ingested_s3_object_metadata_table_creation_query lacks the #[must_use]
annotation present on sibling functions ingestion_job_table_creation_query and
ingestion_job_s3_scanner_state_table_creation_query; add #[must_use] above
ingested_s3_object_metadata_table_creation_query to make its return value
required to be used, keeping the behavior consistent with the other
table-creation helper functions.
- Around line 306-310: The UPDATE_S3_SCANNER_STATE_QUERY execution in
clp_ingestion.rs currently ignores the number of affected rows; modify the block
around
sqlx::query(UPDATE_S3_SCANNER_STATE_QUERY).bind(last_ingested_key).bind(self.job_id).execute(&mut
*tx).await to capture the result, call .rows_affected(), and if it is 0 return
an appropriate error (or propagate a failure) consistent with how
update_job_status and update_ingestion_stats_and_commit handle zero affected
rows so the transaction aborts instead of proceeding when the scanner state row
is missing.
- Around line 173-176: Replace the runtime panic caused by
assert!(!objects.is_empty(), ...) with a recoverable error by returning
anyhow::bail! when objects is empty; update the function that contains this
assertion to return Result (if it doesn't already) so callers like
S3ScannerState::ingest and SqsListenerState::ingest can propagate the error with
?; specifically change the assert on objects to an early check like if
objects.is_empty() { anyhow::bail!("Cannot build S3 object metadata ingestion
query with empty objects"); } and ensure the function signature and callers
handle the anyhow::Error return.
- Around line 497-499: The schema uses compression_job_id INT while
ingestion_job_id is BIGINT unsigned, causing a design inconsistency; change
compression_job_id to BIGINT unsigned (and update its foreign key to
compression_jobs.id) so both job ID columns share the same unsigned 64-bit type;
update any DB migration that creates/modifies compression_job_id, adjust related
Rust structs/types and any SQL bindings or diesel/ORM models that reference
compression_job_id, and run/verify migrations to ensure the FK and types match
compression_jobs.id and ingestion_job_id.
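The suggestion to replace the `assert!` on empty input with a recoverable error can be sketched as follows. The names are hypothetical: `build_insert_query` stands in for the query builder in `clp_ingestion.rs`, `base_query` is an assumed parameter, and a plain `String` error is used here so the example needs no external crate, whereas the review recommends `anyhow::bail!` in the real code.

```rust
/// Placeholder group for one row; the review describes four bound values per object.
const ROW_PLACEHOLDERS: &str = "(?, ?, ?, ?)";

/// Hypothetical stand-in for the query builder discussed above.
fn build_insert_query(base_query: &str, num_objects: usize) -> Result<String, String> {
    if num_objects == 0 {
        // Recoverable error instead of `assert!`, so callers can propagate it with `?`.
        return Err(
            "Cannot build S3 object metadata ingestion query with empty objects".to_owned(),
        );
    }
    // One placeholder group per object, separated by commas.
    let placeholders = vec![ROW_PLACEHOLDERS; num_objects].join(", ");
    Ok(format!("{base_query} {placeholders}"))
}
```

With this shape an empty batch becomes an `Err` that the caller can log or ignore, rather than a panic that takes down the ingestion task.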
---
Duplicate comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 449-470: The CREATE TABLE SQL in
ingestion_job_table_creation_query currently defines `config` as VARCHAR(1024),
which may truncate serialized S3IngestionJobConfig JSON; change the column type
to TEXT (e.g., `config` TEXT NOT NULL) in the formatted SQL string returned by
ingestion_job_table_creation_query for INGESTION_JOB_TABLE_NAME, leaving the NOT
NULL constraint and other columns unchanged; regenerate any schema/migration
artifacts or unit tests that assert the exact DDL if present so they reflect the
TEXT type, and keep ClpIngestionJobStatus::format_as_sql_enum() and
default_status logic intact.
sitaowang1998
left a comment
The transaction is not handled correctly. Also some comments on table schema.
        `last_update_ts` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3)
            ON UPDATE CURRENT_TIMESTAMP(3),
        PRIMARY KEY (`id`)
    ) ROW_FORMAT=DYNAMIC;",
Since dynamic is the default row format (https://mariadb.com/docs/server/server-usage/storage-engines/innodb/innodb-row-formats/innodb-dynamic-row-format), I don't think we need to specify it explicitly. If the user deploys a MariaDB instance with other settings, we should probably honor that.
        PRIMARY KEY (`id`),
        CONSTRAINT `{INGESTION_JOB_S3_SCANNER_STATE_TABLE_NAME}_ingestion_job_id_ref`
            FOREIGN KEY (`id`) REFERENCES `{INGESTION_JOB_TABLE_NAME}` (`id`)
            ON DELETE RESTRICT ON UPDATE RESTRICT
Why restrict the deletion in the job table?
iiuc, adding "restrict" means the job can't be removed if there are still rows in other tables that reference it. That's probably the behavior we want: we don't want these ingested rows to be accidentally deleted because the job gets removed. Instead, they should be persisted in the DB until they have been ingested.
They plan (not in this release, probably) to clean the ingested metadata by having the GC periodically clean up the ones already cleaned up. And an ingestion job can only be removed if the referenced metadata has been removed by GC, or has been explicitly removed by users.
To achieve this behavior, do you have better suggestions other than using "restrict"?
As long as the intended behaviour is to make deleting a job harder, RESTRICT is a good choice.
    async fn start(&self) -> anyhow::Result<()> {
        self.update_job_status(ClpIngestionJobStatus::Running).await
    }

    /// # Errors
    ///
    /// * Forwards [`Self::update_job_status`]'s return values on failure.
    async fn end(&self) -> anyhow::Result<()> {
        self.update_job_status(ClpIngestionJobStatus::Finished)
            .await
    }
Should we check that the job is in the correct state before updating it?
I think it's fine. We should be able to restart or stop a job from any state for now.
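For reference, if a state check were added later, it could take roughly the following shape. This is a sketch only: the PR deliberately allows any transition for now, `JobStatus` models just the three statuses named in this review (the real `ClpIngestionJobStatus` may have more variants), and the allowed-transition table is an assumption.

```rust
/// Subset of job statuses named in this review; the real
/// `ClpIngestionJobStatus` may have more variants.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum JobStatus {
    Requested,
    Running,
    Finished,
}

/// Hypothetical transition policy (an assumption, not the PR's behaviour):
/// a job may only move forward from Requested to Running to Finished.
fn is_valid_transition(from: JobStatus, to: JobStatus) -> bool {
    matches!(
        (from, to),
        (JobStatus::Requested, JobStatus::Running) | (JobStatus::Running, JobStatus::Finished)
    )
}

/// Returns an error describing the rejected transition, if any.
fn check_transition(from: JobStatus, to: JobStatus) -> Result<(), String> {
    if is_valid_transition(from, to) {
        Ok(())
    } else {
        Err(format!("invalid job status transition: {from:?} -> {to:?}"))
    }
}
```

Supporting restarts, as the reply above wants, would only require adding `(JobStatus::Finished, JobStatus::Running)` to the `matches!` pattern.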
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 182-193: The batching magic number 10000 in objects.chunks(10000)
should be derived from a named constant to keep it in sync with the number of
bound columns and DB placeholder limits; introduce a constant (e.g.,
PLACEHOLDERS_PER_ROW = 4 and/or CHUNK_SIZE computed from MAX_PLACEHOLDERS /
PLACEHOLDERS_PER_ROW) and replace the literal 10000 with that constant in the
loop that builds query_string (referencing objects.chunks, BASE_INGESTION_QUERY
and the "(?, ?, ?, ?)" placeholder pattern), so future changes to bound columns
or DB limits only require updating the constant calculation.
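The derivation suggested in this comment can be sketched as below. The constant names follow the comment's own suggestions, except `MAX_ROWS_PER_BATCH`, which is an assumed name; `build_batched_queries` and its `base_query` parameter are hypothetical and only build the SQL strings, leaving binding and execution to the caller.

```rust
/// MySQL's limit on placeholders in one prepared statement, as cited earlier in this review.
const MAX_PLACEHOLDERS: usize = 65_535;
/// Number of bound columns per row, matching the "(?, ?, ?, ?)" pattern.
const PLACEHOLDERS_PER_ROW: usize = 4;
/// Largest number of rows that fits in a single INSERT.
const MAX_ROWS_PER_BATCH: usize = MAX_PLACEHOLDERS / PLACEHOLDERS_PER_ROW;

/// Hypothetical helper: builds one INSERT string per chunk of `objects`.
fn build_batched_queries<T>(base_query: &str, objects: &[T]) -> Vec<String> {
    objects
        .chunks(MAX_ROWS_PER_BATCH)
        .map(|chunk| {
            // Derive the per-row placeholder group from the column count.
            let row = format!("({})", vec!["?"; PLACEHOLDERS_PER_ROW].join(", "));
            let rows = vec![row; chunk.len()].join(", ");
            format!("{base_query} {rows}")
        })
        .collect()
}
```

Because both the row pattern and the batch size come from `PLACEHOLDERS_PER_ROW`, adding a fifth bound column changes a single constant. The literal 10000 used in the PR stays well under the limit (40,000 placeholders), so deriving the size is about maintainability rather than fixing a bug.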
ℹ️ Review info
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
components/clp-rust-utils/Cargo.toml
components/clp-rust-utils/src/database/mysql.rs
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
y-scope#2014)
* Add SQL statements to create necessary tables for log-ingestor status tracking in CLP DB.
* Add support for registering new ingestion jobs in CLP DB.
* Add support for ingesting S3 object metadata into CLP DB through implementing ingestion job state traits.
Description
This PR is a part of the implementation for #1978.
This PR implements DB operations for:
At the top level, we introduce `ClpDbIngestionConnector` for job creation. It returns an ingestion job state implementation, `ClpIngestionState`, which is a per-job state that enables ingesting S3 object metadata into the CLP DB.
The ingestion operation uses transactions to update both the object metadata table and the ingestion job table for consistency.
Checklist
breaking change.
Validation performed
clp_ingestion.rs: finished state. test-10000) didn't go through since the job has finished.
Summary by CodeRabbit
New Features
Chores