fix(job-orchestration): Clean up temporary files on compression failure. #2210
Walkthrough
Compression task control flow restructured: conversion failures return immediately, stderr closure and temp cleanup are guaranteed via finally blocks, compression now streams subprocess stdout to detect archive boundaries, uploads archives to S3 when enabled (aborting on upload errors), updates archive/job metadata and optional indexing, and tolerates missing-file deletions.
Sequence Diagram(s)
sequenceDiagram
participant Orchestrator
participant Compressor as Compression Process
participant FS as File System
participant S3 as S3 Service
participant DB as Database
participant Indexer as Optional Indexer
Orchestrator ->> Compressor: start compression subprocess
Compressor -->> Orchestrator: stdout stream (archive chunks / progress)
Orchestrator ->> FS: write local archive file
Orchestrator ->> S3: upload archive (if enabled)
alt upload success
S3 -->> Orchestrator: upload OK
Orchestrator ->> DB: update archive/job metadata
Orchestrator ->> Indexer: enqueue indexing (optional)
Orchestrator ->> FS: delete local archive file
else upload failure
S3 -->> Orchestrator: upload error
Orchestrator ->> Compressor: terminate subprocess
Orchestrator ->> FS: preserve local archive/logs, mark error
end
Compressor -->> Orchestrator: exit code
Orchestrator ->> FS: cleanup temp files (unlink with missing_ok, rmtree logged) [always]
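The "detect archive boundaries" step in the walkthrough can be illustrated with a minimal sketch. It assumes the compression subprocess prints one JSON object per line with an `id` field, and that an archive is complete when the next line carries a different `id` or the stream ends. The helper name `iter_completed_archives` and the sample data are hypothetical, not taken from the PR.

```python
import io
import json
from typing import Any, BinaryIO, Dict, Iterator, Optional


def iter_completed_archives(stdout: BinaryIO) -> Iterator[Dict[str, Any]]:
    """Yield the last stats record reported for each archive.

    Assumption: one JSON object per line, each with an "id" key.
    """
    last_stats: Optional[Dict[str, Any]] = None
    while True:
        line = stdout.readline()
        # An empty read means the stream is exhausted.
        stats = json.loads(line.decode("utf-8")) if line else None
        if last_stats is not None and (stats is None or stats["id"] != last_stats["id"]):
            # The id changed (or the stream ended), so the previous archive is done.
            yield last_stats
        if stats is None:
            return
        last_stats = stats


# Hypothetical output: archive "a" reports twice, archive "b" once.
sample = io.BytesIO(
    b'{"id": "a", "size": 10}\n{"id": "a", "size": 25}\n{"id": "b", "size": 7}\n'
)
completed = list(iter_completed_archives(sample))
```

Only the final record per archive is yielded, which is why per-archive work such as uploads or metadata updates can be attached to each yielded item.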
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (1)
425-551: ⚠️ Potential issue | 🟠 Major
Make temporary cleanup exception-safe.
Line 551 now covers the normal non-zero return-code path, but exceptions before it still bypass cleanup. For example, `subprocess.Popen`, `json.loads`, DB updates, or archive post-processing can raise after `logs_list_path`/`converted_inputs_dir` have been created, leaving the same `/var/tmp` artifacts this PR is trying to prevent. Wrap the compression/conversion work in `try`/`finally`, and make cleanup idempotent so early returns and exceptions share the same path.
Suggested cleanup pattern
     def cleanup_temporary_files():
-        if logs_list_path is not None:
-            logs_list_path.unlink()
+        logs_list_path.unlink(missing_ok=True)
         if converted_inputs_dir is not None:
-            shutil.rmtree(converted_inputs_dir)
+            shutil.rmtree(converted_inputs_dir, ignore_errors=True)

     # Open stderr log file
     stderr_log_path = logs_dir / f"{instance_id_str}-stderr.log"
     stderr_log_file = open(stderr_log_path, "w")
+    try:
+        # conversion/compression subprocess handling and result construction
+        ...
+    finally:
+        cleanup_temporary_files()
+        stderr_log_file.close()
This also lets the conversion failure return path drop its separate cleanup call without risking double-unlink failures.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/job-orchestration/job_orchestration/executor/compress/compression_task.py` around lines 425 - 551, The code currently performs conversion, streaming JSON parsing, DB updates, S3 uploads and indexing (via conversion_cmd/conversion_proc, proc, json.loads, _upload_archive_to_s3, update_archive_metadata, update_job_metadata and the indexer subprocess) without guaranteeing cleanup_temporary_files() runs on exceptions; refactor by wrapping the entire conversion+compression loop and subsequent wait/return-code handling in a try/finally (so any exception from subprocess.Popen, json.loads, DB calls, S3 upload, or indexing will still run cleanup_temporary_files()), remove the duplicate cleanup call in the conversion-failure early return, and ensure cleanup_temporary_files() is idempotent (safe to call multiple times) so early returns and the finally block won’t error if files were already removed.
📥 Commits
Reviewing files that changed from the base of the PR and between 26e6c46 and da1924c4a293b08124783740424d99d98ed6df81.
📒 Files selected for processing (1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
Previously, cleanup_temporary_files() was only called on successful compression. When compression failed, the converted-tmp directories and log-paths files were left behind, accumulating on disk over time and eventually causing ephemeral storage pressure and pod evictions.
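To make the failure mode concrete, here is a minimal sketch of the try/finally pattern the fix relies on. It is not the PR's code: the function `process`, the temp-file suffixes, and the malformed input are illustrative assumptions. The point is only that the `finally` clause runs even when `json.loads` raises.

```python
import json
import os
import shutil
import tempfile
from pathlib import Path


def process(raw_line: bytes, created: list) -> dict:
    """Create temporary artifacts, do fallible work, and always clean up."""
    fd, name = tempfile.mkstemp(suffix="-log-paths.txt")
    os.close(fd)
    logs_list_path = Path(name)
    converted_inputs_dir = Path(tempfile.mkdtemp(suffix="-converted-tmp"))
    # Record the paths so the caller can check they were removed.
    created.extend([logs_list_path, converted_inputs_dir])
    try:
        # Stand-in for the fallible work; raises on malformed JSON.
        return json.loads(raw_line.decode("utf-8"))
    finally:
        # Runs on success, on early return, and on exceptions alike.
        logs_list_path.unlink(missing_ok=True)
        shutil.rmtree(converted_inputs_dir)


created: list = []
failed = False
try:
    process(b"{not valid json", created)
except json.JSONDecodeError:
    failed = True
```

After the call fails, neither temporary path should remain on disk, which is the behavior the PR is after for the failure path.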
da1924c to a8ed061
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@components/job-orchestration/job_orchestration/executor/compress/compression_task.py`:
- Around line 419-423: Wrap the entire compression phase (the block that calls
json.loads, any DB operations, and archive_path.unlink) in a try/finally so
cleanup_temporary_files always runs; move or call cleanup_temporary_files from
the finally clause rather than only at the end of the happy path; inside
cleanup_temporary_files, stop using shutil.rmtree(..., ignore_errors=True) for
converted_inputs_dir and instead catch exceptions around the rmtree call and
around logs_list_path.unlink, logging failures via the module logger (or
processLogger) with contextual messages including the path and exception details
so permission/OS errors are visible.
📥 Commits
Reviewing files that changed from the base of the PR and between da1924c4a293b08124783740424d99d98ed6df81 and a8ed061.
📒 Files selected for processing (1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
junhaoliao
left a comment
thanks for catching the issue and submitting the fix! please see the suggestions to further improve coverage of failure handling
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (1)
456-680: ⚠️ Potential issue | 🔴 Critical
Critical: the compression block is duplicated. Compression will run twice and the second run will fail because cleanup ran first.
Lines 456–559 (first compression block) and lines 562–677 (second compression block, inside the new `try:`/`finally:`) are two copies of the same logic. As written, `run_clp` will:
- Execute `Popen(compression_cmd, …)` at line 459, drain `proc.stdout`, wait for the child, and set `compression_successful` at 555.
- Call `cleanup_temporary_files()` at line 557, which unlinks `logs_list_path` and removes `converted_inputs_dir` (the inputs the compressor needs).
- Enter the `try:` at line 562, re-run the dead conversion-failure handler at 563–572 (already handled at 443–454 with a `return`), then execute `Popen(compression_cmd, …)` again at line 577, now with its input list / converted inputs already deleted, so that run fails (or produces nothing), `compression_successful` is reset to `False` at 576 and overwritten, and the uncompressed/compressed size counters from the first run are clobbered by the reset at 582–585.
- Finally, `cleanup_temporary_files()` runs a second time and `stderr_log_file` is closed (680).
Beyond running compression twice, this also breaks the PR's stated goal: on a real failure path, the state reported to the caller reflects the second (broken) run, not the first, and database/job metadata may have already been committed for archives that subsequently appear to fail.
This looks like an editing/merge artifact from applying the earlier review suggestion on top of the original body without removing the original. The intended structure is to keep only one compression block, wrapped by the new `try:`/`finally:` that guarantees cleanup and stderr closure.
🛠️ Proposed fix: delete the first, unguarded copy so only the try/finally-wrapped version remains
-    # Start compression
-    logger.debug("Compressing...")
-    compression_successful = False
-    proc = subprocess.Popen(
-        compression_cmd, stdout=subprocess.PIPE, stderr=stderr_log_file, env=compression_env
-    )
-
-    # Compute the total amount of data compressed
-    last_archive_stats = None
-    last_line_decoded = False
-    total_uncompressed_size = 0
-    total_compressed_size = 0
-
-    # Handle job metadata update and S3 write if enabled
-    s3_error = None
-    while not last_line_decoded:
-        stats: dict[str, Any] | None = None
-
-        line = proc.stdout.readline()
-        if not line:
-            last_line_decoded = True
-        else:
-            stats = json.loads(line.decode("utf-8"))
-
-        if last_archive_stats is not None and (
-            None is stats or stats["id"] != last_archive_stats["id"]
-        ):
-            archive_id = last_archive_stats["id"]
-            archive_path = archive_output_dir / archive_id
-            if enable_s3_write:
-                if s3_error is None:
-                    logger.info(f"Uploading archive {archive_id} to S3...")
-                    try:
-                        _upload_archive_to_s3(s3_config, archive_path, archive_id, dataset)
-                        logger.info(f"Finished uploading archive {archive_id} to S3.")
-                    except Exception as err:
-                        logger.exception(f"Failed to upload archive {archive_id}")
-                        s3_error = str(err)
-                        # NOTE: It's possible `proc` finishes before we call `terminate` on it, in
-                        # which case the process will still return success.
-                        proc.terminate()
-
-            if s3_error is None:
-                # We've started a new archive so add the previous archive's last reported size to
-                # the total
-                total_uncompressed_size += last_archive_stats["uncompressed_size"]
-                total_compressed_size += last_archive_stats["size"]
-                with (
-                    closing(sql_adapter.create_connection(True)) as db_conn,
-                    closing(db_conn.cursor(dictionary=True)) as db_cursor,
-                ):
-                    table_prefix = clp_metadata_db_connection_config["table_prefix"]
-                    if StorageEngine.CLP_S == clp_storage_engine:
-                        update_archive_metadata(
-                            db_cursor, table_prefix, dataset, last_archive_stats
-                        )
-                    update_job_metadata(
-                        db_cursor,
-                        job_id,
-                        last_archive_stats,
-                    )
-                    db_conn.commit()
-
-                if StorageEngine.CLP_S == clp_storage_engine:
-                    indexer_cmd = [
-                        str(clp_home / "bin" / "indexer"),
-                        *_get_db_connection_args_for_clp_cmd(clp_metadata_db_connection_config),
-                        dataset,
-                        archive_path,
-                    ]
-
-                    # Set environment variables for database credentials
-                    indexer_env = dict(os.environ)
-                    indexer_env.update(
-                        _get_db_connection_env_vars_for_clp_cmd(clp_metadata_db_connection_config)
-                    )
-
-                    try:
-                        subprocess.run(
-                            indexer_cmd,
-                            stdout=subprocess.DEVNULL,
-                            stderr=stderr_log_file,
-                            check=True,
-                            env=indexer_env,
-                        )
-                    except subprocess.CalledProcessError:
-                        logger.exception("Failed to index archive.")
-
-            if enable_s3_write:
-                archive_path.unlink()
-
-        last_archive_stats = stats
-
-    # Wait for compression to finish
-    return_code = proc.wait()
-
-    if 0 != return_code:
-        logger.error(f"Failed to compress, return_code={return_code!s}")
-    else:
-        compression_successful = True
-
-    cleanup_temporary_files()
-
-    logger.debug("Compressed.")
-
-    # Close stderr log file
     try:
-        if conversion_return_code != 0:
-            logger.error(
-                f"Failed to convert unstructured log text, return_code={conversion_return_code}"
-            )
-            worker_output = {
-                "total_uncompressed_size": 0,
-                "total_compressed_size": 0,
-                "error_message": f"Check logs in {stderr_log_path}",
-            }
-            return CompressionTaskStatus.FAILED, worker_output
-
         # Start compression
         logger.debug("Compressing...")
         compression_successful = False
Also, since the `conversion_return_code != 0` path at lines 443–454 already returns (and closes `stderr_log_file` explicitly before doing so), the duplicated conversion-failure branch inside the `try:` (lines 563–572) is unreachable and should be removed when collapsing the duplicates.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/job-orchestration/job_orchestration/executor/compress/compression_task.py` around lines 456 - 680, The compression logic is duplicated: remove the first unguarded compression block (the Popen loop starting at the initial "Start compression" comment that uses compression_cmd, proc, last_archive_stats, total_uncompressed_size/total_compressed_size, s3_error, update_archive_metadata/update_job_metadata, indexer_cmd, proc.wait(), and cleanup_temporary_files()) so only the try/finally-wrapped version remains; also delete the redundant conversion_return_code != 0 branch inside the remaining try (the duplicated early-return handling) because conversion_return_code is already handled earlier and that branch is unreachable—ensure stderr_log_file is closed and cleanup_temporary_files() is only called from the finally block.
📒 Files selected for processing (1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
     def cleanup_temporary_files():
         if logs_list_path is not None:
-            logs_list_path.unlink()
+            logs_list_path.unlink(missing_ok=True)
         if converted_inputs_dir is not None:
-            shutil.rmtree(converted_inputs_dir)
+            try:
+                shutil.rmtree(converted_inputs_dir)
+            except OSError:
+                logger.exception(
+                    "Failed to clean up temporary directory: %s",
+                    converted_inputs_dir,
+                )
Minor: a failing logs_list_path.unlink() will skip the converted_inputs_dir cleanup.
missing_ok=True only suppresses FileNotFoundError; other OSError subclasses (e.g., PermissionError) still propagate out of cleanup_temporary_files() and prevent the subsequent shutil.rmtree(converted_inputs_dir) from running — the opposite of this PR's goal. Given the reviewer's prior feedback that cleanup failures should be discoverable, not silenced, consider symmetric handling for both paths.
🛠️ Proposed fix
     def cleanup_temporary_files():
         if logs_list_path is not None:
-            logs_list_path.unlink(missing_ok=True)
+            try:
+                logs_list_path.unlink(missing_ok=True)
+            except OSError:
+                logger.exception(
+                    "Failed to clean up temporary log-paths file: %s",
+                    logs_list_path,
+                )
         if converted_inputs_dir is not None:
             try:
                 shutil.rmtree(converted_inputs_dir)
             except OSError:
                 logger.exception(
                     "Failed to clean up temporary directory: %s",
                     converted_inputs_dir,
                 )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@components/job-orchestration/job_orchestration/executor/compress/compression_task.py`
around lines 419 - 429, The cleanup_temporary_files function currently calls
logs_list_path.unlink(missing_ok=True) without catching non-FileNotFoundError
OSErrors, so a PermissionError (or other OSError) will abort cleanup and skip
shutil.rmtree(converted_inputs_dir); wrap the logs_list_path.unlink call in a
try/except that mirrors the converted_inputs_dir handling (catch OSError and
call logger.exception with a clear message referencing logs_list_path) so both
cleanup steps are performed and failures are logged discoverably; keep using
missing_ok=True but still guard for other OSError subclasses in
cleanup_temporary_files.
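The point about `missing_ok=True` only covering `FileNotFoundError` can be checked with a small standalone sketch. The helper `cleanup_paths` is a hypothetical stand-in for `cleanup_temporary_files()`, taking the two paths as arguments instead of closing over them. To provoke a non-`FileNotFoundError` failure without mocks, the example passes a directory where a file is expected, which makes `Path.unlink` raise an `OSError` subclass on common platforms.

```python
import logging
import shutil
import tempfile
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)


def cleanup_paths(file_path: Optional[Path], dir_path: Optional[Path]) -> None:
    """Remove a temp file and a temp directory, isolating each step's failure."""
    if file_path is not None:
        try:
            file_path.unlink(missing_ok=True)
        except OSError:
            # Logged rather than raised so the directory cleanup still runs.
            logger.exception("Failed to remove temporary file: %s", file_path)
    if dir_path is not None:
        try:
            shutil.rmtree(dir_path)
        except OSError:
            logger.exception("Failed to remove temporary directory: %s", dir_path)


# unlink() on a directory raises an OSError subclass, simulating a cleanup failure.
not_a_file = Path(tempfile.mkdtemp())
temp_dir = Path(tempfile.mkdtemp())
cleanup_paths(not_a_file, temp_dir)

unlink_failed = not_a_file.exists()  # the failed step left its path in place
dir_removed = not temp_dir.exists()  # the second step still ran
not_a_file.rmdir()  # tidy up the directory used to provoke the failure
```

With the first step unguarded, the same input would propagate the exception and leave `temp_dir` behind.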
gibber9809
left a comment
Sorry, seems like the last diff wraps all of the code in a try/finally, but doesn't actually delete the previous non-wrapped version, so the whole compression flow from checking the return code from conversion onwards is duplicated?
After that code is deduplicated I think these changes look good.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (1)
431-561: ⚠️ Potential issue | 🟠 Major
Move conversion subprocess into the guarded scope and explicitly terminate compression subprocess in finally.
The conversion subprocess runs before the `try` block (lines 435–441), so a `Popen` failure skips all cleanup. Additionally, the `finally` block (lines 559–561) doesn't terminate the compression subprocess, leaving it running if an exception occurs during processing or if the parent removes temporary inputs while `proc` is still active.
Move the conversion subprocess into the `try` block, initialize `proc = None` before `try`, and add explicit termination with timeout handling in the `finally` block before cleanup.
Suggested lifecycle fix
     # Open stderr log file
     stderr_log_path = logs_dir / f"{instance_id_str}-stderr.log"
     stderr_log_file = open(stderr_log_path, "w")

-    conversion_return_code = 0
-    if conversion_cmd is not None:
-        logger.debug("Execute log-converter with command: %s", conversion_cmd)
-        conversion_proc = subprocess.Popen(
-            conversion_cmd, stdout=subprocess.DEVNULL, stderr=stderr_log_file, env=conversion_env
-        )
-        conversion_return_code = conversion_proc.wait()
-
+    proc = None
     try:
+        conversion_return_code = 0
+        if conversion_cmd is not None:
+            logger.debug("Execute log-converter with command: %s", conversion_cmd)
+            conversion_proc = subprocess.Popen(
+                conversion_cmd,
+                stdout=subprocess.DEVNULL,
+                stderr=stderr_log_file,
+                env=conversion_env,
+            )
+            conversion_return_code = conversion_proc.wait()
+
         if conversion_return_code != 0:
             logger.error(
                 f"Failed to convert unstructured log text, return_code={conversion_return_code}"
             )
             worker_output = {
@@
         logger.debug("Compressing...")
         compression_successful = False
         proc = subprocess.Popen(
             compression_cmd, stdout=subprocess.PIPE, stderr=stderr_log_file, env=compression_env
         )
@@
         logger.debug("Compressed.")
     finally:
+        if proc is not None and proc.poll() is None:
+            logger.warning("Terminating compression subprocess after task failure.")
+            proc.terminate()
+            try:
+                proc.wait(timeout=10)
+            except subprocess.TimeoutExpired:
+                logger.warning("Killing compression subprocess after termination timeout.")
+                proc.kill()
+                proc.wait()
         cleanup_temporary_files()
         stderr_log_file.close()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/job-orchestration/job_orchestration/executor/compress/compression_task.py` around lines 431 - 561, Move the conversion subprocess creation into the try block and initialize proc = None (and conversion_proc = None if needed) before the try so any Popen failure is cleaned up; start conversion with conversion_cmd inside the try and retain conversion_return_code logic; in the finally block check if proc is not None and is still running then call proc.terminate(), wait with a short timeout, and if it hasn’t exited call proc.kill() (also do the same for conversion_proc if present), then proceed to call cleanup_temporary_files() and close stderr_log_file to ensure no orphaned processes or leaked file handles; reference symbols: conversion_cmd, conversion_proc, conversion_return_code, proc, stderr_log_file, cleanup_temporary_files().
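The terminate-then-kill sequence described in this comment can be tried in isolation. The sketch below is an illustration under assumptions, not the PR's code: `stop_process` and its `timeout_s` parameter are hypothetical names, and the child is a throwaway Python process that sleeps.

```python
import subprocess
import sys


def stop_process(proc: subprocess.Popen, timeout_s: float = 10.0) -> int:
    """Stop proc if it is still running and return its exit code."""
    if proc.poll() is None:
        # Ask politely first; escalate only if the child ignores the request.
        proc.terminate()
        try:
            proc.wait(timeout=timeout_s)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
    return proc.returncode


# A child that would otherwise outlive the parent's cleanup.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
exit_code = stop_process(child, timeout_s=5.0)
```

Calling a helper like this before removing temporary inputs avoids deleting files out from under a subprocess that is still reading them.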
♻️ Duplicate comments (1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (1)
419-429: ⚠️ Potential issue | 🟡 Minor
Duplicate: isolate cleanup failures per path.
Line 421 can still raise an `OSError` other than “missing file”, which skips the converted-directory cleanup and prevents `stderr_log_file.close()` in the outer `finally`. This was already raised in the previous review; the fix is still to wrap the log-path unlink just like `shutil.rmtree`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/job-orchestration/job_orchestration/executor/compress/compression_task.py` around lines 419 - 429, The unlink of logs_list_path in cleanup_temporary_files can raise OSError and currently would skip the converted_inputs_dir removal and subsequent outer finally work (e.g., stderr_log_file.close()); wrap the logs_list_path.unlink(missing_ok=True) call in its own try/except OSError block (similar to the converted_inputs_dir handling) so each path cleanup is isolated, log the exception via logger.exception with a clear message including logs_list_path, and keep the existing try/except for shutil.rmtree(converted_inputs_dir) unchanged.
📒 Files selected for processing (1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
gibber9809
left a comment
LGTM. PR title seems fine as well.
…re. (y-scope#2210) Co-authored-by: Naman Goyal <namangg@amazon.com> Co-authored-by: Junhao Liao <junhao.liao@yscope.com>
Description
When a compression task failed, `cleanup_temporary_files()` was not called, leaving `*-converted-tmp` directories and `*-log-paths.txt` files in `/var/tmp/`. Over time these accumulate and fill the node's ephemeral storage, causing disk pressure and pod evictions.
The fix wraps the conversion and compression logic in a `try`/`finally` block so that `cleanup_temporary_files()` and `stderr_log_file.close()` are always called regardless of whether compression succeeded, failed, or threw an unexpected exception (e.g., malformed JSON from the compression process).
Additionally:
- `logs_list_path.unlink(missing_ok=True)` makes the unlink tolerant of files that don't exist.
- `shutil.rmtree(converted_inputs_dir)` errors are caught and logged rather than silently ignored (as `ignore_errors=True` would do) so that cleanup failures are visible for debugging.
Checklist
Validation performed
Baseline (origin/main)
Rebuilt with `task`, started CLP, and compressed `~/samples/hive-24hr/`:
Fix branch
Rebuilt with `task`, started CLP, and compressed `~/samples/hive-24hr/`:
Compression ratios and speeds are comparable between baseline and fix branch, confirming no regression in the happy path.