
[log-ingestor] Fault-tolerance support. #1978

@LinZhihao-723

Introduction

Currently, the log-ingestor relies entirely on in-memory buffers to track ingestion jobs and the state of ingested files. This approach creates reliability issues during failure and recovery scenarios. Specifically:

  • If the log-ingestor crashes and restarts, all in-flight ingestion jobs are lost.
  • Any files that a failed job ingested but had not yet submitted to a compression job may be permanently lost.

This feature request aims to make the log-ingestor reliable across failures and restarts.

Objective

With fault tolerance implemented at the log-ingestor level, the CLP system should be able to:

  • Recover the log-ingestor from failures without losing ingestion data.
    • For S3 scanning, resume ingestion from the last successfully ingested object key.
    • For SQS-based ingestion, ensure that all relevant message metadata is durably persisted before the message is deleted from the queue.
  • Ensure that all ingested files are eventually submitted to a compression job, even after recovery from failures.
  • Provide a clear and traceable association between any failed-to-compress files and their corresponding compression jobs.
  • Enable improved monitoring and observability for all ingestion jobs across the system (possibly surfaced in a WebUI).
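
The SQS objective above comes down to ordering: persist first, delete second. The following is a minimal Rust sketch of that ordering, not the log-ingestor's actual code. `MetadataStore`, `Queue`, `handle_message`, and the in-memory doubles are hypothetical names introduced only for illustration, and `delete` merely stands in for the real SQS message deletion call.

```rust
/// Hypothetical durable store for ingested-file metadata (e.g., backed by the CLP DB).
trait MetadataStore {
    fn persist(&mut self, object_key: &str) -> Result<(), String>;
}

/// Hypothetical queue handle; `delete` stands in for SQS message deletion.
trait Queue {
    fn delete(&mut self, receipt_handle: &str) -> Result<(), String>;
}

/// Persists the metadata first and deletes the message only if persistence succeeded.
/// If `persist` fails, the message is left in the queue so it can be received again.
fn handle_message<S: MetadataStore, Q: Queue>(
    store: &mut S,
    queue: &mut Q,
    object_key: &str,
    receipt_handle: &str,
) -> Result<(), String> {
    store.persist(object_key)?;
    queue.delete(receipt_handle)
}

/// In-memory store double; `fail` simulates a database outage.
#[derive(Default)]
struct MemStore {
    keys: Vec<String>,
    fail: bool,
}

impl MetadataStore for MemStore {
    fn persist(&mut self, object_key: &str) -> Result<(), String> {
        if self.fail {
            return Err("db unavailable".to_string());
        }
        self.keys.push(object_key.to_string());
        Ok(())
    }
}

/// In-memory queue double that records which receipt handles were deleted.
#[derive(Default)]
struct MemQueue {
    deleted: Vec<String>,
}

impl Queue for MemQueue {
    fn delete(&mut self, receipt_handle: &str) -> Result<(), String> {
        self.deleted.push(receipt_handle.to_string());
        Ok(())
    }
}
```

With this ordering, a crash between the two calls can cause the same message to be processed twice, but never lost, so the persistence step would presumably need to tolerate duplicates.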

Fault-tolerance model

We will partially implement the fault-tolerance model designed for CLP-Spider integration, with modifications to account for components that are not yet available in the current system.

[Figure: fault-tolerance model diagram showing the numbered steps 1–10 referenced below]

  • Steps 1–4: Implemented as specified in the Spider design doc.
  • Steps 5–6 (Job submission): Instead of submitting compression jobs to Spider (which is not ready yet), the log-ingestor will submit compression jobs directly to the CLP DB.
  • Step 7: Implemented as specified in the Spider design doc.
  • Steps 8–9 (Job completion tracking): Rather than relying on Spider-side job state callbacks, the log-ingestor will poll the CLP DB until the associated compression job reaches a terminal state.
  • Step 10 (Post-compression updates): The log-ingestor will update only the ingested-file status. It will not update archive-specific metadata (e.g., archive IDs), since that information is not currently available from CLP core.
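
The polling approach in steps 8–9 can be sketched as follows. This is a simplified, blocking illustration under stated assumptions: `CompressionJobState` and its variants are hypothetical and do not reflect CLP's actual compression job status values, `fetch_state` stands in for a CLP DB query, and the real service would presumably poll from an async coroutine rather than call `std::thread::sleep`.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical view of a compression job's state as read from the CLP DB.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompressionJobState {
    Pending,
    Running,
    Succeeded,
    Failed,
}

impl CompressionJobState {
    /// A terminal state is one the job can never leave.
    fn is_terminal(self) -> bool {
        matches!(self, Self::Succeeded | Self::Failed)
    }
}

/// Calls `fetch_state` until it reports a terminal state, sleeping `interval`
/// between polls. Returns `None` if `max_polls` is exhausted first.
fn poll_until_terminal<F>(
    mut fetch_state: F,
    interval: Duration,
    max_polls: usize,
) -> Option<CompressionJobState>
where
    F: FnMut() -> CompressionJobState,
{
    for _ in 0..max_polls {
        let state = fetch_state();
        if state.is_terminal() {
            return Some(state);
        }
        sleep(interval);
    }
    None
}
```

The `max_polls` bound is an addition for the sketch; the proposal itself only says the log-ingestor polls until the job reaches a terminal state.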

Storage schema

Table for ingestion jobs

| Column | Type | NOT NULL | Key | Auto Inc. | Note |
| --- | --- | --- | --- | --- | --- |
| id | INTEGER (i64) | | Primary | | |
| config | TEXT | | | | The ingestion job config received from user requests (in JSON form) |
| status | ENUM("requested", "running", "paused", "failed", "finished") | | | | See below |
| num_files_ingested | INTEGER (i64) | | | | |
| num_files_compressed | INTEGER (i64) | | | | |
| error_msg | BLOB | | | | Initialized to NULL |
| creation_ts | INTEGER (i64) | | | | |
| last_update_ts | INTEGER (i64) | | | | |
| …other fields for monitoring and observability | | | | | |

The status field captures the lifecycle of an ingestion job using the following enum values:

  • Requested: The ingestion job creation request has been accepted by the log-ingestor service, but the corresponding job coroutine has not yet been created or scheduled.
  • Running: The ingestion job is actively running and ingesting files.
  • Paused: The ingestion job has been paused in response to a user request. No new files are ingested while the job is in this state, and the job is skipped during recovery.
  • Failed: The job hit a non-recoverable failure.
    • Recoverable failures are cases the log-ingestor can handle transparently, such as network glitches.
    • Non-recoverable failures are cases the log-ingestor cannot handle on its own, such as invalid credentials.
  • Finished: The ingestion job has been permanently stopped.
    • For long-running ingestion jobs, this indicates the job has been explicitly requested to stop. Previously ingested files may still reference this job, so the record is retained instead of being immediately deleted.
    • For one-time ingestion jobs (planned but not yet supported by the current log-ingestor), this status indicates the job has completed successfully.

State transition

stateDiagram-v2
    [*] --> requested

    requested --> running : Job started by log-ingestor (after user job creation requested)

    running --> paused : Job paused (by user request)
    paused --> running : Job restarted (by user request)

    running --> failed : Non-recoverable failure

    running --> finished : Permanently stopped by user or all work completed

    failed --> [*]
    finished --> [*]
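
One way to enforce the job lifecycle above in code is to encode the allowed transitions in a single function. The sketch below is illustrative only: the `IngestionJobStatus` type and its method names are assumptions, not the log-ingestor's actual definitions, and the transition set simply mirrors the arrows in the diagram.

```rust
/// Hypothetical Rust mirror of the `status` column of the ingestion job table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum IngestionJobStatus {
    Requested,
    Running,
    Paused,
    Failed,
    Finished,
}

impl IngestionJobStatus {
    /// Returns whether the state diagram has an arrow from `self` to `next`.
    fn can_transition_to(self, next: Self) -> bool {
        matches!(
            (self, next),
            (Self::Requested, Self::Running)
                | (Self::Running, Self::Paused)
                | (Self::Paused, Self::Running)
                | (Self::Running, Self::Failed)
                | (Self::Running, Self::Finished)
        )
    }

    /// `failed` and `finished` have no outgoing transitions.
    fn is_terminal(self) -> bool {
        matches!(self, Self::Failed | Self::Finished)
    }
}
```

Taken literally, the diagram has no arrow out of `paused` other than back to `running`, so this sketch rejects `paused` to `finished`.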

Table for S3-scanner ingestion job status

| Column | Type | NOT NULL | Key | Auto Inc. | Note |
| --- | --- | --- | --- | --- | --- |
| id | INTEGER | | Foreign | | The ID of the ingestion job |
| last_ingested_key | TEXT | | | | Initialized to NULL |
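
To illustrate how `last_ingested_key` could drive resumption, the sketch below filters an already sorted key listing down to the keys that come after the persisted one. It relies on S3 returning keys in ascending UTF-8 byte order, which matches Rust's `str` ordering. The function name `keys_to_ingest` is hypothetical. In practice the same effect could likely be achieved server-side by passing the persisted key as the `StartAfter` parameter of `ListObjectsV2`.

```rust
/// Returns the keys that still need to be ingested.
///
/// `sorted_keys` must be in ascending byte order (the order S3 listings use).
/// `last_ingested_key` is the persisted column value, `None` when it is NULL.
fn keys_to_ingest<'a>(
    sorted_keys: &'a [&'a str],
    last_ingested_key: Option<&str>,
) -> &'a [&'a str] {
    match last_ingested_key {
        // Nothing has been ingested yet: ingest every key.
        None => sorted_keys,
        // Skip every key that sorts at or before the last ingested key.
        Some(last) => {
            let start = sorted_keys.partition_point(|key| *key <= last);
            &sorted_keys[start..]
        }
    }
}
```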

Table for the ingested file

Files ingested into all datasets share the same table. The log-ingestor is responsible for creating this table during service initialization. This table should be named log_ingestor_ingested_files.

| Column | Type | NOT NULL | Key | Auto Inc. | Note |
| --- | --- | --- | --- | --- | --- |
| id | INTEGER | | Primary | | |
| path | TEXT | | | | The full path on the filesystem if the log input is FS; bucket/key if the log input is S3-compatible OBS |
| status | ENUM("buffered", "submitted", "compressed", "failed") | | | | |
| ingestion_job_id | INTEGER | | Foreign | | The ID of the ingestion job |
| compression_job_id | INTEGER | | Foreign | | The ID of the compression job |
| Other fields… | | | | | |

The status field captures the lifecycle of an ingested file using the following enum values:

  • Buffered: The file metadata has been durably ingested by the log-ingestor, but the file has not yet been submitted to a compression job.
  • Submitted: The file has been submitted to a compression job.
  • Compressed: The file has been successfully compressed.
  • Failed: The file failed to be compressed. Users can trace the failure by looking up the compression job referenced by compression_job_id and inspecting its error.

State transition

stateDiagram-v2
    [*] --> buffered

    buffered --> submitted : Compression job triggered (Job ID recorded)

    submitted --> compressed : Compression job completed successfully

    submitted --> failed : Compression job returned failure

    compressed --> [*]
    failed --> [*]
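
The file lifecycle above can be sketched as a small record type whose methods permit only the transitions in the diagram. All names here (`IngestedFile`, `mark_submitted`, `mark_job_finished`) are hypothetical, and using `i64` for the ID columns is an assumption, since the table only says INTEGER.

```rust
/// Hypothetical Rust mirror of the `status` column of the ingested file table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum IngestedFileStatus {
    Buffered,
    Submitted,
    Compressed,
    Failed,
}

/// Hypothetical in-memory view of one row of `log_ingestor_ingested_files`.
#[derive(Debug)]
struct IngestedFile {
    path: String,
    status: IngestedFileStatus,
    ingestion_job_id: i64,
    compression_job_id: Option<i64>,
}

impl IngestedFile {
    /// buffered -> submitted: records the ID of the compression job.
    fn mark_submitted(&mut self, compression_job_id: i64) -> Result<(), String> {
        if self.status != IngestedFileStatus::Buffered {
            return Err(format!("cannot submit a file in state {:?}", self.status));
        }
        self.status = IngestedFileStatus::Submitted;
        self.compression_job_id = Some(compression_job_id);
        Ok(())
    }

    /// submitted -> compressed or failed, depending on the compression job's outcome.
    fn mark_job_finished(&mut self, job_succeeded: bool) -> Result<(), String> {
        if self.status != IngestedFileStatus::Submitted {
            return Err(format!("cannot finish a file in state {:?}", self.status));
        }
        self.status = if job_succeeded {
            IngestedFileStatus::Compressed
        } else {
            IngestedFileStatus::Failed
        };
        Ok(())
    }
}
```

Because a failed file keeps its `compression_job_id`, the failure stays traceable to the compression job that produced it.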

Recover from a failure

Whenever the log-ingestor starts, it assumes it may be recovering from a previous service failure. During startup, it scans all persisted ingestion jobs and attempts to restore them.

Recovery procedure

For each ingestion job whose status is requested or running, the log-ingestor performs the following steps:

  1. Restore ingested file state from the database: The log-ingestor iterates over all ingested-file records associated with the ingestion job and takes action based on each file’s status:
    • Buffered: The file has been durably ingested but not yet submitted for compression. The log-ingestor re-enqueues the file into the in-memory buffer for submission.
    • Submitted: The file has already been submitted to a compression job. The log-ingestor creates a coroutine that waits for the associated compression job to reach a terminal state and then updates the file status accordingly.
    • Compressed / Failed: The file has already reached a terminal state. No recovery action is required, and these records may be skipped during database iteration.
  2. Restart the ingestion job: After restoring all relevant file-level state, the log-ingestor restarts the ingestion job coroutine and resumes normal ingestion behavior.
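
The decision logic of the procedure above can be sketched as two pure functions: one selects which jobs to recover, and one maps each file record to a recovery action. This is only an illustration. `JobStatus`, `FileStatus`, `RecoveryAction`, and both function names are hypothetical, and treating a submitted file without a compression job ID as an error is an assumption the proposal does not spell out.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JobStatus {
    Requested,
    Running,
    Paused,
    Failed,
    Finished,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FileStatus {
    Buffered,
    Submitted,
    Compressed,
    Failed,
}

/// What the log-ingestor does with one ingested-file record during recovery.
#[derive(Debug, PartialEq, Eq)]
enum RecoveryAction {
    /// Put the file back into the in-memory buffer for submission.
    Reenqueue,
    /// Wait for the given compression job to reach a terminal state.
    AwaitCompressionJob(i64),
    /// The file is already in a terminal state.
    Skip,
}

/// Only jobs in `requested` or `running` are restored at startup.
fn should_recover(status: JobStatus) -> bool {
    matches!(status, JobStatus::Requested | JobStatus::Running)
}

/// Maps a persisted file record to its recovery action.
fn recovery_action(
    status: FileStatus,
    compression_job_id: Option<i64>,
) -> Result<RecoveryAction, String> {
    match (status, compression_job_id) {
        (FileStatus::Buffered, _) => Ok(RecoveryAction::Reenqueue),
        (FileStatus::Submitted, Some(id)) => Ok(RecoveryAction::AwaitCompressionJob(id)),
        // Assumption: a submitted file must always carry a compression job ID.
        (FileStatus::Submitted, None) => {
            Err("submitted file has no compression job ID".to_string())
        }
        (FileStatus::Compressed | FileStatus::Failed, _) => Ok(RecoveryAction::Skip),
    }
}
```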

Buffer model assumption

In the current design, there is a one-to-one mapping between an ingestion job and its in-memory log buffer. In the future, we may introduce shared buffers to allow multiple ingestion jobs to enqueue files into the same buffer. This enhancement does not conflict with the current design and can be integrated incrementally without altering the recovery semantics described above.

Implementation plan

The fault-tolerance model will be implemented incrementally with the following steps:

  1. Add support for creating the required ingestion job–related tables during CLP DB initialization.
  2. Define a trait that encapsulates all persistence-related operations (e.g., job state updates, file state updates).
    • This abstraction allows component-level integration tests to run without requiring a live CLP DB instance.
  3. Add support for storing ingestion job configuration and status in the CLP DB, along with the associated job lifecycle management logic.
  4. Add support for persisting per-file ingestion metadata in the CLP DB and managing file-level state transitions.
  5. Add recovery logic to restore in-flight ingestion jobs and ingested files from the persisted state at log-ingestor startup.
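
As a rough illustration of step 2, the persistence trait might look something like the sketch below, paired with an in-memory implementation that tests could use in place of a live CLP DB. The trait name `IngestionStateStore`, its methods, and `InMemoryStore` are all hypothetical. The actual trait would likely be async and would also cover file-level state.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PersistedJobStatus {
    Requested,
    Running,
    Paused,
    Failed,
    Finished,
}

/// Hypothetical persistence abstraction for ingestion job state.
trait IngestionStateStore {
    /// Stores a new job in the `requested` state and returns its ID.
    fn create_job(&mut self, config_json: &str) -> i64;
    /// Updates the status of an existing job.
    fn set_job_status(&mut self, job_id: i64, status: PersistedJobStatus) -> Result<(), String>;
    /// Looks up the status of a job, if it exists.
    fn job_status(&self, job_id: i64) -> Option<PersistedJobStatus>;
}

/// In-memory implementation intended for tests only; nothing here is durable.
#[derive(Default)]
struct InMemoryStore {
    next_id: i64,
    jobs: HashMap<i64, (String, PersistedJobStatus)>,
}

impl IngestionStateStore for InMemoryStore {
    fn create_job(&mut self, config_json: &str) -> i64 {
        self.next_id += 1;
        self.jobs.insert(
            self.next_id,
            (config_json.to_string(), PersistedJobStatus::Requested),
        );
        self.next_id
    }

    fn set_job_status(&mut self, job_id: i64, status: PersistedJobStatus) -> Result<(), String> {
        match self.jobs.get_mut(&job_id) {
            Some(entry) => {
                entry.1 = status;
                Ok(())
            }
            None => Err(format!("unknown ingestion job {}", job_id)),
        }
    }

    fn job_status(&self, job_id: i64) -> Option<PersistedJobStatus> {
        self.jobs.get(&job_id).map(|(_, status)| *status)
    }
}
```

Code written against the trait rather than a concrete database client can be exercised with `InMemoryStore` in component-level tests, which is the benefit step 2 describes.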
