feat(log-ingestor): Add support for recovering log-ingestor on restart (resolves #1978).#2053

Merged
LinZhihao-723 merged 43 commits into
y-scope:mainfrom
LinZhihao-723:log-ingestor-failure-recovery
Mar 2, 2026
Conversation

@LinZhihao-723 LinZhihao-723 commented Mar 2, 2026

Member

Description

This is the last PR in the series implementing #1978; merging it resolves the issue.

This PR adds recovery logic for log-ingestor on a restart. The restart procedure is as follows:

  1. Recover all coroutines that wait for a compression job to finish and then update the status of the submitted object metadata.
  2. Recover all recoverable ingestion jobs. "Recoverable" here means we should recreate an ingestion job instance.
  3. Recover all inactive ingestion jobs (including paused, failed, and finished ones). This recovery only refills the ingestion buffer, so already-ingested files can be submitted for compression.

| Ingestion job type | Recoverable | Inactive | Notes |
|---|---|---|---|
| Waiting for compression to finish | Yes | Yes | By step 1 in general. |
| Refill ingestion buffer | Yes | Yes | |
| Create ingestion job instance | Yes | No | |
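
As a rough illustration of the table above, the per-job decision could be sketched as below. The `JobStatus` variants, `RecoveryPlan`, and `plan_recovery` are hypothetical names for this sketch, and treating requested/running as "recoverable" is an assumption based on the review discussion, not the PR's actual types.

```rust
/// Hypothetical job states; the real status enum in log-ingestor may differ.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum JobStatus {
    Requested,
    Running,
    Paused,
    Failed,
    Finished,
}

/// What the restart procedure does for a single ingestion job.
#[derive(Debug, PartialEq, Eq)]
pub struct RecoveryPlan {
    pub refill_ingestion_buffer: bool,
    pub create_job_instance: bool,
}

/// Every job gets its buffer refilled; only recoverable jobs get a new instance.
pub fn plan_recovery(status: JobStatus) -> RecoveryPlan {
    // Assumption: "recoverable" means the job was still requested or running.
    let recoverable = matches!(status, JobStatus::Requested | JobStatus::Running);
    RecoveryPlan {
        refill_ingestion_buffer: true,
        create_job_instance: recoverable,
    }
}
```

Waiting for unfinished compression jobs (step 1) is handled per compression job rather than per ingestion job, so it is left out of this sketch.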

To make ingestion job handling generic on creation and on restart, we introduce ClpIngestionContext and let IngestionJobManager operate on it:

  • On success, the ingestion manager owns the ingestion context, which holds the listener as the sole owner of the ingestion buffer.
  • On failure, the context is dropped, which closes the listener, so no resources leak.
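
The ownership behaviour described in these two bullets can be sketched with plain Rust drop semantics. All types here (`Listener`, `IngestionContext`, `Manager`) and the `setup_ok` flag are simplified, hypothetical stand-ins, not the PR's `ClpIngestionContext` or `IngestionJobManager`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the listener that owns the ingestion buffer.
pub struct Listener {
    closed: Arc<AtomicBool>,
}

impl Drop for Listener {
    fn drop(&mut self) {
        // Closing on drop means error paths need no explicit cleanup call.
        self.closed.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for the ingestion context: sole owner of the listener.
pub struct IngestionContext {
    _listener: Listener,
}

#[derive(Default)]
pub struct Manager {
    jobs: Vec<IngestionContext>,
}

impl Manager {
    /// Takes the context by value. On `Err`, `ctx` is dropped when this function returns.
    pub fn register(&mut self, ctx: IngestionContext, setup_ok: bool) -> Result<(), String> {
        if !setup_ok {
            return Err("job setup failed".to_string());
        }
        // On success the manager becomes the owner, so the listener stays open.
        self.jobs.push(ctx);
        Ok(())
    }
}

/// Builds a context plus a flag that reports whether its listener has been closed.
pub fn new_context() -> (IngestionContext, Arc<AtomicBool>) {
    let closed = Arc::new(AtomicBool::new(false));
    let ctx = IngestionContext {
        _listener: Listener { closed: Arc::clone(&closed) },
    };
    (ctx, closed)
}
```

Passing the context by value is what makes the failure path leak-free: there is no code path on which the caller still holds a half-registered context.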

This PR rewrites IngestionJobManager partially to make use of ClpIngestionContext.

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  • Ensure all workflows pass.
  • Run the following test (for both MySQL and MariaDB):
    1. Start log-ingestor.
    2. Submit 8 SQS listener jobs at once.
       • Ensure they are all running.
    3. When the first compression job is created, restart log-ingestor.
       • Ensure all ingestion jobs are resumed.
    4. Wait until all files are ingested.
    5. Close 4 ingestion jobs.
    6. Restart log-ingestor.
       • Ensure closed jobs are not resumed.
       • Ensure the buffered files for the closed jobs are submitted.
       • Ensure the remaining ingestion jobs are resumed.
    7. Ensure all files eventually reach the compressed status.
       • This implies the buffer is recovered properly in steps 3 and 6.
       • This implies the waiting coroutines are successfully recovered in steps 3 and 6.
       • This implies that no compression jobs are submitted repeatedly.
  • Start an S3 ingestion job, restart log-ingestor multiple times (as needed) during file ingestion, and ensure all files are eventually ingested without duplication.

Summary by CodeRabbit

  • Bug Fixes

    • Listener channels now close gracefully instead of signaling an unexpected error.
  • New Features

    • Startup recovery restores unfinished compression work, rebuilds in‑progress ingestion jobs, and refills buffered metadata so processing resumes after restarts.
    • Termination responses now report explicit terminal status (Finished/Failed) for clearer shutdown outcomes.
  • Chores

    • Reduced duplicated wait-and-log logic and introduced job/context abstractions to improve lifecycle management and reliability.

LinZhihao-723 and others added 30 commits February 25, 2026 16:30
@LinZhihao-723 LinZhihao-723 requested a review from a team as a code owner March 2, 2026 04:13
coderabbitai Bot commented Mar 2, 2026

Contributor

Important

Review skipped

Review was skipped due to path filters

⛔ Files ignored due to path filters (1)
  • docs/src/_static/generated/log-ingestor-openapi.json is excluded by !**/generated/**


Walkthrough

Adds a compression wait-and-update helper, makes listener channel closure a graceful completion, and refactors ingestion manager to persist and recover ingestion/compression jobs via new recovery/context structs and DB-backed startup recovery; API signatures and job lifecycle handling were changed accordingly.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Compression Job Submission / components/log-ingestor/src/compression/compression_job_submitter.rs | Added wait_for_compression_job_completion_and_update_metadata(state, compression_job_id, num_objects_submitted) async helper; imported CompressionJobId; replaced duplicated inline wait-and-log with the helper and ensured detached coroutine usage. |
| Listener Channel Handling / components/log-ingestor/src/compression/listener.rs | Changed ListenerTask::run to treat receiver closure as graceful completion: log info and return Ok(()) instead of returning an error. |
| Ingestion Job Manager / components/log-ingestor/src/ingestion_job_manager.rs | Introduced TerminalStatus enum; replaced direct DB pool creation with ClpDbIngestionConnector::connect; added recovery routines for unfinished compression jobs, recoverable and inactive ingestion jobs; changed create_s3_ingestion_job_instance and shutdown_and_remove_job_instance signatures; updated table entries to store job contexts. |
| CLP Ingestion & Recovery Contexts / components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs | Added LogIngestorRecoveryContext, ClpCompressionJobContext, ClpIngestionJobContext; ClpDbIngestionConnector::connect now returns recovery context; create_ingestion_job returns ClpIngestionJobContext; added accessors (e.g., get_buffered_object_metadata) and recovery helpers including compression-detach-and-wait flows. |
| HTTP Routes / components/log-ingestor/src/routes.rs | Removed local TerminalStatus enum and import replaced by TerminalStatus from ingestion_job_manager; terminate_ingestion_job now propagates the external TerminalStatus directly in responses. |
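
The listener-channel change summarized above (receiver closure treated as graceful completion) can be illustrated with a minimal sketch. It uses the standard library's `std::sync::mpsc` as a stand-in for whatever async channel `ListenerTask::run` actually reads from; `run_listener` and `demo` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Drains `rx` until every sender is dropped, then returns `Ok` with the processed count.
pub fn run_listener(rx: Receiver<String>) -> Result<usize, String> {
    let mut num_processed = 0;
    // `recv` returns `Err` only once the channel is both closed and empty.
    while let Ok(_object_key) = rx.recv() {
        num_processed += 1;
    }
    // Closure is the normal shutdown signal, so report success rather than an error.
    Ok(num_processed)
}

/// Sends two items, closes the channel, and runs the listener to completion.
pub fn demo() -> Result<usize, String> {
    let (tx, rx) = channel();
    tx.send("a.log".to_string()).map_err(|e| e.to_string())?;
    tx.send("b.log".to_string()).map_err(|e| e.to_string())?;
    drop(tx); // Dropping the last sender closes the channel.
    run_listener(rx)
}
```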

Sequence Diagram(s)

sequenceDiagram
    autonumber
    actor Startup
    participant Manager as IngestionJobManager
    participant Connector as ClpDbIngestionConnector
    participant DB as CLP_DB
    participant Recovery as LogIngestorRecoveryContext
    participant Compression as ClpCompressionJobContext
    participant Ingestion as ClpIngestionJobContext

    Startup->>Manager: initialize()
    Manager->>Connector: connect(clp_config, clp_credentials)
    Connector->>DB: create tables, load unfinished/recoverable/inactive jobs
    DB-->>Connector: recovery data
    Connector-->>Manager: (Connector, Recovery)
    Manager->>Recovery: iterate unfinished_compression_jobs
    Recovery->>Compression: detach_and_wait_for_completion_and_update_metadata()
    Manager->>Recovery: iterate recoverable_ingestion_jobs
    Recovery->>Ingestion: recreate job instances, refill buffers
    Manager->>Recovery: iterate inactive_ingestion_jobs
    Recovery->>Ingestion: refill buffers (no restart)
    Manager-->>Startup: ready

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately summarizes the main objective of the PR: adding restart recovery support for log-ingestor and resolving issue #1978. |
| Linked Issues check | ✅ Passed | The PR implementation comprehensively addresses core requirements from #1978: recovery of compression job waiters, recoverable ingestion jobs, inactive jobs; new context structures; refactored IngestionJobManager; and proper state persistence flows. |
| Out of Scope Changes check | ✅ Passed | All changes are directly aligned with the restart recovery objectives; no out-of-scope modifications detected beyond the stated scope of recovery logic implementation. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |


@LinZhihao-723 LinZhihao-723 changed the title feat(log-ingestor): Add support for recovering log-ingestor on restart (#resolves #1978). feat(log-ingestor): Add support for recovering log-ingestor on restart (resolves #1978). Mar 2, 2026

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/log-ingestor/src/ingestion_job_manager.rs (1)

247-316: ⚠️ Potential issue | 🟠 Major

Release job_table lock before awaiting external I/O operations.

The lock is held during potentially slow operations (AWS client creation at lines 290 and 307, state transitions at lines 292 and 308), blocking unrelated manager operations like concurrent job removals. This is inconsistent with shutdown_and_remove_job_instance, which releases the lock immediately after table access before awaiting. Consider structuring as:

  1. Acquire lock, validate job ID and prefix conflicts
  2. Release lock
  3. Perform async client creation and state initialization
  4. Re-acquire lock, check for race conditions, insert job
Refactor sketch
-        let mut job_table = self.inner.job_table.lock().await;
-
-        if job_table.contains_key(&job_id) {
-            return Err(Error::InternalError(anyhow::anyhow!(
-                "Job ID collision detected: An ingestion job with ID {job_id} already exists."
-            )));
-        }
-
-        for table_entry in job_table.values() {
+        {
+            let job_table = self.inner.job_table.lock().await;
+            if job_table.contains_key(&job_id) {
+                return Err(Error::InternalError(anyhow::anyhow!(
+                    "Job ID collision detected: An ingestion job with ID {job_id} already exists."
+                )));
+            }
+            for table_entry in job_table.values() {
                 // prefix-conflict checks...
-        }
+            }
+        }
 
         let ingestion_state = ingestion_job_context.get_ingestion_state();
         let ingestion_job_instance = match ingestion_job_context.get_ingestion_job_config().clone() {
             // awaits...
         };
 
+        let mut job_table = self.inner.job_table.lock().await;
+        if job_table.contains_key(&job_id) {
+            return Err(Error::InternalError(anyhow::anyhow!(
+                "Job ID collision detected after async setup: {job_id}."
+            )));
+        }
         job_table.insert(
             job_id,
             IngestionJobTableEntry {
                 ingestion_job_instance,
                 ingestion_job_context,
             },
         );
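
The check, release, re-check pattern in the refactor sketch above can be reduced to a standard-library example. This is only an illustration under simplifying assumptions: it uses a blocking `std::sync::Mutex` rather than the async mutex in the PR, and `JobTable`, `create_job`, `num_jobs`, and the `slow_setup` closure are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

pub struct JobTable {
    jobs: Mutex<HashMap<u64, String>>,
}

impl JobTable {
    pub fn new() -> Self {
        Self { jobs: Mutex::new(HashMap::new()) }
    }

    /// `slow_setup` stands in for client creation and state start-up.
    pub fn create_job<F>(&self, job_id: u64, slow_setup: F) -> Result<(), String>
    where
        F: FnOnce() -> Result<String, String>,
    {
        {
            // Phase 1: validate under the lock; the guard drops at the end of this block.
            let jobs = self.jobs.lock().map_err(|e| e.to_string())?;
            if jobs.contains_key(&job_id) {
                return Err(format!("job {job_id} already exists"));
            }
        }

        // Phase 2: slow work runs without the lock held.
        let instance = slow_setup()?;

        // Phase 3: re-acquire and re-check, since another caller may have inserted meanwhile.
        let mut jobs = self.jobs.lock().map_err(|e| e.to_string())?;
        if jobs.contains_key(&job_id) {
            return Err(format!("job {job_id} was created concurrently"));
        }
        jobs.insert(job_id, instance);
        Ok(())
    }

    pub fn num_jobs(&self) -> usize {
        self.jobs.lock().map(|jobs| jobs.len()).unwrap_or(0)
    }
}
```

One trade-off to keep in mind: if phase 3 fails, whatever `slow_setup` started has to be torn down, which is the "leaked started state" concern raised in the prompt below.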
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/log-ingestor/src/ingestion_job_manager.rs` around lines 247 - 316,
The job_table lock is held while performing async I/O (SqsClientWrapper::create,
S3ClientWrapper::create) and awaiting ingestion_state.start(), which blocks
other manager operations; change create_ingestion_job logic to (1) acquire
self.inner.job_table.lock().await only to check job_id existence and prefix
conflicts (using job_table and existing_job_base_config), then release the lock;
(2) perform the async client creation and call ingestion_state.start() outside
the lock (handle validation via ValidatedSqsListenerConfig::validate_and_create
and S3Scanner setup); (3) re-acquire the lock, verify no race (job_id now absent
and prefix still non-conflicting), and insert the constructed IngestionJob
(SqsListener or S3Scanner) into job_table; ensure any early errors return before
the final insert to avoid leaked started state.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager.rs`:
- Around line 398-410: The recovery loop currently increments num_recovered_jobs
even when create_s3_ingestion_job_instance(...) fails; change the error branch
so it does NOT increment num_recovered_jobs and instead transitions the job to a
failed state to avoid leaving it in requested/running. Specifically, inside the
Err(e) arm for
ingestion_job_manager.create_s3_ingestion_job_instance(ingestion_job_context).await,
call the manager method that marks the job failed (e.g.,
ingestion_job_manager.mark_ingestion_job_failed(job_id, e) or an existing
update_job_status_to_failed(job_id, reason)), keep the tracing::error log, and
only increment num_recovered_jobs in the success path after a successful
creation.

In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 360-367: The SQL GROUP BY is missing a non-aggregated column:
update the QUERY constant (the formatted SQL using
INGESTED_S3_OBJECT_METADATA_TABLE_NAME) so the GROUP BY includes both
`compression_job_id` and `ingestion_job_id` (and optionally add ORDER BY
`compression_job_id`, `ingestion_job_id` for deterministic ordering) to satisfy
ONLY_FULL_GROUP_BY; no other code changes to the sqlx::query_as call, the tuple
type (IngestionJobId, CompressionJobId, i64), or the
.bind(IngestedS3ObjectMetadataStatus::Submitted) are needed.


ℹ️ Review info

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 74bf5f6 and 06c56a7.

📒 Files selected for processing (4)
  • components/log-ingestor/src/compression/compression_job_submitter.rs
  • components/log-ingestor/src/compression/listener.rs
  • components/log-ingestor/src/ingestion_job_manager.rs
  • components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs

Comment thread components/log-ingestor/src/ingestion_job_manager.rs
Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs Outdated
Comment on lines +233 to +235
let unfinished_compression_jobs = connector.get_unfinished_compression_jobs().await?;
let (recoverable_ingestion_jobs, inactive_ingestion_jobs) =
    connector.load_ingestion_jobs().await?;

Contributor

I am a little worried about these two functions under failure. If one db connection fails, or config parse fails, we just stop halfway. Should we try our best to handle all jobs and only give up on failed ones?

Member Author

I think the current behavior is that if a connection error happens, we return the error directly to the top-level caller, which triggers another restart of log-ingestor to retry everything.

Contributor

What if the error is not a connection error?

Member Author

We only forward fetch_all errors, which come from DB operations. Whether these errors are connection errors probably doesn't matter? The next round of retries would be triggered on a restart, before the recovery procedure starts.

Contributor

I think it is fine for now. Maybe we can distinguish different errors later.

Comment on lines +102 to +108
try_recover_waiting_coroutines_for_unfinished_compression_jobs(unfinished_compression_jobs);
try_recover_ingestion_job_instances(
    ingestion_job_manager.clone(),
    recoverable_ingestion_jobs,
)
.await;
try_recover_inactive_ingestion_jobs(inactive_ingestion_jobs).await;

Contributor

Same concern for failing halfway.

Member Author

These functions are best-effort. They don't return errors; they only log them and keep going. That's why they're named "try_xxx".
In the future, we plan to expose APIs to retry a failed job explicitly.
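
A minimal sketch of the best-effort ("try_xxx") behaviour described in this reply, assuming a hypothetical `try_recover_all` helper and a caller-supplied `recover` closure. Errors are collected into a `Vec` here as a stand-in for the logging the real functions do.

```rust
/// Attempts `recover` for each job; failures are recorded (standing in for logging) and skipped.
pub fn try_recover_all<F>(job_ids: &[u64], mut recover: F) -> (usize, Vec<String>)
where
    F: FnMut(u64) -> Result<(), String>,
{
    let mut num_recovered = 0;
    let mut logged_errors = Vec::new();
    for &job_id in job_ids {
        match recover(job_id) {
            Ok(()) => num_recovered += 1,
            // Log and continue: one bad job must not stop recovery of the others.
            Err(e) => logged_errors.push(format!("job {job_id}: {e}")),
        }
    }
    (num_recovered, logged_errors)
}
```

Because the function has no `Result` in its return type, a caller cannot accidentally abort startup with `?` when a single job fails.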

"Recovering ingestion job."
);

try_refill_ingestion_buffer(&ingestion_job_context).await;

@sitaowang1998 sitaowang1998 Mar 2, 2026

Contributor

So we continue creating the job instance even if refilling the ingestion buffer fails?

Member Author

Yeah, I think it's fair to still allow the ingestion job to ingest more files into the system as:

  • These new logs will likely be ingested into the buffer properly for compression.
  • As planned for kv-ir, ingesting them into the system would allow early-stage IR-level search.

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/log-ingestor/src/ingestion_job_manager.rs (1)

247-326: 🛠️ Refactor suggestion | 🟠 Major

Refactor to release job_table lock before async operations.

The mutex guard is held across multiple async awaits at lines 290, 292, 307, and 308 (SqsClientWrapper::create, S3ClientWrapper::create, and ingestion_state.start). This serializes manager operations and can cause extended lock stalls during AWS SDK or I/O delays. Restructure to acquire the lock only for validation (lines 249–279), release it, perform async operations independently, then re-acquire before the final table insertion.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/log-ingestor/src/ingestion_job_manager.rs` around lines 247 - 326,
The job_table mutex is held across async awaits causing lock stalls; refactor
create_ingestion_job (the block using job_table, ingestion_job_context, and
ingestion_job_instance) to only lock job_table for the validation/prefix-checks
(the loop that reads existing_job_base_config and the initial job_id collision
check), then drop the lock before calling async operations
(SqsClientWrapper::create, S3ClientWrapper::create, and ingestion_state.start),
build the ingestion_job_instance off-lock, and finally re-acquire the lock only
to perform job_table.insert with an IngestionJobTableEntry containing the
prepared ingestion_job_instance and ingestion_job_context. Ensure you preserve
validation results (e.g., base_config comparison and derived values) so
re-acquiring the lock before insert can safely assume no conflicting state was
introduced.
♻️ Duplicate comments (2)
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs (1)

361-365: ⚠️ Potential issue | 🔴 Critical

Fix GROUP BY to include all non-aggregated selected columns.

At Line 362, the query selects ingestion_job_id but the GROUP BY at Line 364 only includes compression_job_id. Under strict MySQL mode, this can fail and block startup recovery.

Proposed fix
-            "SELECT `ingestion_job_id`, `compression_job_id`, COUNT(*) as `num_submitted` FROM \
-             `{table}` WHERE `compression_job_id` IS NOT NULL AND `status` = ? GROUP BY \
-             `compression_job_id` ORDER BY `compression_job_id` ASC;",
+            "SELECT `ingestion_job_id`, `compression_job_id`, COUNT(*) as `num_submitted` FROM \
+             `{table}` WHERE `compression_job_id` IS NOT NULL AND `status` = ? GROUP BY \
+             `ingestion_job_id`, `compression_job_id` ORDER BY `compression_job_id` ASC, \
+             `ingestion_job_id` ASC;",
#!/bin/bash
# Verify the unfinished compression jobs query shape.
# Expected after fix: GROUP BY includes both ingestion_job_id and compression_job_id.
nl -ba components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs | sed -n '358,370p'
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs` around
lines 361 - 365, The SQL string constant QUERY in the clp ingestion module
selects ingestion_job_id and compression_job_id but only groups by
compression_job_id; update the GROUP BY clause in the QUERY (the formatcp call
that builds the SELECT) to include ingestion_job_id as well (i.e. GROUP BY
`ingestion_job_id`, `compression_job_id`) so all non-aggregated selected columns
are listed, keeping the rest of the query and the
INGESTED_S3_OBJECT_METADATA_TABLE_NAME interpolation unchanged.
components/log-ingestor/src/ingestion_job_manager.rs (1)

398-410: ⚠️ Potential issue | 🟠 Major

Mark failed instance recovery attempts as failed in DB.

At Line 398, the error path only logs and skips. That leaves the ingestion job in requested/running without a live instance, which creates stale state and repeated retries on restart.

Proposed fix
         if let Err(e) = ingestion_job_manager
             .create_s3_ingestion_job_instance(ingestion_job_context)
             .await
         {
             tracing::error!(
                 job_id = ? job_id,
                 error = ? e,
                 "Failed to create ingestion job instance on recovery. Recovery for this job \
                     skipped."
             );
+            if let Err(fail_err) = ingestion_job_manager
+                .inner
+                .clp_db_ingestion_connector
+                .try_fail(job_id, format!("Failed to recover ingestion job instance: {e}"))
+                .await
+            {
+                tracing::error!(
+                    job_id = ? job_id,
+                    error = ? fail_err,
+                    "Failed to mark ingestion job as failed after recovery failure."
+                );
+            }
+            continue;
         } else {
             num_recovered_jobs += 1;
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/log-ingestor/src/ingestion_job_manager.rs` around lines 398 - 410,
The recovery error branch that calls
ingestion_job_manager.create_s3_ingestion_job_instance(ingestion_job_context)
should mark the job instance as failed in the DB instead of just logging and
leaving it in requested/running; on Err(e) call the manager's failure/update
method (e.g., add or use a method like
ingestion_job_manager.mark_ingestion_job_instance_failed or
ingestion_job_manager.update_job_status_to_failed) with job_id and the error
details (or create that helper if missing), log the error with the failure
update, and ensure you do not leave the stale running/requested state
(optionally still increment num_recovered_jobs or account for it consistently
after marking failed).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 721-752: get_buffered_object_metadata currently calls fetch_all
and can OOM on large backlogs; change it to perform paginated/chunked reads
using an ordered ID window: repeatedly query
INGESTED_S3_OBJECT_METADATA_TABLE_NAME with "WHERE ingestion_job_id = ? AND
status = ? AND id > ?" plus "ORDER BY id LIMIT ?" (or equivalent), bind
self.job_id, IngestedS3ObjectMetadataStatus::Buffered, a tracked last_id, and a
fixed batch_size (e.g., 500–1000), append each batch's rows into
object_metadata_vec, update last_id to the last row's S3ObjectMetadataId, and
loop until a batch returns empty; implement this in get_buffered_object_metadata
so you avoid fetch_all and still return Vec<ObjectMetadata>.
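
The ordered-ID window described in this comment is commonly called keyset pagination. The sketch below shows only the loop shape against an in-memory table; `Row`, `fetch_batch`, and `read_all_in_batches` are hypothetical, and `fetch_batch` stands in for a query of the form "WHERE id > ? ORDER BY id LIMIT ?".

```rust
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Row {
    pub id: u64,
    pub key: String,
}

/// In-memory stand-in for `WHERE id > ? ORDER BY id LIMIT ?`; `table` must be sorted by `id`.
pub fn fetch_batch(table: &[Row], last_id: u64, batch_size: usize) -> Vec<Row> {
    table
        .iter()
        .filter(|row| row.id > last_id)
        .take(batch_size)
        .cloned()
        .collect()
}

/// Reads all rows in batches, returning the rows and the number of non-empty batches fetched.
pub fn read_all_in_batches(table: &[Row], batch_size: usize) -> (Vec<Row>, usize) {
    assert!(batch_size > 0, "batch_size must be positive");
    let mut rows = Vec::new();
    let mut num_batches = 0;
    let mut last_id = 0; // Assumes IDs start above 0, as with an auto-increment key.
    loop {
        let batch = fetch_batch(table, last_id, batch_size);
        match batch.last() {
            // Remember where this batch ended so the next query starts after it.
            Some(last_row) => last_id = last_row.id,
            // An empty batch means the window has passed the final row.
            None => break,
        }
        num_batches += 1;
        rows.extend(batch);
    }
    (rows, num_batches)
}
```

Note that this bounds the size of each query result, not the final `Vec`; the accumulated rows still have to fit in memory if the function keeps returning all of them at once.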


ℹ️ Review info

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 06c56a7 and dffb873.

📒 Files selected for processing (2)
  • components/log-ingestor/src/ingestion_job_manager.rs
  • components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs

Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs

@coderabbitai coderabbitai Bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/log-ingestor/src/ingestion_job_manager.rs (1)

257-336: ⚠️ Potential issue | 🟠 Major

Do not hold the job_table mutex across awaited I/O operations.

The mutex is acquired at line 257 and held through awaited calls at lines 300, 302, 317, and 318 (AWS client creation and ingestion state startup) before release at line 336. This blocks concurrent job operations under slow AWS/database calls.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/log-ingestor/src/ingestion_job_manager.rs` around lines 257 - 336,
The code currently holds the job_table mutex across awaits
(SqsClientWrapper::create, S3ClientWrapper::create, ingestion_state.start) which
blocks concurrency; fix by narrowing the locked section: under the lock check
for job_id collision and prefix conflicts using
ingestion_job_context.get_ingestion_job_config()/as_base_config(), then clone
out all needed data (job_id, a clone of ingestion_job_context or its
config/base_config, and aws credentials) and drop the lock before performing any
awaited I/O (SqsClientWrapper::create, S3ClientWrapper::create,
ValidatedSqsListenerConfig::validate_and_create, ingestion_state.start, and
spawning the job instance). After creating the IngestionJob (SqsListener or
S3Scanner) without holding the mutex, re-acquire job_table.lock().await,
re-check that job_id still does not exist (to avoid race), then insert the
IngestionJobTableEntry (ingestion_job_instance, ingestion_job_context).
♻️ Duplicate comments (1)
components/log-ingestor/src/ingestion_job_manager.rs (1)

408-417: ⚠️ Potential issue | 🟠 Major

Mark unrecoverable instance-creation failures explicitly in DB state.

When recovery cannot recreate an instance, the job is skipped but remains in requested/running, so it keeps reappearing as recoverable on restart without a terminal reason.

💡 Proposed fix
         if let Err(e) = ingestion_job_manager
             .create_s3_ingestion_job_instance(ingestion_job_context)
             .await
         {
             tracing::error!(
                 job_id = ? job_id,
                 error = ? e,
                 "Failed to create ingestion job instance on recovery. Recovery for this job \
                     skipped."
             );
+            if let Err(fail_err) = ingestion_job_manager
+                .inner
+                .clp_db_ingestion_connector
+                .try_fail(job_id, format!("Failed to recover ingestion job instance: {e}"))
+                .await
+            {
+                tracing::error!(
+                    job_id = ? job_id,
+                    error = ? fail_err,
+                    "Failed to mark ingestion job as failed after recovery failure."
+                );
+            }
         } else {
             num_recovered_jobs += 1;
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/log-ingestor/src/ingestion_job_manager.rs` around lines 408 - 417,
The recovery path currently logs errors from
ingestion_job_manager.create_s3_ingestion_job_instance(...) but leaves the job
in requested/running state so it keeps reappearing; update the DB record to mark
the job as terminal/unrecoverable when create_s3_ingestion_job_instance returns
Err(e) by calling the job-state update method (e.g., the repository/update
function used elsewhere to set terminal states) for the given job_id, set a
failure status/reason (include the error text) and persist that change before
returning from the recovery branch so the job is not retried on restart.

ℹ️ Review info

📥 Commits

Reviewing files that changed from the base of the PR and between dffb873 and 0963b5f.

📒 Files selected for processing (3)
  • components/log-ingestor/src/ingestion_job_manager.rs
  • components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
  • components/log-ingestor/src/routes.rs

sitaowang1998
sitaowang1998 previously approved these changes Mar 2, 2026
@LinZhihao-723 LinZhihao-723 merged commit a0f0933 into y-scope:main Mar 2, 2026
22 checks passed
@LinZhihao-723 LinZhihao-723 deleted the log-ingestor-failure-recovery branch March 2, 2026 21:31
@junhaoliao junhaoliao added this to the February 2026 milestone Mar 3, 2026
junhaoliao pushed a commit to junhaoliao/clp that referenced this pull request May 17, 2026

Development

Successfully merging this pull request may close these issues.

[log-ingestor] Fault-tolerance support.

3 participants