Skip to content

fix(job-orchestration): Clean up temporary files on compression failure.#2210

Merged
junhaoliao merged 5 commits into
y-scope:mainfrom
goynam:fix/cleanup-temp-files-on-compression-failure
Apr 20, 2026
Merged

fix(job-orchestration): Clean up temporary files on compression failure.#2210
junhaoliao merged 5 commits into
y-scope:mainfrom
goynam:fix/cleanup-temp-files-on-compression-failure

Conversation

@goynam

@goynam goynam commented Apr 20, 2026

Copy link
Copy Markdown
Contributor

Description

When a compression task fails, cleanup_temporary_files() was not called, leaving *-converted-tmp directories and *-log-paths.txt files in /var/tmp/. Over time these
accumulate and fill the node's ephemeral storage, causing disk pressure and pod victions.

The fix wraps the conversion and compression logic in a try/finally block so that
cleanup_temporary_files() and stderr_log_file.close() are always called regardless of whether compression succeeded, failed, or threw an unexpected exception (e.g., malformed JSON from the compression process).

Additionally:

  • logs_list_path.unlink(missing_ok=True) makes the unlink tolerant of files that don't exist.
  • shutil.rmtree(converted_inputs_dir) errors are caught and logged rather than silently ignored (using ignore_errors=True) so that cleanup failures are visible for debugging.

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

Baseline (origin/main)

Rebuilt with task, started CLP, and compressed ~/samples/hive-24hr/:

$ ./sbin/start-clp.sh
2026-04-20T21:17:50.343 INFO [controller] Starting CLP using Docker Compose (full deployment)...
[+] up 16/16
 ✔ Network clp-package-90f5_default                           Created
 ✔ Container clp-package-90f5-queue-1                         Healthy
 ✔ Container clp-package-90f5-redis-1                         Healthy
 ✔ Container clp-package-90f5-database-1                      Healthy
 ✔ Container clp-package-90f5-compression-worker-1            Healthy
 ✔ Container clp-package-90f5-query-worker-1                  Healthy
 ✔ Container clp-package-90f5-results-cache-1                 Healthy
 ✔ Container clp-package-90f5-results-cache-indices-creator-1 Exited
 ✔ Container clp-package-90f5-db-table-creator-1              Exited
 ✔ Container clp-package-90f5-query-scheduler-1               Healthy
 ✔ Container clp-package-90f5-api-server-1                    Healthy
 ✔ Container clp-package-90f5-garbage-collector-1             Healthy
 ✔ Container clp-package-90f5-webui-1                         Healthy
 ✔ Container clp-package-90f5-compression-scheduler-1         Healthy
 ✔ Container clp-package-90f5-reducer-1                       Healthy
2026-04-20T21:18:02.308 INFO [controller] Started CLP.

$ ./sbin/compress.sh --unstructured ~/samples/hive-24hr/
2026-04-20T21:18:37.612 INFO [compress] Compression job 1 submitted.
2026-04-20T21:18:44.125 INFO [compress] Compressed 483.18MB into 4.50MB (107.30x). Speed: 93.68MB/s.
2026-04-20T21:18:50.137 INFO [compress] Compressed 571.65MB into 6.78MB (84.27x). Speed: 51.17MB/s.
2026-04-20T21:19:07.163 INFO [compress] Compressed 893.31MB into 14.78MB (60.44x). Speed: 31.68MB/s.
2026-04-20T21:19:11.671 INFO [compress] Compressed 1.18GB into 22.27MB (54.44x). Speed: 37.07MB/s.
2026-04-20T21:19:13.676 INFO [compress] Compressed 1.50GB into 30.22MB (50.73x). Speed: 44.18MB/s.
2026-04-20T21:19:22.699 INFO [compress] Compressed 2.13GB into 46.48MB (46.87x). Speed: 49.82MB/s.
2026-04-20T21:19:23.200 INFO [compress] Compression finished.
2026-04-20T21:19:23.200 INFO [compress] Compressed 2.44GB into 54.55MB (45.83x). Speed: 56.81MB/s.

$ ./sbin/stop-clp.sh
2026-04-20T21:19:40.882 INFO [controller] Stopping all CLP containers using Docker Compose...
2026-04-20T21:20:02.804 INFO [controller] Stopped CLP.

Fix branch

Rebuilt with task, started CLP, and compressed ~/samples/hive-24hr/:

$ ./sbin/start-clp.sh
2026-04-20T21:22:56.508 INFO [controller] Starting CLP using Docker Compose (full deployment)...
2026-04-20T21:23:08.529 INFO [controller] Started CLP.

$ ./sbin/compress.sh --unstructured ~/samples/hive-24hr/
2026-04-20T21:23:14.519 INFO [compress] Compression job 1 submitted.
2026-04-20T21:23:21.028 INFO [compress] Compressed 483.18MB into 4.48MB (107.89x). Speed: 94.68MB/s.
2026-04-20T21:23:27.036 INFO [compress] Compressed 571.65MB into 6.76MB (84.62x). Speed: 51.45MB/s.
2026-04-20T21:23:44.069 INFO [compress] Compressed 893.31MB into 14.92MB (59.86x). Speed: 31.74MB/s.
2026-04-20T21:23:48.578 INFO [compress] Compressed 1.18GB into 22.73MB (53.33x). Speed: 37.13MB/s.
2026-04-20T21:23:50.582 INFO [compress] Compressed 1.50GB into 30.76MB (49.85x). Speed: 44.24MB/s.
2026-04-20T21:23:59.098 INFO [compress] Compressed 1.81GB into 38.97MB (47.60x). Speed: 42.97MB/s.
2026-04-20T21:24:00.100 INFO [compress] Compressed 2.44GB into 55.09MB (45.37x). Speed: 56.59MB/s.
2026-04-20T21:24:00.601 INFO [compress] Compression finished.
2026-04-20T21:24:00.601 INFO [compress] Compressed 2.44GB into 55.09MB (45.37x). Speed: 56.34MB/s.

$ ./sbin/stop-clp.sh
2026-04-20T21:24:07.659 INFO [controller] Stopping all CLP containers using Docker Compose...
2026-04-20T21:24:29.552 INFO [controller] Stopped CLP.

Compression ratios and speeds are comparable between baseline and fix branch, confirming no
regression in the happy path.

@goynam goynam requested a review from a team as a code owner April 20, 2026 17:18
@coderabbitai

coderabbitai Bot commented Apr 20, 2026

Copy link
Copy Markdown
Contributor

Walkthrough

Compression task control flow restructured: conversion failures return immediately, stderr closure and temp cleanup are guaranteed via finally blocks, compression now streams subprocess stdout to detect archive boundaries, uploads archives to S3 when enabled (aborting on upload errors), updates archive/job metadata and optional indexing, and tolerates missing-file deletions.

Changes

Cohort / File(s) Summary
Compression task
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
Restructured process lifecycle and cleanup: conversion failure returns inside try; unified finally guarantees stderr closure and cleanup_temporary_files(); compression subprocess usage moved under try/finally; reads proc.stdout to detect archive transitions, accumulates totals, calls update_archive_metadata/update_job_metadata, attempts S3 uploads (tracks s3_error and terminates subprocess on upload exception), optionally indexes per-archive, and deletes local archives when S3 writes succeed. Temporary deletions made tolerant (unlink(missing_ok=True)); shutil.rmtree wrapped to log OSError instead of propagating.

Sequence Diagram(s)

sequenceDiagram
  participant Orchestrator
  participant Compressor as Compression Process
  participant FS as File System
  participant S3 as S3 Service
  participant DB as Database
  participant Indexer as Optional Indexer

  Orchestrator ->> Compressor: start compression subprocess
  Compressor -->> Orchestrator: stdout stream (archive chunks / progress)
  Orchestrator ->> FS: write local archive file
  Orchestrator ->> S3: upload archive (if enabled)
  alt upload success
    S3 -->> Orchestrator: upload OK
    Orchestrator ->> DB: update archive/job metadata
    Orchestrator ->> Indexer: enqueue indexing (optional)
    Orchestrator ->> FS: delete local archive file
  else upload failure
    S3 -->> Orchestrator: upload error
    Orchestrator ->> Compressor: terminate subprocess
    Orchestrator ->> FS: preserve local archive/logs, mark error
  end
  Compressor -->> Orchestrator: exit code
  Orchestrator ->> FS: cleanup temp files (unlink with missing_ok, rmtree logged) [always]
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 50.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the main change: ensuring temporary files are cleaned up on compression failure, which matches the core objective of moving cleanup outside the success branch.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (1)

425-551: ⚠️ Potential issue | 🟠 Major

Make temporary cleanup exception-safe.

Line 551 now covers the normal non-zero return-code path, but exceptions before it still bypass cleanup. For example, subprocess.Popen, json.loads, DB updates, or archive post-processing can raise after logs_list_path / converted_inputs_dir have been created, leaving the same /var/tmp artifacts this PR is trying to prevent. Wrap the compression/conversion work in try/finally, and make cleanup idempotent so early returns and exceptions share the same path.

Suggested cleanup pattern
 def cleanup_temporary_files():
-    if logs_list_path is not None:
-        logs_list_path.unlink()
+    logs_list_path.unlink(missing_ok=True)
     if converted_inputs_dir is not None:
-        shutil.rmtree(converted_inputs_dir)
+        shutil.rmtree(converted_inputs_dir, ignore_errors=True)

 # Open stderr log file
 stderr_log_path = logs_dir / f"{instance_id_str}-stderr.log"
 stderr_log_file = open(stderr_log_path, "w")
+try:
+    # conversion/compression subprocess handling and result construction
+    ...
+finally:
+    cleanup_temporary_files()
+    stderr_log_file.close()

This also lets the conversion failure return path drop its separate cleanup call without risking double-unlink failures.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@components/job-orchestration/job_orchestration/executor/compress/compression_task.py`
around lines 425 - 551, The code currently performs conversion, streaming JSON
parsing, DB updates, S3 uploads and indexing (via
conversion_cmd/conversion_proc, proc, json.loads, _upload_archive_to_s3,
update_archive_metadata, update_job_metadata and the indexer subprocess) without
guaranteeing cleanup_temporary_files() runs on exceptions; refactor by wrapping
the entire conversion+compression loop and subsequent wait/return-code handling
in a try/finally (so any exception from subprocess.Popen, json.loads, DB calls,
S3 upload, or indexing will still run cleanup_temporary_files()), remove the
duplicate cleanup call in the conversion-failure early return, and ensure
cleanup_temporary_files() is idempotent (safe to call multiple times) so early
returns and the finally block won’t error if files were already removed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In
`@components/job-orchestration/job_orchestration/executor/compress/compression_task.py`:
- Around line 425-551: The code currently performs conversion, streaming JSON
parsing, DB updates, S3 uploads and indexing (via
conversion_cmd/conversion_proc, proc, json.loads, _upload_archive_to_s3,
update_archive_metadata, update_job_metadata and the indexer subprocess) without
guaranteeing cleanup_temporary_files() runs on exceptions; refactor by wrapping
the entire conversion+compression loop and subsequent wait/return-code handling
in a try/finally (so any exception from subprocess.Popen, json.loads, DB calls,
S3 upload, or indexing will still run cleanup_temporary_files()), remove the
duplicate cleanup call in the conversion-failure early return, and ensure
cleanup_temporary_files() is idempotent (safe to call multiple times) so early
returns and the finally block won’t error if files were already removed.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 7f036cec-2e6a-49d9-86be-1098b92c4e0d

📥 Commits

Reviewing files that changed from the base of the PR and between 26e6c46 and da1924c4a293b08124783740424d99d98ed6df81.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py

Previously, cleanup_temporary_files() was only called on successful
compression. When compression failed, the converted-tmp directories
and log-paths files were left behind, accumulating on disk over time
and eventually causing ephemeral storage pressure and pod evictions.
@goynam goynam force-pushed the fix/cleanup-temp-files-on-compression-failure branch from da1924c to a8ed061 Compare April 20, 2026 17:24

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@components/job-orchestration/job_orchestration/executor/compress/compression_task.py`:
- Around line 419-423: Wrap the entire compression phase (the block that calls
json.loads, any DB operations, and archive_path.unlink) in a try/finally so
cleanup_temporary_files always runs; move or call cleanup_temporary_files from
the finally clause rather than only at the end of the happy path; inside
cleanup_temporary_files, stop using shutil.rmtree(..., ignore_errors=True) for
converted_inputs_dir and instead catch exceptions around the rmtree call and
around logs_list_path.unlink, logging failures via the module logger (or
processLogger) with contextual messages including the path and exception details
so permission/OS errors are visible.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 90154027-d7d8-4611-9569-159369ebb3b6

📥 Commits

Reviewing files that changed from the base of the PR and between da1924c4a293b08124783740424d99d98ed6df81 and a8ed061.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py

@junhaoliao junhaoliao self-requested a review April 20, 2026 17:45

@junhaoliao junhaoliao left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for catching the issue and submitting the fix! please see the suggestions to further improve coverage of failure handling

@junhaoliao junhaoliao requested a review from gibber9809 April 20, 2026 19:15

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (1)

456-680: ⚠️ Potential issue | 🔴 Critical

Critical: the compression block is duplicated — compression will run twice and the second run will fail because cleanup ran first.

Lines 456–559 (first compression block) and lines 562–677 (second compression block, inside the new try:/finally:) are two copies of the same logic. As written, run_clp will:

  1. Execute Popen(compression_cmd, …) at line 459, drain proc.stdout, wait for the child, and set compression_successful at 555.
  2. Call cleanup_temporary_files() at line 557, which unlinks logs_list_path and removes converted_inputs_dir (the inputs the compressor needs).
  3. Enter the try: at line 562, re-run the dead conversion-failure handler at 563–572 (already handled at 443–454 with a return), then execute Popen(compression_cmd, …) again at line 577 — now with its input list / converted inputs already deleted — so that run fails (or produces nothing), compression_successful is reset to False at 576 and overwritten, and the uncompressed/compressed size counters from the first run are clobbered by the reset at 582–585.
  4. Finally, cleanup_temporary_files() runs a second time and stderr_log_file is closed (680).

Beyond running compression twice, this also breaks the PR's stated goal: on a real failure path, the state reported to the caller reflects the second (broken) run, not the first, and database/job metadata may have already been committed for archives that subsequently appear to fail.

This looks like an editing/merge artifact from applying the earlier review suggestion on top of the original body without removing the original. The intended structure is to keep only one compression block, wrapped by the new try:/finally: that guarantees cleanup and stderr closure.

🛠️ Proposed fix — delete the first, unguarded copy so only the try/finally-wrapped version remains
-    # Start compression
-    logger.debug("Compressing...")
-    compression_successful = False
-    proc = subprocess.Popen(
-        compression_cmd, stdout=subprocess.PIPE, stderr=stderr_log_file, env=compression_env
-    )
-
-    # Compute the total amount of data compressed
-    last_archive_stats = None
-    last_line_decoded = False
-    total_uncompressed_size = 0
-    total_compressed_size = 0
-
-    # Handle job metadata update and S3 write if enabled
-    s3_error = None
-    while not last_line_decoded:
-        stats: dict[str, Any] | None = None
-
-        line = proc.stdout.readline()
-        if not line:
-            last_line_decoded = True
-        else:
-            stats = json.loads(line.decode("utf-8"))
-
-        if last_archive_stats is not None and (
-            None is stats or stats["id"] != last_archive_stats["id"]
-        ):
-            archive_id = last_archive_stats["id"]
-            archive_path = archive_output_dir / archive_id
-            if enable_s3_write:
-                if s3_error is None:
-                    logger.info(f"Uploading archive {archive_id} to S3...")
-                    try:
-                        _upload_archive_to_s3(s3_config, archive_path, archive_id, dataset)
-                        logger.info(f"Finished uploading archive {archive_id} to S3.")
-                    except Exception as err:
-                        logger.exception(f"Failed to upload archive {archive_id}")
-                        s3_error = str(err)
-                        # NOTE: It's possible `proc` finishes before we call `terminate` on it, in
-                        # which case the process will still return success.
-                        proc.terminate()
-
-            if s3_error is None:
-                # We've started a new archive so add the previous archive's last reported size to
-                # the total
-                total_uncompressed_size += last_archive_stats["uncompressed_size"]
-                total_compressed_size += last_archive_stats["size"]
-                with (
-                    closing(sql_adapter.create_connection(True)) as db_conn,
-                    closing(db_conn.cursor(dictionary=True)) as db_cursor,
-                ):
-                    table_prefix = clp_metadata_db_connection_config["table_prefix"]
-                    if StorageEngine.CLP_S == clp_storage_engine:
-                        update_archive_metadata(
-                            db_cursor, table_prefix, dataset, last_archive_stats
-                        )
-                    update_job_metadata(
-                        db_cursor,
-                        job_id,
-                        last_archive_stats,
-                    )
-                    db_conn.commit()
-
-                if StorageEngine.CLP_S == clp_storage_engine:
-                    indexer_cmd = [
-                        str(clp_home / "bin" / "indexer"),
-                        *_get_db_connection_args_for_clp_cmd(clp_metadata_db_connection_config),
-                        dataset,
-                        archive_path,
-                    ]
-
-                    # Set environment variables for database credentials
-                    indexer_env = dict(os.environ)
-                    indexer_env.update(
-                        _get_db_connection_env_vars_for_clp_cmd(clp_metadata_db_connection_config)
-                    )
-
-                    try:
-                        subprocess.run(
-                            indexer_cmd,
-                            stdout=subprocess.DEVNULL,
-                            stderr=stderr_log_file,
-                            check=True,
-                            env=indexer_env,
-                        )
-                    except subprocess.CalledProcessError:
-                        logger.exception("Failed to index archive.")
-
-            if enable_s3_write:
-                archive_path.unlink()
-
-        last_archive_stats = stats
-
-    # Wait for compression to finish
-    return_code = proc.wait()
-
-    if 0 != return_code:
-        logger.error(f"Failed to compress, return_code={return_code!s}")
-    else:
-        compression_successful = True
-
-    cleanup_temporary_files()
-
-    logger.debug("Compressed.")
-
-    # Close stderr log file
     try:
-        if conversion_return_code != 0:
-            logger.error(
-                f"Failed to convert unstructured log text, return_code={conversion_return_code}"
-            )
-            worker_output = {
-                "total_uncompressed_size": 0,
-                "total_compressed_size": 0,
-                "error_message": f"Check logs in {stderr_log_path}",
-            }
-            return CompressionTaskStatus.FAILED, worker_output
-
         # Start compression
         logger.debug("Compressing...")
         compression_successful = False

Also, since the conversion_return_code != 0 path at lines 443–454 already returns (and closes stderr_log_file explicitly before doing so), the duplicated conversion-failure branch inside the try: (lines 563–572) is unreachable and should be removed when collapsing the duplicates.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@components/job-orchestration/job_orchestration/executor/compress/compression_task.py`
around lines 456 - 680, The compression logic is duplicated: remove the first
unguarded compression block (the Popen loop starting at the initial "Start
compression" comment that uses compression_cmd, proc, last_archive_stats,
total_uncompressed_size/total_compressed_size, s3_error,
update_archive_metadata/update_job_metadata, indexer_cmd, proc.wait(), and
cleanup_temporary_files()) so only the try/finally-wrapped version remains; also
delete the redundant conversion_return_code != 0 branch inside the remaining try
(the duplicated early-return handling) because conversion_return_code is already
handled earlier and that branch is unreachable—ensure stderr_log_file is closed
and cleanup_temporary_files() is only called from the finally block.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@components/job-orchestration/job_orchestration/executor/compress/compression_task.py`:
- Around line 419-429: The cleanup_temporary_files function currently calls
logs_list_path.unlink(missing_ok=True) without catching non-FileNotFoundError
OSErrors, so a PermissionError (or other OSError) will abort cleanup and skip
shutil.rmtree(converted_inputs_dir); wrap the logs_list_path.unlink call in a
try/except that mirrors the converted_inputs_dir handling (catch OSError and
call logger.exception with a clear message referencing logs_list_path) so both
cleanup steps are performed and failures are logged discoverably; keep using
missing_ok=True but still guard for other OSError subclasses in
cleanup_temporary_files.

---

Outside diff comments:
In
`@components/job-orchestration/job_orchestration/executor/compress/compression_task.py`:
- Around line 456-680: The compression logic is duplicated: remove the first
unguarded compression block (the Popen loop starting at the initial "Start
compression" comment that uses compression_cmd, proc, last_archive_stats,
total_uncompressed_size/total_compressed_size, s3_error,
update_archive_metadata/update_job_metadata, indexer_cmd, proc.wait(), and
cleanup_temporary_files()) so only the try/finally-wrapped version remains; also
delete the redundant conversion_return_code != 0 branch inside the remaining try
(the duplicated early-return handling) because conversion_return_code is already
handled earlier and that branch is unreachable—ensure stderr_log_file is closed
and cleanup_temporary_files() is only called from the finally block.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: aa2212fd-05e3-41be-9b14-ce7b694f040e

📥 Commits

Reviewing files that changed from the base of the PR and between a8ed061 and 68e20c4.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py

Comment on lines 419 to +429
def cleanup_temporary_files():
if logs_list_path is not None:
logs_list_path.unlink()
logs_list_path.unlink(missing_ok=True)
if converted_inputs_dir is not None:
shutil.rmtree(converted_inputs_dir)
try:
shutil.rmtree(converted_inputs_dir)
except OSError:
logger.exception(
"Failed to clean up temporary directory: %s",
converted_inputs_dir,
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Minor: a failing logs_list_path.unlink() will skip the converted_inputs_dir cleanup.

missing_ok=True only suppresses FileNotFoundError; other OSError subclasses (e.g., PermissionError) still propagate out of cleanup_temporary_files() and prevent the subsequent shutil.rmtree(converted_inputs_dir) from running — the opposite of this PR's goal. Given the reviewer's prior feedback that cleanup failures should be discoverable, not silenced, consider symmetric handling for both paths.

🛠️ Proposed fix
     def cleanup_temporary_files():
         if logs_list_path is not None:
-            logs_list_path.unlink(missing_ok=True)
+            try:
+                logs_list_path.unlink(missing_ok=True)
+            except OSError:
+                logger.exception(
+                    "Failed to clean up temporary log-paths file: %s",
+                    logs_list_path,
+                )
         if converted_inputs_dir is not None:
             try:
                 shutil.rmtree(converted_inputs_dir)
             except OSError:
                 logger.exception(
                     "Failed to clean up temporary directory: %s",
                     converted_inputs_dir,
                 )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@components/job-orchestration/job_orchestration/executor/compress/compression_task.py`
around lines 419 - 429, The cleanup_temporary_files function currently calls
logs_list_path.unlink(missing_ok=True) without catching non-FileNotFoundError
OSErrors, so a PermissionError (or other OSError) will abort cleanup and skip
shutil.rmtree(converted_inputs_dir); wrap the logs_list_path.unlink call in a
try/except that mirrors the converted_inputs_dir handling (catch OSError and
call logger.exception with a clear message referencing logs_list_path) so both
cleanup steps are performed and failures are logged discoverably; keep using
missing_ok=True but still guard for other OSError subclasses in
cleanup_temporary_files.

@gibber9809 gibber9809 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, seems like the last diff wraps all of the code in a try/finally, but doesn't actually delete the previous non-wrapped version, so the whole compression flow from checking the return code from conversion onwards is duplicated?

After that code is deduplicated I think these changes look good.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (1)

431-561: ⚠️ Potential issue | 🟠 Major

Move conversion subprocess into the guarded scope and explicitly terminate compression subprocess in finally.

The conversion subprocess runs before the try block (lines 435–441), so a Popen failure skips all cleanup. Additionally, the finally block (lines 559–561) doesn't terminate the compression subprocess, leaving it running if an exception occurs during processing or if the parent removes temporary inputs while proc is still active.

Move the conversion subprocess into the try block, initialize proc = None before try, and add explicit termination with timeout handling in the finally block before cleanup.

Suggested lifecycle fix
     # Open stderr log file
     stderr_log_path = logs_dir / f"{instance_id_str}-stderr.log"
     stderr_log_file = open(stderr_log_path, "w")
 
-    conversion_return_code = 0
-    if conversion_cmd is not None:
-        logger.debug("Execute log-converter with command: %s", conversion_cmd)
-        conversion_proc = subprocess.Popen(
-            conversion_cmd, stdout=subprocess.DEVNULL, stderr=stderr_log_file, env=conversion_env
-        )
-        conversion_return_code = conversion_proc.wait()
-
+    proc = None
     try:
+        conversion_return_code = 0
+        if conversion_cmd is not None:
+            logger.debug("Execute log-converter with command: %s", conversion_cmd)
+            conversion_proc = subprocess.Popen(
+                conversion_cmd,
+                stdout=subprocess.DEVNULL,
+                stderr=stderr_log_file,
+                env=conversion_env,
+            )
+            conversion_return_code = conversion_proc.wait()
+
         if conversion_return_code != 0:
             logger.error(
                 f"Failed to convert unstructured log text, return_code={conversion_return_code}"
             )
             worker_output = {
@@
         logger.debug("Compressing...")
         compression_successful = False
         proc = subprocess.Popen(
             compression_cmd, stdout=subprocess.PIPE, stderr=stderr_log_file, env=compression_env
         )
@@
         logger.debug("Compressed.")
     finally:
+        if proc is not None and proc.poll() is None:
+            logger.warning("Terminating compression subprocess after task failure.")
+            proc.terminate()
+            try:
+                proc.wait(timeout=10)
+            except subprocess.TimeoutExpired:
+                logger.warning("Killing compression subprocess after termination timeout.")
+                proc.kill()
+                proc.wait()
         cleanup_temporary_files()
         stderr_log_file.close()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@components/job-orchestration/job_orchestration/executor/compress/compression_task.py`
around lines 431 - 561, Move the conversion subprocess creation into the try
block and initialize proc = None (and conversion_proc = None if needed) before
the try so any Popen failure is cleaned up; start conversion with conversion_cmd
inside the try and retain conversion_return_code logic; in the finally block
check if proc is not None and is still running then call proc.terminate(), wait
with a short timeout, and if it hasn’t exited call proc.kill() (also do the same
for conversion_proc if present), then proceed to call cleanup_temporary_files()
and close stderr_log_file to ensure no orphaned processes or leaked file
handles; reference symbols: conversion_cmd, conversion_proc,
conversion_return_code, proc, stderr_log_file, cleanup_temporary_files().
♻️ Duplicate comments (1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (1)

419-429: ⚠️ Potential issue | 🟡 Minor

Duplicate: isolate cleanup failures per path.

Line 421 can still raise an OSError other than “missing file”, which skips the converted-directory cleanup and prevents stderr_log_file.close() in the outer finally. This was already raised in the previous review; the fix is still to wrap the log-path unlink just like shutil.rmtree.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@components/job-orchestration/job_orchestration/executor/compress/compression_task.py`
around lines 419 - 429, The unlink of logs_list_path in cleanup_temporary_files
can raise OSError and currently would skip the converted_inputs_dir removal and
subsequent outer finally work (e.g., stderr_log_file.close()); wrap the
logs_list_path.unlink(missing_ok=True) call in its own try/except OSError block
(similar to the converted_inputs_dir handling) so each path cleanup is isolated,
log the exception via logger.exception with a clear message including
logs_list_path, and keep the existing try/except for
shutil.rmtree(converted_inputs_dir) unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In
`@components/job-orchestration/job_orchestration/executor/compress/compression_task.py`:
- Around line 431-561: Move the conversion subprocess creation into the try
block and initialize proc = None (and conversion_proc = None if needed) before
the try so any Popen failure is cleaned up; start conversion with conversion_cmd
inside the try and retain conversion_return_code logic; in the finally block
check if proc is not None and is still running then call proc.terminate(), wait
with a short timeout, and if it hasn’t exited call proc.kill() (also do the same
for conversion_proc if present), then proceed to call cleanup_temporary_files()
and close stderr_log_file to ensure no orphaned processes or leaked file
handles; reference symbols: conversion_cmd, conversion_proc,
conversion_return_code, proc, stderr_log_file, cleanup_temporary_files().

---

Duplicate comments:
In
`@components/job-orchestration/job_orchestration/executor/compress/compression_task.py`:
- Around line 419-429: The unlink of logs_list_path in cleanup_temporary_files
can raise OSError and currently would skip the converted_inputs_dir removal and
subsequent outer finally work (e.g., stderr_log_file.close()); wrap the
logs_list_path.unlink(missing_ok=True) call in its own try/except OSError block
(similar to the converted_inputs_dir handling) so each path cleanup is isolated,
log the exception via logger.exception with a clear message including
logs_list_path, and keep the existing try/except for
shutil.rmtree(converted_inputs_dir) unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: a0d2bf5e-a58d-43d3-abdc-b9f9d73ec3e6

📥 Commits

Reviewing files that changed from the base of the PR and between 68e20c4 and 907fe4a.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py

@gibber9809 gibber9809 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. PR title seems fine as well.

@junhaoliao junhaoliao merged commit 4017030 into y-scope:main Apr 20, 2026
24 checks passed
@junhaoliao junhaoliao added this to the Mid-April 2026 milestone Apr 24, 2026
junhaoliao added a commit to junhaoliao/clp that referenced this pull request May 17, 2026
…re. (y-scope#2210)

Co-authored-by: Naman Goyal <namangg@amazon.com>
Co-authored-by: Junhao Liao <junhao.liao@yscope.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants