feat(log-ingestor): Add support for recovering log-ingestor on restart (resolves #1978).#2053
Conversation
…github.com/LinZhihao-723/clp into db-state-ingestion-job-manager-integration
Walkthrough
Adds a compression wait-and-update helper, makes listener channel closure a graceful completion, and refactors the ingestion manager to persist and recover ingestion/compression jobs via new recovery/context structs and DB-backed startup recovery. API signatures and job lifecycle handling were changed accordingly.
Changes
Sequence Diagram(s)
sequenceDiagram
autonumber
actor Startup
participant Manager as IngestionJobManager
participant Connector as ClpDbIngestionConnector
participant DB as CLP_DB
participant Recovery as LogIngestorRecoveryContext
participant Compression as ClpCompressionJobContext
participant Ingestion as ClpIngestionJobContext
Startup->>Manager: initialize()
Manager->>Connector: connect(clp_config, clp_credentials)
Connector->>DB: create tables, load unfinished/recoverable/inactive jobs
DB-->>Connector: recovery data
Connector-->>Manager: (Connector, Recovery)
Manager->>Recovery: iterate unfinished_compression_jobs
Recovery->>Compression: detach_and_wait_for_completion_and_update_metadata()
Manager->>Recovery: iterate recoverable_ingestion_jobs
Recovery->>Ingestion: recreate job instances, refill buffers
Manager->>Recovery: iterate inactive_ingestion_jobs
Recovery->>Ingestion: refill buffers (no restart)
Manager-->>Startup: ready
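The startup flow in the diagram can be summarized with a small sketch. It is a simplified, synchronous model: the three fields mirror the job categories named in the diagram, but `JobId`, `RecoveryContext`, `RecoveryStep`, and `plan_recovery` are hypothetical names used for illustration, not the PR's actual types.

```rust
/// Hypothetical job identifier; the real ID types live in the PR.
type JobId = u64;

/// Simplified stand-in for the recovery data loaded from the DB at startup.
#[derive(Debug, Default)]
struct RecoveryContext {
    unfinished_compression_jobs: Vec<JobId>,
    recoverable_ingestion_jobs: Vec<JobId>,
    inactive_ingestion_jobs: Vec<JobId>,
}

/// One action the manager takes during startup recovery.
#[derive(Debug, PartialEq, Eq)]
enum RecoveryStep {
    /// Re-attach a waiter that updates metadata once the compression job finishes.
    WaitForCompression(JobId),
    /// Recreate a job instance and refill its buffer.
    RestartIngestion(JobId),
    /// Refill the buffer only; the job is not restarted.
    RefillOnly(JobId),
}

/// Lists the recovery steps in the order the diagram shows them.
fn plan_recovery(ctx: &RecoveryContext) -> Vec<RecoveryStep> {
    let mut steps = Vec::new();
    steps.extend(
        ctx.unfinished_compression_jobs
            .iter()
            .copied()
            .map(RecoveryStep::WaitForCompression),
    );
    steps.extend(
        ctx.recoverable_ingestion_jobs
            .iter()
            .copied()
            .map(RecoveryStep::RestartIngestion),
    );
    steps.extend(
        ctx.inactive_ingestion_jobs
            .iter()
            .copied()
            .map(RecoveryStep::RefillOnly),
    );
    steps
}
```

Calling `plan_recovery` on a context with one unfinished compression job, two recoverable ingestion jobs, and one inactive job yields four steps in that order.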
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
Pre-merge checks: ✅ 5 passed
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/log-ingestor/src/ingestion_job_manager.rs (1)
247-316: ⚠️ Potential issue | 🟠 Major
Release `job_table` lock before awaiting external I/O operations.
The lock is held during potentially slow operations (AWS client creation at lines 290 and 307, state transitions at lines 292 and 308), blocking unrelated manager operations like concurrent job removals. This is inconsistent with `shutdown_and_remove_job_instance`, which releases the lock immediately after table access before awaiting. Consider structuring as:
- Acquire lock, validate job ID and prefix conflicts
- Release lock
- Perform async client creation and state initialization
- Re-acquire lock, check for race conditions, insert job
Refactor sketch
-        let mut job_table = self.inner.job_table.lock().await;
-
-        if job_table.contains_key(&job_id) {
-            return Err(Error::InternalError(anyhow::anyhow!(
-                "Job ID collision detected: An ingestion job with ID {job_id} already exists."
-            )));
-        }
-
-        for table_entry in job_table.values() {
+        {
+            let job_table = self.inner.job_table.lock().await;
+            if job_table.contains_key(&job_id) {
+                return Err(Error::InternalError(anyhow::anyhow!(
+                    "Job ID collision detected: An ingestion job with ID {job_id} already exists."
+                )));
+            }
+            for table_entry in job_table.values() {
                 // prefix-conflict checks...
-        }
+            }
+        }

         let ingestion_state = ingestion_job_context.get_ingestion_state();
         let ingestion_job_instance = match ingestion_job_context.get_ingestion_job_config().clone() {
             // awaits...
         };

+        let mut job_table = self.inner.job_table.lock().await;
+        if job_table.contains_key(&job_id) {
+            return Err(Error::InternalError(anyhow::anyhow!(
+                "Job ID collision detected after async setup: {job_id}."
+            )));
+        }
         job_table.insert(
             job_id,
             IngestionJobTableEntry {
                 ingestion_job_instance,
                 ingestion_job_context,
             },
         );
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/log-ingestor/src/ingestion_job_manager.rs` around lines 247 - 316, The job_table lock is held while performing async I/O (SqsClientWrapper::create, S3ClientWrapper::create) and awaiting ingestion_state.start(), which blocks other manager operations; change create_ingestion_job logic to (1) acquire self.inner.job_table.lock().await only to check job_id existence and prefix conflicts (using job_table and existing_job_base_config), then release the lock; (2) perform the async client creation and call ingestion_state.start() outside the lock (handle validation via ValidatedSqsListenerConfig::validate_and_create and S3Scanner setup); (3) re-acquire the lock, verify no race (job_id now absent and prefix still non-conflicting), and insert the constructed IngestionJob (SqsListener or S3Scanner) into job_table; ensure any early errors return before the final insert to avoid leaked started state.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager.rs`:
- Around line 398-410: The recovery loop currently increments num_recovered_jobs
even when create_s3_ingestion_job_instance(...) fails; change the error branch
so it does NOT increment num_recovered_jobs and instead transitions the job to a
failed state to avoid leaving it in requested/running. Specifically, inside the
Err(e) arm for
ingestion_job_manager.create_s3_ingestion_job_instance(ingestion_job_context).await,
call the manager method that marks the job failed (e.g.,
ingestion_job_manager.mark_ingestion_job_failed(job_id, e) or an existing
update_job_status_to_failed(job_id, reason)), keep the tracing::error log, and
only increment num_recovered_jobs in the success path after a successful
creation.
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 360-367: The SQL GROUP BY is missing a non-aggregated column:
update the QUERY constant (the formatted SQL using
INGESTED_S3_OBJECT_METADATA_TABLE_NAME) so the GROUP BY includes both
`compression_job_id` and `ingestion_job_id` (and optionally add ORDER BY
`compression_job_id`, `ingestion_job_id` for deterministic ordering) to satisfy
ONLY_FULL_GROUP_BY; no other code changes to the sqlx::query_as call, the tuple
type (IngestionJobId, CompressionJobId, i64), or the
.bind(IngestedS3ObjectMetadataStatus::Submitted) are needed.
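To see what grouping on both columns computes, the aggregation can be mimicked in plain Rust. This is only an in-memory illustration of the corrected query's result shape; the `Row` struct and `count_submitted` function are hypothetical and do not appear in the PR.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory row of the ingested-object metadata table.
struct Row {
    ingestion_job_id: u64,
    compression_job_id: Option<u64>,
    submitted: bool,
}

/// Counts submitted rows per (ingestion_job_id, compression_job_id) pair,
/// ordered by compression job ID and then by ingestion job ID.
fn count_submitted(rows: &[Row]) -> Vec<(u64, u64, i64)> {
    // The key is (compression_job_id, ingestion_job_id) so that BTreeMap
    // iteration matches the suggested ORDER BY clause.
    let mut counts: BTreeMap<(u64, u64), i64> = BTreeMap::new();
    for row in rows {
        // Mirrors `compression_job_id IS NOT NULL AND status = ?`.
        if let (Some(compression_job_id), true) = (row.compression_job_id, row.submitted) {
            *counts
                .entry((compression_job_id, row.ingestion_job_id))
                .or_insert(0) += 1;
        }
    }
    counts
        .into_iter()
        .map(|((compression_job_id, ingestion_job_id), n)| {
            (ingestion_job_id, compression_job_id, n)
        })
        .collect()
}
```

Each output tuple has the same shape as the `(IngestionJobId, CompressionJobId, i64)` row type mentioned above, with every selected non-aggregated column forming part of the grouping key.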
---
ℹ️ Review info
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (4)
components/log-ingestor/src/compression/compression_job_submitter.rs
components/log-ingestor/src/compression/listener.rs
components/log-ingestor/src/ingestion_job_manager.rs
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
let unfinished_compression_jobs = connector.get_unfinished_compression_jobs().await?;
let (recoverable_ingestion_jobs, inactive_ingestion_jobs) =
    connector.load_ingestion_jobs().await?;
I am a little worried about these two functions under failure. If one db connection fails, or config parse fails, we just stop halfway. Should we try our best to handle all jobs and only give up on failed ones?
I think the current behavior is that if a connection error happens, we return the error directly to the top-level caller. That would trigger another restart of the log-ingestor to retry everything.
What if the error is not a connection error?
We only forward fetch_all errors which are from DB operations. Whether these errors are connection errors probably doesn't matter? The next round of retry would be triggered on a restart before the recovery procedure starts.
I think it is fine for now. Maybe we can distinguish different errors later.
try_recover_waiting_coroutines_for_unfinished_compression_jobs(unfinished_compression_jobs);
try_recover_ingestion_job_instances(
    ingestion_job_manager.clone(),
    recoverable_ingestion_jobs,
)
.await;
try_recover_inactive_ingestion_jobs(inactive_ingestion_jobs).await;
Same concern for failing halfway.
These functions are best-effort. They won't return any errors; they only log them and keep going. That's why they're called "try_xxx".
In the future, we plan to expose APIs to retry a failed job explicitly.
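The split discussed in this thread, fail-fast loading followed by best-effort recovery, can be sketched roughly as below. The names `load_jobs`, `recover_one`, and `try_recover_all` are hypothetical, and `eprintln!` stands in for the `tracing` logging used in the PR.

```rust
/// Hypothetical job identifier.
type JobId = u64;

/// Fail-fast loading: any error goes straight back to the caller, which is
/// expected to restart the service and retry from scratch.
fn load_jobs(db_rows: Result<Vec<JobId>, String>) -> Result<Vec<JobId>, String> {
    let jobs = db_rows?;
    Ok(jobs)
}

/// Hypothetical per-job recovery that fails for odd IDs, for demonstration only.
fn recover_one(job_id: JobId) -> Result<(), String> {
    if job_id % 2 == 1 {
        return Err(format!("cannot recover job {job_id}"));
    }
    Ok(())
}

/// Best-effort recovery: errors are logged and skipped, never returned.
/// Returns the number of jobs that were recovered.
fn try_recover_all(jobs: &[JobId]) -> usize {
    let mut num_recovered = 0;
    for &job_id in jobs {
        match recover_one(job_id) {
            Ok(()) => num_recovered += 1,
            Err(e) => eprintln!("job_id={job_id} error={e}: recovery skipped"),
        }
    }
    num_recovered
}
```

With this shape, a DB failure during loading stops startup early, while a single bad job during recovery does not prevent the remaining jobs from being handled.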
    "Recovering ingestion job."
);

try_refill_ingestion_buffer(&ingestion_job_context).await;
So we continue creating the job instance even if refilling the ingestion buffer fails?
Yeah, I think it's fair to still allow the ingestion job to ingest more files into the system as:
- These new logs will likely be ingested into the buffer properly for compression.
- As planned for kv-ir, ingesting them into the system would allow early-stage IR-level search.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/log-ingestor/src/ingestion_job_manager.rs (1)
247-326: 🛠️ Refactor suggestion | 🟠 Major
Refactor to release `job_table` lock before async operations.
The mutex guard is held across multiple async awaits at lines 290, 292, 307, and 308 (SqsClientWrapper::create, S3ClientWrapper::create, and ingestion_state.start). This serializes manager operations and can cause extended lock stalls during AWS SDK or I/O delays. Restructure to acquire the lock only for validation (lines 249–279), release it, perform async operations independently, then re-acquire before the final table insertion.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/log-ingestor/src/ingestion_job_manager.rs` around lines 247 - 326, The job_table mutex is held across async awaits causing lock stalls; refactor create_ingestion_job (the block using job_table, ingestion_job_context, and ingestion_job_instance) to only lock job_table for the validation/prefix-checks (the loop that reads existing_job_base_config and the initial job_id collision check), then drop the lock before calling async operations (SqsClientWrapper::create, S3ClientWrapper::create, and ingestion_state.start), build the ingestion_job_instance off-lock, and finally re-acquire the lock only to perform job_table.insert with an IngestionJobTableEntry containing the prepared ingestion_job_instance and ingestion_job_context. Ensure you preserve validation results (e.g., base_config comparison and derived values) so re-acquiring the lock before insert can safely assume no conflicting state was introduced.
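The check, release, set up, re-acquire pattern suggested here can be sketched in a dependency-free form. This is a simplified model, not the PR's code: the PR awaits an async mutex (`.lock().await`), whereas this sketch uses `std::sync::Mutex`, and `JobTable`, `create_job`, and the `slow_setup` closure are hypothetical stand-ins for the manager's table and the AWS client creation.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical job identifier and table; a `String` stands in for a job instance.
type JobId = u64;
type JobTable = Mutex<HashMap<JobId, String>>;

/// Creates a job without holding the table lock during slow setup.
fn create_job(
    job_table: &JobTable,
    job_id: JobId,
    slow_setup: impl FnOnce() -> Result<String, String>,
) -> Result<(), String> {
    // 1. Validate under the lock. The guard is dropped at the end of this block.
    {
        let table = job_table.lock().map_err(|e| e.to_string())?;
        if table.contains_key(&job_id) {
            return Err(format!("job {job_id} already exists"));
        }
    }

    // 2. Do the slow work (client creation, state start) with the lock released.
    let instance = slow_setup()?;

    // 3. Re-acquire the lock and re-check, since another caller may have
    //    inserted the same ID while the lock was released.
    let mut table = job_table.lock().map_err(|e| e.to_string())?;
    if table.contains_key(&job_id) {
        return Err(format!("job {job_id} was created concurrently"));
    }
    table.insert(job_id, instance);
    Ok(())
}
```

One design consequence worth noting: if the re-check in step 3 fails, whatever step 2 started has to be torn down by the caller, which is the "leaked started state" concern raised in the review.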
♻️ Duplicate comments (2)
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs (1)
361-365: ⚠️ Potential issue | 🔴 Critical
Fix `GROUP BY` to include all non-aggregated selected columns.
At Line 362, the query selects `ingestion_job_id` but the `GROUP BY` at Line 364 only includes `compression_job_id`. Under strict MySQL mode, this can fail and block startup recovery.
Proposed fix
-        "SELECT `ingestion_job_id`, `compression_job_id`, COUNT(*) as `num_submitted` FROM \
-         `{table}` WHERE `compression_job_id` IS NOT NULL AND `status` = ? GROUP BY \
-         `compression_job_id` ORDER BY `compression_job_id` ASC;",
+        "SELECT `ingestion_job_id`, `compression_job_id`, COUNT(*) as `num_submitted` FROM \
+         `{table}` WHERE `compression_job_id` IS NOT NULL AND `status` = ? GROUP BY \
+         `ingestion_job_id`, `compression_job_id` ORDER BY `compression_job_id` ASC, \
+         `ingestion_job_id` ASC;",
#!/bin/bash
# Verify the unfinished compression jobs query shape.
# Expected after fix: GROUP BY includes both ingestion_job_id and compression_job_id.
nl -ba components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs | sed -n '358,370p'
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs` around lines 361 - 365, The SQL string constant QUERY in the clp ingestion module selects ingestion_job_id and compression_job_id but only groups by compression_job_id; update the GROUP BY clause in the QUERY (the formatcp call that builds the SELECT) to include ingestion_job_id as well (i.e. GROUP BY `ingestion_job_id`, `compression_job_id`) so all non-aggregated selected columns are listed, keeping the rest of the query and the INGESTED_S3_OBJECT_METADATA_TABLE_NAME interpolation unchanged.
components/log-ingestor/src/ingestion_job_manager.rs (1)
398-410: ⚠️ Potential issue | 🟠 Major
Mark failed instance recovery attempts as `failed` in DB.
At Line 398, the error path only logs and skips. That leaves the ingestion job in `requested`/`running` without a live instance, which creates stale state and repeated retries on restart.
Proposed fix
         if let Err(e) = ingestion_job_manager
             .create_s3_ingestion_job_instance(ingestion_job_context)
             .await
         {
             tracing::error!(
                 job_id = ? job_id,
                 error = ? e,
                 "Failed to create ingestion job instance on recovery. Recovery for this job \
                  skipped."
             );
+            if let Err(fail_err) = ingestion_job_manager
+                .inner
+                .clp_db_ingestion_connector
+                .try_fail(job_id, format!("Failed to recover ingestion job instance: {e}"))
+                .await
+            {
+                tracing::error!(
+                    job_id = ? job_id,
+                    error = ? fail_err,
+                    "Failed to mark ingestion job as failed after recovery failure."
+                );
+            }
+            continue;
         } else {
             num_recovered_jobs += 1;
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/log-ingestor/src/ingestion_job_manager.rs` around lines 398 - 410, The recovery error branch that calls ingestion_job_manager.create_s3_ingestion_job_instance(ingestion_job_context) should mark the job instance as failed in the DB instead of just logging and leaving it in requested/running; on Err(e) call the manager's failure/update method (e.g., add or use a method like ingestion_job_manager.mark_ingestion_job_instance_failed or ingestion_job_manager.update_job_status_to_failed) with job_id and the error details (or create that helper if missing), log the error with the failure update, and ensure you do not leave the stale running/requested state (optionally still increment num_recovered_jobs or account for it consistently after marking failed).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 721-752: get_buffered_object_metadata currently calls fetch_all
and can OOM on large backlogs; change it to perform paginated/chunked reads
using an ordered ID window: repeatedly query
INGESTED_S3_OBJECT_METADATA_TABLE_NAME with "WHERE ingestion_job_id = ? AND
status = ? AND id > ?" plus "ORDER BY id LIMIT ?" (or equivalent), bind
self.job_id, IngestedS3ObjectMetadataStatus::Buffered, a tracked last_id, and a
fixed batch_size (e.g., 500–1000), append each batch's rows into
object_metadata_vec, update last_id to the last row's S3ObjectMetadataId, and
loop until a batch returns empty; implement this in get_buffered_object_metadata
so you avoid fetch_all and still return Vec<ObjectMetadata>.
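The ordered ID window described above (often called keyset pagination) can be sketched against an in-memory table. This is an illustration under assumptions: `Row`, `fetch_batch`, and `fetch_all_in_batches` are hypothetical names, the slice is assumed to be sorted by ID with IDs above 0, and a real implementation would issue one SQL query per batch instead of filtering a slice.

```rust
/// Hypothetical row: (id, object key).
type Row = (u64, String);

/// Stand-in for `WHERE id > ? ORDER BY id LIMIT ?` on a table sorted by id.
fn fetch_batch(table: &[Row], last_id: u64, batch_size: usize) -> Vec<Row> {
    table
        .iter()
        .filter(|(id, _)| *id > last_id)
        .take(batch_size)
        .cloned()
        .collect()
}

/// Reads the whole table in fixed-size batches, tracking the last seen id.
fn fetch_all_in_batches(table: &[Row], batch_size: usize) -> Vec<Row> {
    assert!(batch_size > 0, "batch_size must be positive");
    let mut all_rows = Vec::new();
    let mut last_id = 0; // Assumes real ids are greater than 0.
    loop {
        let batch = fetch_batch(table, last_id, batch_size);
        // An empty batch means every row has been read.
        let Some((id, _)) = batch.last() else { break };
        last_id = *id;
        all_rows.extend(batch);
    }
    all_rows
}
```

Note that accumulating into a single `Vec` still holds every row in memory at the end; batching only bounds the size of each individual query result.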
ℹ️ Review info
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (2)
components/log-ingestor/src/ingestion_job_manager.rs
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/log-ingestor/src/ingestion_job_manager.rs (1)
257-336: ⚠️ Potential issue | 🟠 Major
Do not hold the `job_table` mutex across awaited I/O operations.
The mutex is acquired at line 257 and held through awaited calls at lines 300, 302, 317, and 318 (AWS client creation and ingestion state startup) before release at line 336. This blocks concurrent job operations under slow AWS/database calls.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/log-ingestor/src/ingestion_job_manager.rs` around lines 257 - 336, The code currently holds the job_table mutex across awaits (SqsClientWrapper::create, S3ClientWrapper::create, ingestion_state.start) which blocks concurrency; fix by narrowing the locked section: under the lock check for job_id collision and prefix conflicts using ingestion_job_context.get_ingestion_job_config()/as_base_config(), then clone out all needed data (job_id, a clone of ingestion_job_context or its config/base_config, and aws credentials) and drop the lock before performing any awaited I/O (SqsClientWrapper::create, S3ClientWrapper::create, ValidatedSqsListenerConfig::validate_and_create, ingestion_state.start, and spawning the job instance). After creating the IngestionJob (SqsListener or S3Scanner) without holding the mutex, re-acquire job_table.lock().await, re-check that job_id still does not exist (to avoid race), then insert the IngestionJobTableEntry (ingestion_job_instance, ingestion_job_context).
♻️ Duplicate comments (1)
components/log-ingestor/src/ingestion_job_manager.rs (1)
408-417: ⚠️ Potential issue | 🟠 Major
Mark unrecoverable instance-creation failures explicitly in DB state.
When recovery cannot recreate an instance, the job is skipped but remains in `requested`/`running`, so it keeps reappearing as recoverable on restart without a terminal reason.
💡 Proposed fix
         if let Err(e) = ingestion_job_manager
             .create_s3_ingestion_job_instance(ingestion_job_context)
             .await
         {
             tracing::error!(
                 job_id = ? job_id,
                 error = ? e,
                 "Failed to create ingestion job instance on recovery. Recovery for this job \
                  skipped."
             );
+            if let Err(fail_err) = ingestion_job_manager
+                .inner
+                .clp_db_ingestion_connector
+                .try_fail(job_id, format!("Failed to recover ingestion job instance: {e}"))
+                .await
+            {
+                tracing::error!(
+                    job_id = ? job_id,
+                    error = ? fail_err,
+                    "Failed to mark ingestion job as failed after recovery failure."
+                );
+            }
         } else {
             num_recovered_jobs += 1;
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/log-ingestor/src/ingestion_job_manager.rs` around lines 408 - 417, The recovery path currently logs errors from ingestion_job_manager.create_s3_ingestion_job_instance(...) but leaves the job in requested/running state so it keeps reappearing; update the DB record to mark the job as terminal/unrecoverable when create_s3_ingestion_job_instance returns Err(e) by calling the job-state update method (e.g., the repository/update function used elsewhere to set terminal states) for the given job_id, set a failure status/reason (include the error text) and persist that change before returning from the recovery branch so the job is not retried on restart.
ℹ️ Review info
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (3)
components/log-ingestor/src/ingestion_job_manager.rs
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
components/log-ingestor/src/routes.rs
Description
This PR is the last in the series implementing #1978 and resolves that issue.
This PR adds recovery logic for log-ingestor on a restart. The restart procedure is as follows:
To make ingestion job handling generic on creation and on restart, we introduce `ClpIngestionContext` and let `IngestionJobManager` operate on it:
This PR rewrites `IngestionJobManager` partially to make use of `ClpIngestionContext`.
Checklist
breaking change.
Validation performed
`compressed` status eventually.
Summary by CodeRabbit
Bug Fixes
New Features
Chores