Skip to content

feat(package): Add support for gracefully shutting down the compression scheduler and compression workers (resolves #1037).#1169

Merged
AVMatthews merged 12 commits into
y-scope:mainfrom
AVMatthews:graceful-shutdown
Sep 23, 2025
Merged

Conversation

@AVMatthews

@AVMatthews AVMatthews commented Aug 6, 2025

Copy link
Copy Markdown
Contributor

Description

Resolves Issue #1037

  • Enables the warm shutdown feature on celery workers
  • Add signal handlers to compression scheduler and compression worker. The compression scheduler signal handler will stop the scheduler from scheduling new jobs and will instead just poll existing jobs.
  • Extends docker shutdown timeouts for current running compression tasks to complete gracefully.
    -> 5 min for scheduler and 1 min for workers
  • Reorder shutdowns so that the compression scheduler is shut down before the compression workers.
    -> The docker stop commands will wait the full timeout time if the processes on the docker take advantage of it and no further docker stop commands will be given. So with the new ordering the scheduler is stopped first with 5 mins of waiting to hear from existing jobs. The workers are stopped after with 60 additional seconds to wrap up it's current job. If you have an especially long job, this can still lead it issues. Issue feat(package): Handle tasks in RUNNING state on compression scheduler restart. #1045 speaks to how we may be able to address this in the future.

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  • Testing starting and stopping the package both with jobs and without.
  • Saw that the scheduler stopped assigning new jobs, but continues to poll for already assigned jobs.
  • Double checked the the successful jobs were still readable and searchable using the webui.

Summary by CodeRabbit

  • New Features

    • Scheduler stops scheduling new tasks after a termination signal, waits for running jobs to finish, and exits when idle.
    • Worker shutdown events are now logged for operational visibility.
  • Enhancements

    • Container stop operations support configurable timeouts.
    • Stop ordering and timeout values adjusted to provide longer graceful-stop windows for scheduler and worker services.

@AVMatthews AVMatthews requested a review from gibber9809 August 6, 2025 20:26
@AVMatthews AVMatthews requested a review from a team as a code owner August 6, 2025 20:26
@coderabbitai

coderabbitai Bot commented Aug 6, 2025

Copy link
Copy Markdown
Contributor

Walkthrough

Adds a timeout parameter to container stop calls and reorders targeted stops in the stop script; registers a Celery worker_shutdown handler that logs on shutdown; and implements SIGTERM handling in the compression scheduler to stop scheduling new tasks and exit when no jobs remain.

Changes

Cohort / File(s) Change Summary
Stop script timeout and sequence updates
components/clp-package-utils/clp_package_utils/scripts/stop_clp.py
stop_running_container(...) signature updated to include timeout: int = 10; docker stop invoked with --timeout <timeout>; COMPRESSION_SCHEDULER stop uses timeout=300; COMPRESSION_WORKER stop moved to targeted path and uses timeout=60; calls wired to pass timeout where applicable.
Celery worker shutdown signal handler
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
Added @signals.worker_shutdown.connect handler worker_shutdown_handler(signal=None, sender=None, **kwargs) which logs a shutdown message when a Celery worker shutdown event occurs.
SIGTERM handling in compression scheduler
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
Added module-level received_sigterm flag and sigterm_handler(signal_number, frame) that sets it and logs; registered signal.signal(signal.SIGTERM, sigterm_handler); main loop skips search_and_schedule_new_tasks() when flag is set; poll_running_jobs() exits early via sys.exit(0) if flag set and no scheduled jobs remain; KeyboardInterrupt message changed to "Forcefully shutting down".

Sequence Diagram(s)

Compression scheduler SIGTERM handling

sequenceDiagram
    participant OS
    participant Scheduler
    participant JobsDB

    OS->>Scheduler: SIGTERM
    Scheduler->>Scheduler: sigterm_handler sets received_sigterm = true
    loop Scheduler main loop
        Scheduler->>Scheduler: check received_sigterm
        alt received_sigterm == false
            Scheduler->>JobsDB: search_and_schedule_new_tasks()
        else received_sigterm == true
            Scheduler-->>Scheduler: skip scheduling new tasks
        end
        Scheduler->>JobsDB: poll_running_jobs()
        alt received_sigterm == true and no scheduled jobs
            Scheduler->>Scheduler: sys.exit(0)
        end
        Scheduler->>Scheduler: sleep()
    end
Loading

Stop script docker stop timeouts and order

sequenceDiagram
    participant StopScript
    participant Docker

    StopScript->>Docker: docker stop --time 300 <compression_scheduler_container>
    StopScript->>Docker: docker stop --time 60 <compression_worker_container>
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related issues

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 28.57% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title concisely and accurately summarizes the main change: adding graceful shutdown for the compression scheduler and compression workers, and it aligns with the PR changes (SIGTERM/Celery shutdown handling and Docker stop timeout/order adjustments) and the referenced issue. It is specific, readable, and useful for a teammate scanning history.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 Ruff (0.13.1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py

�[1;31mruff failed�[0m
�[1mCause:�[0m Failed to load configuration /ruff.toml
�[1mCause:�[0m Failed to parse /ruff.toml
�[1mCause:�[0m TOML parse error at line 26, column 3
|
26 | "RSE100", # Use of assert detected
| ^^^^^^^^
Unknown rule selector: RSE100

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py

�[1;31mruff failed�[0m
�[1mCause:�[0m Failed to load configuration /ruff.toml
�[1mCause:�[0m Failed to parse /ruff.toml
�[1mCause:�[0m TOML parse error at line 26, column 3
|
26 | "RSE100", # Use of assert detected
| ^^^^^^^^
Unknown rule selector: RSE100


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c72168a and 4336616.

📒 Files selected for processing (3)
  • components/clp-package-utils/clp_package_utils/scripts/stop_clp.py (2 hunks)
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py (2 hunks)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (3 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: Bill-hbrhbr
PR: y-scope/clp#1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.
Learnt from: haiqi96
PR: y-scope/clp#651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.
📚 Learning: in the clp-package compression flow, path validation and error handling is performed at the schedule...
Learnt from: haiqi96
PR: y-scope/clp#651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.

Applied to files:

  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
📚 Learning: in the compression scheduler, the team prefers initializing in-memory caches from the database at st...
Learnt from: Bill-hbrhbr
PR: y-scope/clp#831
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:0-0
Timestamp: 2025-04-17T16:55:06.658Z
Learning: In the compression scheduler, the team prefers initializing in-memory caches from the database at startup rather than performing repeated database queries for efficiency reasons. This approach maintains both performance and reliability across process restarts.

Applied to files:

  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: lint-check (macos-15)
🔇 Additional comments (8)
components/clp-package-utils/clp_package_utils/scripts/stop_clp.py (3)

36-38: LGTM! Clean implementation of configurable timeouts.

The function signature enhancement maintains backward compatibility with the default 10-second timeout while enabling specific timeout configuration for different container types.


41-41: Correct Docker stop command syntax.

The timeout parameter is properly passed to the Docker stop command using the -t flag.


161-161: Appropriate timeout values for graceful shutdown.

The 300-second timeout for the compression scheduler and 60-second timeout for the compression worker align well with the graceful shutdown objectives, allowing sufficient time for active compression jobs to complete.

Also applies to: 166-168

components/job-orchestration/job_orchestration/executor/compress/compression_task.py (2)

10-10: Appropriate import for Celery signal handling.

The signals import is correctly added to support the worker shutdown handler.


485-487: Well-implemented Celery shutdown signal handler.

The signal handler follows Celery's patterns correctly and provides useful logging visibility for worker shutdown events. The warning log level is appropriate for this type of lifecycle event.

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (3)

5-5: Appropriate import for signal handling.

The signal import is correctly added to support SIGTERM handling functionality.


56-59: Effective SIGTERM signal handler implementation.

The signal handler correctly sets the global flag and provides appropriate logging when SIGTERM is received. This enables the graceful shutdown mechanism.


63-63: Proper signal registration.

The SIGTERM signal is correctly registered with the handler function.

@gibber9809 gibber9809 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work! Main thing is that we also need to have the compression scheduler exit after it has received SIGTERM and all running jobs have finished. Besides that I just had a few small style comments.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)

397-400: Fix recieved typos, prefer truthiness for scheduled_jobs, and consider SIGTERM exit code

  • Found matches — fix "recieved" → "received" in:

    • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py: line 398
    • components/webui/client/src/api/socket/useCursor.tsx: ~line 58 (comment)
    • components/webui/client/src/pages/SearchPage/SearchState/typings.ts: ~line 16 (comment)
  • For compression_scheduler.py prefer truthiness and sharpen the log message; apply suggested diff:

-    if received_sigterm and 0 == len(scheduled_jobs):
-        logger.info("Recieved sigterm, No more running jobs. Exiting.")
-        sys.exit(0)
+    if received_sigterm and not scheduled_jobs:
+        logger.info("Received SIGTERM; no running jobs remain. Exiting.")
+        sys.exit(0)  # or: sys.exit(signal.SIGTERM + 128)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4336616 and 94e02bb.

📒 Files selected for processing (2)
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py (2 hunks)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)
components/job-orchestration/job_orchestration/executor/query/utils.py (1)
  • sigterm_handler (70-80)
🔇 Additional comments (5)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (1)

10-10: LGTM: Celery signal import added.

Importing Celery signals here is appropriate and low risk.

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (4)

5-5: LGTM: imports updated for signal handling.


416-418: Good: SIGTERM handler registered after logger setup.

Registration order matches the logging usage in the handler.


449-456: LGTM: gating new scheduling after SIGTERM.

This aligns with the graceful‑shutdown objective while allowing polling to continue.


460-460: Shutdown messaging is clearer; consider consistent severity across components.

If the worker uses INFO for shutdown too, keep both at INFO; otherwise align to WARN in both.

Please confirm desired consistency and I can send a small patch across scheduler/worker logs.

Comment on lines +53 to +60
received_sigterm = False


def sigterm_handler(sig, frame):
global received_sigterm
received_sigterm = True
logger.info("Received SIGTERM in compression scheduler")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick

Signal flag + handler: tighten style and lint.

Use throwaway parameter names and keep naming consistent.

-def sigterm_handler(sig, frame):
-    global received_sigterm
-    received_sigterm = True
-    logger.info("Received SIGTERM in compression scheduler")
+def sigterm_handler(_sig, _frame):
+    global received_sigterm
+    received_sigterm = True
+    logger.info("Received SIGTERM in compression scheduler.")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
received_sigterm = False
def sigterm_handler(sig, frame):
global received_sigterm
received_sigterm = True
logger.info("Received SIGTERM in compression scheduler")
received_sigterm = False
def sigterm_handler(_sig, _frame):
global received_sigterm
received_sigterm = True
logger.info("Received SIGTERM in compression scheduler.")
🤖 Prompt for AI Agents
In
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
around lines 53 to 60, the signal handler uses positional parameter names `sig`
and `frame` and the flag name could be clearer; rename the handler parameters to
throwaway names (e.g., `_signum` and `_frame`) to indicate they are unused, and
ensure consistent naming in the handler (keep `received_sigterm` as the global
flag), update the handler signature and any references accordingly, and keep the
logger message style consistent (e.g., "Received SIGTERM in compression
scheduler").

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

♻️ Duplicate comments (4)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (1)

527-530: Warm shutdown currently only logs; ensure the clp subprocess is terminated to prevent stragglers.

If the worker process receives SIGTERM and exits without signalling the spawned clp process, the child may continue running, risking orphaned/half‑written archives. Forward SIGTERM to the clp subprocess group and start it in its own session. Also avoid shadowing by renaming the handler arg.

Apply within this hunk:

-@signals.worker_shutdown.connect
-def worker_shutdown_handler(signal=None, sender=None, **kwargs):
-    logger.info("Worker shutdown signal received.")
+@signals.worker_shutdown.connect
+def worker_shutdown_handler(_signal=None, sender=None, **kwargs):
+    logger.info("Worker shutdown signal received.")

Then, in run_clp right after creating the process, install a SIGTERM forwarder (illustrative, outside this hunk):

# Start compression
proc = subprocess.Popen(
    compression_cmd,
    stdout=subprocess.PIPE,
    stderr=stderr_log_file,
    env=compression_env,
    start_new_session=True,  # so we can signal the whole group
)

def _on_sigterm(_sig, _frame):
    try:
        os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
    except Exception:
        logger.exception("Failed forwarding SIGTERM to clp subprocess group")

prev = signal.signal(signal.SIGTERM, _on_sigterm)
try:
    # ... existing read/loop/wait logic ...
    return_code = proc.wait()
finally:
    # restore to reduce side‑effects on subsequent tasks in the same worker process
    signal.signal(signal.SIGTERM, prev)

Optionally, also hook celery.signals.task_revoked to forward termination when a task is revoked.

Verify presence of the two key pieces:

#!/bin/bash
rg -nP 'Popen\([^)]*start_new_session\s*=\s*True' components/job-orchestration/job_orchestration/executor/compress/compression_task.py -C3
rg -nP 'signal\.signal\(\s*signal\.SIGTERM' components/job-orchestration/job_orchestration/executor/compress/compression_task.py -C3
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (3)

62-66: Tighten handler style and message punctuation.

Use throwaway arg names to signal unused parameters and end log messages with a period for consistency.

-def sigterm_handler(sig, frame):
+def sigterm_handler(_sig, _frame):
     global received_sigterm
     received_sigterm = True
-    logger.info("Received SIGTERM in compression scheduler")
+    logger.info("Received SIGTERM in compression scheduler.")

473-473: Message change is acceptable.

“Forcefully shutting down” on Ctrl‑C is accurate; consider ending with a period for consistency.

-                logger.info("Forcefully shutting down")
+                logger.info("Forcefully shutting down.")

407-410: Typo and clarity in exit log; keep spelling and casing consistent.

Fix “Recieved” → “Received” and consider slightly clearer wording.

-    if received_sigterm and 0 == len(scheduled_jobs):
-        logger.info("Recieved sigterm, No more running jobs. Exiting.")
+    if received_sigterm and len(scheduled_jobs) == 0:
+        logger.info("Received SIGTERM; no running jobs remain. Exiting.")
         sys.exit(0)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 94e02bb and a89769e.

📒 Files selected for processing (3)
  • components/clp-package-utils/clp_package_utils/scripts/stop_clp.py (2 hunks)
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py (2 hunks)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (5 hunks)
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: junhaoliao
PR: y-scope/clp#1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.
🧬 Code graph analysis (2)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)
components/job-orchestration/job_orchestration/executor/query/utils.py (1)
  • sigterm_handler (70-80)
components/clp-package-utils/clp_package_utils/scripts/stop_clp.py (1)
components/clp-package-utils/clp_package_utils/general.py (1)
  • is_container_running (155-168)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: package-image
  • GitHub Check: lint-check (macos-15)
🔇 Additional comments (5)
components/clp-package-utils/clp_package_utils/scripts/stop_clp.py (1)

37-39: Good: plumb a timeout parameter into stop helper.

This enables caller‑specific grace periods per component. No issues spotted.

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (4)

5-5: Importing signal for SIGTERM handler is appropriate.

No issues.


59-60: Clear, module‑level termination flag.

Simple and effective.


426-428: Registering SIGTERM after logger setup: good placement.

Consistent with using the logger inside the handler.

Given prior learnings about logging level env selection in start scripts, please confirm CLP_LOGGING_LEVEL for the compression scheduler is sourced from the compression scheduler config (not the query scheduler). If not, we should fix it in the start script.


463-469: Correct: stop scheduling new work after SIGTERM, continue polling.

This achieves graceful behaviour. Ensure jobs_poll_delay isn’t so large that shutdown feels sluggish.

You can confirm effective loop cadence at runtime by checking timestamps around the final “Poll running jobs” entries.

if is_container_running(container_name):
logger.info(f"Stopping {container_name}...")
cmd = ["docker", "stop", container_name]
cmd = ["docker", "stop", "-t", str(timeout), container_name]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick

Use docker’s grace period; consider making subprocess invocation itself bounded.

docker stop -t <timeout> is correct. For extra resilience against a hung Docker CLI/daemon, consider bounding subprocess.run with a slightly higher wall‑clock timeout (e.g., timeout=timeout+30) and logging a useful error if exceeded.

-        subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True)
+        subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True, timeout=timeout + 30)

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In components/clp-package-utils/clp_package_utils/scripts/stop_clp.py around
line 42, the code builds the docker stop command with the correct grace period
but does not bound the subprocess invocation; update the subprocess.run call to
include a wall-clock timeout (e.g., timeout=timeout+30) and wrap the call in
try/except to catch subprocess.TimeoutExpired, logging a clear error when the
CLI call itself hangs and optionally handling cleanup (e.g., attempt docker kill
or record failure) — keep the existing docker stop -t usage and only add the
timeout parameter plus a TimeoutExpired handler that logs useful diagnostic
details.

Comment on lines 165 to 167
container_name = f"clp-{COMPRESSION_SCHEDULER_COMPONENT_NAME}-{instance_id}"
stop_running_container(container_name, already_exited_containers, force)
stop_running_container(container_name, already_exited_containers, force, timeout=300)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick

Scheduler first with longer grace window: LGTM; prefer config‑driven values.

Stopping the compression scheduler before workers with a 300s grace aligns with the graceful shutdown plan. Consider sourcing 300 from config to avoid hard‑coding policy in code.

🤖 Prompt for AI Agents
In components/clp-package-utils/clp_package_utils/scripts/stop_clp.py around
lines 165 to 167, the 300s hard-coded timeout passed to stop_running_container
should be made configurable; replace the literal with a named configuration
variable read from your existing config (or environment/CLI) such as
COMPRESSION_SCHEDULER_GRACE_SECONDS, provide a sensible default of 300,
validate/coerce it to an int, and pass that variable into
stop_running_container; also add the new setting to the appropriate config file
or argparse options and a short comment documenting its purpose.

Comment on lines +171 to 174
if target in (ALL_TARGET_NAME, COMPRESSION_WORKER_COMPONENT_NAME):
container_name = f"clp-{COMPRESSION_WORKER_COMPONENT_NAME}-{instance_id}"
stop_running_container(container_name, already_exited_containers, force, timeout=60)
if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, REDIS_COMPONENT_NAME):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick

Workers after scheduler with shorter window: LGTM; consider config.

Ordering and the 60s grace look reasonable. As above, prefer config/env to tune per deployment.

🤖 Prompt for AI Agents
components/clp-package-utils/clp_package_utils/scripts/stop_clp.py around lines
171 to 174: the 60s timeout passed to stop_running_container is hard-coded; make
it configurable via environment or a config value with a sensible default. Add
or reuse a config lookup (e.g., read from an env var like
CLP_STOP_TIMEOUT_SECONDS or from the existing config module), parse it to an int
with fallback to 60, and replace the literal 60 in the stop_running_container
call with that variable so deployments can tune the grace period.

@gibber9809 gibber9809 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor typo, but besides that LGTM.

For the PR title I think we just need to add a period:

feat(package): Add support for gracefully shutting down the compression scheduler and compression workers (resolves #1037).

After fixing the typo, I'll approve.

Co-authored-by: Devin Gibson <gibber9809@users.noreply.github.com>
@AVMatthews AVMatthews changed the title feat(package): Add support for gracefully shutting down the compression scheduler and compression workers (resolves #1037) feat(package): Add support for gracefully shutting down the compression scheduler and compression workers (resolves #1037). Sep 19, 2025

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (2)

418-422: Harden log file setup against missing CLP_LOGS_DIR and absent directory.
Avoid Path(None) and ensure directory exists to prevent startup failure.

-    log_file = Path(os.getenv("CLP_LOGS_DIR")) / "compression_scheduler.log"
+    log_dir = os.getenv("CLP_LOGS_DIR") or "."
+    log_file = Path(log_dir) / "compression_scheduler.log"
+    log_file.parent.mkdir(parents=True, exist_ok=True)

197-205: Break scheduling loop immediately when SIGTERM is received

Add a per‑job check of received_sigterm at the start of the loop and break so remaining PENDING jobs are not scheduled.

File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py Lines: 197-205

     for job_row in jobs:
+        if received_sigterm:
+            logger.info("SIGTERM received; skipping scheduling of remaining PENDING jobs.")
+            break
         job_id = job_row["id"]
♻️ Duplicate comments (1)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)

59-66: Tighten signal‑handler style; add type hint.
Use throwaway arg names and punctuate log. Minor polish, no behaviour change.

-received_sigterm = False
+received_sigterm: bool = False
@@
-def sigterm_handler(sig, frame):
+def sigterm_handler(_sig, _frame):
     global received_sigterm
     received_sigterm = True
-    logger.info("Received SIGTERM in compression scheduler")
+    logger.info("Received SIGTERM in compression scheduler.")
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e59d0bc and 26bdef8.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)
components/job-orchestration/job_orchestration/executor/query/utils.py (1)
  • sigterm_handler (70-80)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: package-image
  • GitHub Check: antlr-code-committed (macos-15)
🔇 Additional comments (3)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (3)

5-5: LGTM: import for signal handling is appropriate.


407-410: Confirm intended exit code for SIGTERM‑driven shutdown.
Do we want POSIX‑style 128+signal (143) or 0 to indicate graceful success? Align with repo conventions.

Option if aligning to POSIX:

-        sys.exit(0)
+        sys.exit(signal.SIGTERM + 128)

426-428: LGTM: handler registered after logger setup.
Order is correct; the handler can safely log. Do we also want to handle SIGHUP for container restarts?

Comment on lines +463 to +469
if not received_sigterm:
search_and_schedule_new_tasks(
clp_config,
db_conn,
db_cursor,
clp_metadata_db_connection_config,
)

@coderabbitai coderabbitai Bot Sep 19, 2025

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Race: SIGTERM during scheduling can still enqueue new tasks.
Gate is only before the call; if SIGTERM arrives mid‑execution of search_and_schedule_new_tasks, the current loop may continue scheduling remaining jobs. Break out inside that loop to stop immediately.

🤖 Prompt for AI Agents
In
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
around lines 463-469, the current SIGTERM guard is only checked before calling
search_and_schedule_new_tasks so a SIGTERM that arrives while that function is
running can still enqueue more tasks; modify search_and_schedule_new_tasks to
accept an interrupt checker (e.g., pass received_sigterm or a callable like
should_stop()) and have its inner scheduling loop check that flag/callable after
each task (or at sensible interruption points) and return early if true so no
further tasks are enqueued after SIGTERM.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell this race is benign. If we schedule a few more jobs before shutdown it shouldn't be a problem.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gibber9809 That's a fair assessment. You're right that scheduling a few additional jobs during the shutdown window is benign - they'll either complete normally or be handled by the hanging job cleanup on the next startup. The current coarse-grained approach strikes a good balance between shutdown responsiveness and implementation simplicity.


✏️ Learnings added
Learnt from: gibber9809
PR: y-scope/clp#1169
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:463-469
Timestamp: 2025-09-19T18:28:26.737Z
Learning: In the compression scheduler (components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py), the SIGTERM handling uses a coarse-grained approach that only checks received_sigterm before calling search_and_schedule_new_tasks. The maintainers consider the race condition where SIGTERM arrives mid-execution to be benign, as scheduling a few extra tasks during shutdown is acceptable and will be handled by cleanup mechanisms.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: haiqi96
PR: y-scope/clp#651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.

Learnt from: haiqi96
PR: y-scope/clp#569
File: components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py:380-392
Timestamp: 2024-11-15T20:07:22.256Z
Learning: The current implementation assumes single-threaded execution, so race conditions in functions like `is_target_extracted` in `components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py` are not currently a concern.

@gibber9809 gibber9809 self-requested a review September 19, 2025 18:28
gibber9809
gibber9809 previously approved these changes Sep 19, 2025

@gibber9809 gibber9809 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. PR title is fine as well. As long as we double checked that graceful shutdown still works + terminates before the 5 minute timeout when there are no more scheduled jobs after the most recent changes then this should be good to merge.

Comment thread components/clp-package-utils/clp_package_utils/scripts/stop_clp.py Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 26bdef8 and 817b828.

📒 Files selected for processing (3)
  • components/clp-package-utils/clp_package_utils/scripts/stop_clp.py (2 hunks)
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py (2 hunks)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (5 hunks)
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: gibber9809
PR: y-scope/clp#1169
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:463-469
Timestamp: 2025-09-19T18:28:26.747Z
Learning: In the compression scheduler (components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py), the SIGTERM handling uses a coarse-grained approach that only checks received_sigterm before calling search_and_schedule_new_tasks. The maintainers consider the race condition where SIGTERM arrives mid-execution to be benign, as scheduling a few extra tasks during shutdown is acceptable and will be handled by cleanup mechanisms.
📚 Learning: 2025-09-19T18:28:26.747Z
Learnt from: gibber9809
PR: y-scope/clp#1169
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:463-469
Timestamp: 2025-09-19T18:28:26.747Z
Learning: In the compression scheduler (components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py), the SIGTERM handling uses a coarse-grained approach that only checks received_sigterm before calling search_and_schedule_new_tasks. The maintainers consider the race condition where SIGTERM arrives mid-execution to be benign, as scheduling a few extra tasks during shutdown is acceptable and will be handled by cleanup mechanisms.

Applied to files:

  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py
  • components/clp-package-utils/clp_package_utils/scripts/stop_clp.py
📚 Learning: 2024-11-15T20:07:22.256Z
Learnt from: haiqi96
PR: y-scope/clp#569
File: components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py:380-392
Timestamp: 2024-11-15T20:07:22.256Z
Learning: The current implementation assumes single-threaded execution, so race conditions in functions like `is_target_extracted` in `components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py` are not currently a concern.

Applied to files:

  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
📚 Learning: 2025-08-08T06:59:42.436Z
Learnt from: junhaoliao
PR: y-scope/clp#1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.

Applied to files:

  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
  • components/clp-package-utils/clp_package_utils/scripts/stop_clp.py
📚 Learning: 2025-07-23T09:54:45.185Z
Learnt from: Bill-hbrhbr
PR: y-scope/clp#1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.

Applied to files:

  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py
📚 Learning: 2025-01-16T16:58:43.190Z
Learnt from: haiqi96
PR: y-scope/clp#651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.

Applied to files:

  • components/clp-package-utils/clp_package_utils/scripts/stop_clp.py
🧬 Code graph analysis (2)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)
components/job-orchestration/job_orchestration/executor/query/utils.py (1)
  • sigterm_handler (70-80)
components/clp-package-utils/clp_package_utils/scripts/stop_clp.py (1)
components/clp-package-utils/clp_package_utils/general.py (1)
  • is_container_running (155-168)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: package-image
  • GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (12)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (2)

9-9: Import added for signal handling.

The import of celery.signals aligns with the new worker shutdown signal handler.


526-529: Consistent signal handling approach with shutdown scheduler.

The worker shutdown handler provides logging for graceful shutdown events, aligning with the broader shutdown handling implemented across the system. The approach is straightforward and fits the project's pattern established in the compression scheduler.

Per the past review comment suggesting simplification to "Shutdown signal received.", this implementation correctly follows maintainer feedback. The worker_shutdown signal hook complements the scheduler's SIGTERM handling and the enhanced Docker timeouts to provide coordinated graceful shutdown across the compression subsystem.

components/clp-package-utils/clp_package_utils/scripts/stop_clp.py (4)

37-39: Enhanced function signature for graceful shutdown.

The addition of the timeout parameter with a sensible default aligns with the PR's objective of implementing configurable grace periods for container stops.


42-42: Docker timeout implementation follows best practices.

Using --timeout with str(timeout) correctly implements Docker's grace period mechanism. The approach allows workers to finish currently executing tasks before termination, with the worker allowing worker_soft_shutdown_timeout seconds for tasks to finish before initiating cold shutdown.


171-173: Worker timeout aligns with shutdown coordination.

The 60-second timeout for compression workers balances allowing task completion while preventing excessively long shutdown times. The reordering ensures the scheduler completes its shutdown logic before workers are terminated.

Similar to the scheduler timeout, consider making the 60-second worker timeout configurable as suggested in past review comments.


165-166: Consider making scheduler timeout configurable.

The 300-second timeout for compression scheduler is appropriate for longer-running compression tasks. However, consider making this configurable via environment variables or config file to accommodate different deployment needs.

Per the past review comment suggesting config-driven values, consider adding this to a configuration parameter that can be tuned per deployment while keeping 300 as the default.

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (6)

5-5: Signal module import for SIGTERM handling.

The import is necessary for the new signal handling functionality.


59-59: Global flag for shutdown coordination.

The received_sigterm flag provides the mechanism for coordinating graceful shutdown across the scheduler's main loop.


62-65: SIGTERM handler implementation.

The handler correctly sets the global flag and logs the signal reception. The parameter naming follows Python conventions for unused parameters.


426-427: Signal handler registration placement.

Registering the SIGTERM handler after logging setup ensures the logger is available for use within the handler. Good placement following established patterns.


463-469: Graceful shutdown logic prevents new task scheduling.

The conditional check correctly prevents new task scheduling after receiving SIGTERM while allowing the scheduler to continue polling existing jobs. This implementation aligns with the learned approach that considers the race condition during search_and_schedule_new_tasks execution to be benign.


472-473: Updated shutdown messaging.

The change from "Gracefully shutting down" to "Forcefully shutting down" for KeyboardInterrupt is more accurate, as CTRL+C represents forceful termination rather than graceful shutdown.

Comment on lines +407 to +409
if received_sigterm and 0 == len(scheduled_jobs):
logger.info("Recieved SIGTERM and there're no more running jobs. Exiting.")
sys.exit(0)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick

Fix typo in log message.

Line 408 contains a typo: "Recieved" should be "Received".

Apply this diff to fix the typo:

-        logger.info("Recieved SIGTERM and there're no more running jobs. Exiting.")
+        logger.info("Received SIGTERM and there're no more running jobs. Exiting.")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if received_sigterm and 0 == len(scheduled_jobs):
logger.info("Recieved SIGTERM and there're no more running jobs. Exiting.")
sys.exit(0)
if received_sigterm and 0 == len(scheduled_jobs):
logger.info("Received SIGTERM and there're no more running jobs. Exiting.")
sys.exit(0)
🤖 Prompt for AI Agents
In
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
around lines 407 to 409, fix the typo in the log message: change "Recieved" to
"Received" so the logger.info call reads "Received SIGTERM and there're no more
running jobs. Exiting." ensuring correct spelling only.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 817b828 and f80d44d.

📒 Files selected for processing (2)
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py (2 hunks)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (5 hunks)
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: gibber9809
PR: y-scope/clp#1169
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:463-469
Timestamp: 2025-09-19T18:28:26.747Z
Learning: In the compression scheduler (components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py), the SIGTERM handling uses a coarse-grained approach that only checks received_sigterm before calling search_and_schedule_new_tasks. The maintainers consider the race condition where SIGTERM arrives mid-execution to be benign, as scheduling a few extra tasks during shutdown is acceptable and will be handled by cleanup mechanisms.
📚 Learning: 2025-09-19T18:28:26.747Z
Learnt from: gibber9809
PR: y-scope/clp#1169
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:463-469
Timestamp: 2025-09-19T18:28:26.747Z
Learning: In the compression scheduler (components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py), the SIGTERM handling uses a coarse-grained approach that only checks received_sigterm before calling search_and_schedule_new_tasks. The maintainers consider the race condition where SIGTERM arrives mid-execution to be benign, as scheduling a few extra tasks during shutdown is acceptable and will be handled by cleanup mechanisms.

Applied to files:

  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py
📚 Learning: 2024-11-15T20:07:22.256Z
Learnt from: haiqi96
PR: y-scope/clp#569
File: components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py:380-392
Timestamp: 2024-11-15T20:07:22.256Z
Learning: The current implementation assumes single-threaded execution, so race conditions in functions like `is_target_extracted` in `components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py` are not currently a concern.

Applied to files:

  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
📚 Learning: 2025-08-08T06:59:42.436Z
Learnt from: junhaoliao
PR: y-scope/clp#1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.

Applied to files:

  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
📚 Learning: 2025-07-23T09:54:45.185Z
Learnt from: Bill-hbrhbr
PR: y-scope/clp#1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.

Applied to files:

  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py
🧬 Code graph analysis (1)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)
components/job-orchestration/job_orchestration/executor/query/utils.py (1)
  • sigterm_handler (70-80)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: package-image
🔇 Additional comments (6)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (5)

59-59: Good: explicit SIGTERM flag name.

Clear, self‑documenting, and specific to the signal handled.


5-5: SIGTERM import and registration order — LGTM.

Registering after logger setup ensures the handler can safely log.

Also applies to: 482-484


519-525: Gate new scheduling after SIGTERM — LGTM.

Matches the intended graceful shutdown behaviour; the coarse‑grained check is acceptable per maintainer guidance.

Please confirm no further interruption points are needed inside search_and_schedule_new_tasks, given the team’s stance that mid‑call scheduling during shutdown is benign.


463-466: Fix typo and wording in exit log.

Spelling and grammar.

-    if received_sigterm and 0 == len(scheduled_jobs):
-        logger.info("Recieved SIGTERM and there're no more running jobs. Exiting.")
+    if received_sigterm and len(scheduled_jobs) == 0:
+        logger.info("Received SIGTERM and there are no more running jobs. Exiting.")
         sys.exit(0)

62-66: Use throwaway parameter names in the signal handler.

Avoids unused‐variable lint and clarifies intent.

-def sigterm_handler(signal_number, frame):
+def sigterm_handler(_signum, _frame):
     global received_sigterm
     received_sigterm = True
     logger.info("Received SIGTERM.")
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (1)

9-9: Import of celery.signals is appropriate

Used below for worker shutdown hook. No concerns.

Comment on lines +527 to +530
@signals.worker_shutdown.connect
def worker_shutdown_handler(signal=None, sender=None, **kwargs):
logger.info("Shutdown signal received.")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

🧩 Analysis chain

Forward SIGTERM to clp/indexer subprocess groups; logging-only risks orphaned/partial archives

If the worker is terminated (e.g., Docker timeout escalation), the Python process may die while the clp (and later indexer) subprocess continues or is left orphaned. Forward SIGTERM to the child process group and start the subprocesses in their own session to ensure clean termination.

Apply this small change to the handler (adds intent and calls a helper):

 @signals.worker_shutdown.connect
-def worker_shutdown_handler(signal=None, sender=None, **kwargs):
-    logger.info("Shutdown signal received.")
+def worker_shutdown_handler(signal=None, sender=None, **kwargs):
+    logger.info("Shutdown signal received; forwarding SIGTERM to clp subprocess group (if running).")
+    try:
+        _forward_sigterm_to_current_child()
+    except Exception:
+        logger.exception("Failed forwarding SIGTERM to clp subprocess group")

Then, implement the forwarding and register OS-level SIGTERM in run_clp (outside the changed hunk):

# Add near imports
import signal

# Add near module top
CURRENT_CHILD_PGID: Optional[int] = None

def _set_current_child_pgid(pgid: Optional[int]) -> None:
    global CURRENT_CHILD_PGID
    CURRENT_CHILD_PGID = pgid

def _forward_sigterm_to_current_child() -> None:
    if CURRENT_CHILD_PGID is not None:
        os.killpg(CURRENT_CHILD_PGID, signal.SIGTERM)

# In run_clp(), replace the Popen call and register a SIGTERM handler:
proc = subprocess.Popen(
    compression_cmd,
    stdout=subprocess.PIPE,
    stderr=stderr_log_file,
    env=compression_env,
    start_new_session=True,  # ensure child has its own process group
)
_set_current_child_pgid(os.getpgid(proc.pid))

old_sig_handler = signal.signal(signal.SIGTERM, lambda *_: _forward_sigterm_to_current_child())
try:
    # ... existing loop and logic ...
    return_code = proc.wait()
    # ... existing logic ...
finally:
    try:
        signal.signal(signal.SIGTERM, old_sig_handler)
    except Exception:
        pass
    _set_current_child_pgid(None)
    try:
        stderr_log_file.close()
    except Exception:
        logger.exception("Failed to close stderr log file")

Optional: also forward on task revocation (helps when tasks are revoked rather than worker shutdown):

@signals.task_revoked.connect
def _on_task_revoked(request=None, terminated=None, signum=None, **kwargs):
    logger.warning("Task revoked; forwarding SIGTERM to clp subprocess group (if running).")
    _forward_sigterm_to_current_child()

Verification: after applying, run a compression task, send SIGTERM to the worker process, and confirm that both the Python worker process and the clp subprocess receive SIGTERM (ps output should show both terminated and no lingering clp processes).


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Print the file and surrounding lines to inspect existing shutdown handling and subprocess start
FILE="components/job-orchestration/job_orchestration/executor/compress/compression_task.py"
echo "==== Showing file header and imports ===="
sed -n '1,160p' "$FILE" || true
echo "==== Showing lines 480-620 (area of interest) ===="
sed -n '480,620p' "$FILE" || true

echo "==== Grep for relevant symbols ===="
rg -n --hidden --no-ignore -S "worker_shutdown_handler|task_revoked|run_clp|Popen|start_new_session|signal\.signal|os\.killpg|getpgid|getpgid|setpgid|CURRENT_CHILD_PGID|subprocess\.Popen" || true

Length of output: 10544


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="components/job-orchestration/job_orchestration/executor/compress/compression_task.py"
echo "==== Show def run_clp and surrounding lines ===="
rg -n --hidden --no-ignore -n "def run_clp" "$FILE" || true
sed -n '300,440p' "$FILE" || true

echo "==== Search for process-group / signal patterns in this file ===="
rg -n --hidden --no-ignore -S "start_new_session|preexec_fn|setsid|setpgid|getpgid|killpg|signal\.signal|signal\.SIGTERM|CURRENT_CHILD_PGID|os\.killpg" "$FILE" || true

echo "==== Search repo-wide for similar patterns (to reference existing implementations) ===="
rg -n --hidden --no-ignore -S "start_new_session|preexec_fn|setsid|setpgid|getpgid|killpg|signal\.signal|signal\.SIGTERM|os\.killpg" || true

Length of output: 5993


Forward SIGTERM to clp subprocess group on worker shutdown

worker_shutdown_handler only logs; run_clp launches clp with subprocess.Popen without creating a separate process group, so the clp process can be orphaned when the worker dies (risking partial/orphaned archives). Change required:

  • In components/job-orchestration/job_orchestration/executor/compress/compression_task.py:
    • run_clp: start the child in its own group (start_new_session=True or preexec_fn=os.setpgrp) when calling Popen (proc at ~line 403).
    • record the child PGID (os.getpgid(proc.pid)) and clear it in finally.
    • worker_shutdown_handler (currently at ~lines 528–530): forward SIGTERM to that PGID (os.killpg(pgid, signal.SIGTERM)) instead of logging only.
  • Also forward on task revocation (signals.task_revoked) to cover revoked tasks.
  • Reference existing pattern in executor/query/utils.py (uses preexec_fn=os.setpgrp and os.killpg) and mirror its cleanup/handler restore behaviour.
🤖 Prompt for AI Agents
In
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
around lines ~403 and ~528–530, run_clp currently launches the clp subprocess
without isolating its process group and worker_shutdown_handler only logs the
shutdown; update run_clp to start the child in its own group (use Popen(...,
start_new_session=True) or Popen(..., preexec_fn=os.setpgrp)), capture and store
the child's PGID via os.getpgid(proc.pid), and ensure you clear that stored PGID
in the finally block after the process ends; change worker_shutdown_handler to
forward SIGTERM to the stored PGID using os.killpg(pgid, signal.SIGTERM) (and
handle the case where PGID is None or process gone), and add the same forwarding
behavior for signals.task_revoked; mirror the cleanup and handler-restore
pattern used in executor/query/utils.py (including safe exception handling
around os.getpgid/os.killpg and clearing the PGID).

Comment on lines +529 to 530
logger.info("Forcefully shutting down")
return -1

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick

Consider using exit code 130 on Ctrl‑C.

Reflects conventional SIGINT exit status and can simplify orchestration logic.

-            except KeyboardInterrupt:
-                logger.info("Forcefully shutting down")
-                return -1
+            except KeyboardInterrupt:
+                logger.info("Forcefully shutting down.")
+                sys.exit(130)

Verify that upstream scripts don’t rely on −1 specifically before changing this.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
around lines 529-530, the handler currently returns -1 on Ctrl-C; change the
return value to 130 to reflect conventional SIGINT exit status; update any
docstring/comment nearby to mention SIGINT -> 130 and run a quick grep/CI check
to verify no upstream scripts or callers rely on the -1 value before merging,
and if any do, either update them to accept 130 or map -1 to 130 in a single
compatibility place.

@AVMatthews AVMatthews merged commit 23a05f3 into y-scope:main Sep 23, 2025
19 checks passed
junhaoliao pushed a commit to junhaoliao/clp that referenced this pull request May 17, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants