feat(clp-json): Skip uploading empty query results files to S3.#2135
Conversation
WalkthroughA new helper function Changes
Estimated code review effort🎯 2 (Simple) | ⏱️ ~10 minutes 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 Ruff (0.15.7)components/job-orchestration/job_orchestration/executor/query/fs_search_task.pyUnexpected Ruff issue shape at index 6 Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@components/job-orchestration/job_orchestration/executor/query/fs_search_task.py`:
- Around line 192-194: The upload_results_to_s3 function currently types
s3_config as Any; change it to use the concrete S3Config type to restore type
safety and match s3_put's expected parameter: update the function signature of
upload_results_to_s3 to accept s3_config: S3Config and add S3Config to the
module imports so the name resolves wherever upload_results_to_s3 and s3_put are
used.
- Around line 195-198: The code calls src_file.stat() without ensuring the file
exists which can raise FileNotFoundError; update the block in fs_search_task.py
to first check src_file.exists() (or src_file.is_file()) before calling stat(),
and if the file is missing log an appropriate error/warning via logger
(including the path) and set the task/result state accordingly instead of
letting the exception bubble (e.g., skip unlink and return or mark failed).
Ensure you still handle the empty-file case (src_file.stat().st_size == 0) only
after confirming existence and only call src_file.unlink() when the file was
verified to exist.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: f7746eee-75df-4767-bb20-d9ad08f8c164
📒 Files selected for processing (1)
components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
| def upload_results_to_s3( | ||
| task_results: QueryTaskResult, s3_config: Any, src_file: Path, dest_path: str | ||
| ): |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Consider using S3Config type hint instead of Any.
Using Any for s3_config loses type safety. The s3_put function expects S3Config (per context snippet 2).
♻️ Proposed type hint improvement
Add S3Config to imports:
from clp_py_utils.clp_config import (
Database,
+ S3Config,
StorageEngine,
StorageType,
WorkerConfig,
)Then update the function signature:
def upload_results_to_s3(
- task_results: QueryTaskResult, s3_config: Any, src_file: Path, dest_path: str
+ task_results: QueryTaskResult, s3_config: S3Config, src_file: Path, dest_path: str
):📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def upload_results_to_s3( | |
| task_results: QueryTaskResult, s3_config: Any, src_file: Path, dest_path: str | |
| ): | |
| from clp_py_utils.clp_config import ( | |
| Database, | |
| S3Config, | |
| StorageEngine, | |
| StorageType, | |
| WorkerConfig, | |
| ) | |
| ... | |
| def upload_results_to_s3( | |
| task_results: QueryTaskResult, s3_config: S3Config, src_file: Path, dest_path: str | |
| ): |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@components/job-orchestration/job_orchestration/executor/query/fs_search_task.py`
around lines 192 - 194, The upload_results_to_s3 function currently types
s3_config as Any; change it to use the concrete S3Config type to restore type
safety and match s3_put's expected parameter: update the function signature of
upload_results_to_s3 to accept s3_config: S3Config and add S3Config to the
module imports so the name resolves wherever upload_results_to_s3 and s3_put are
used.
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (1)
components/job-orchestration/job_orchestration/executor/query/fs_search_task.py (1)
193-194: 🛠️ Refactor suggestion | 🟠 MajorUse
S3Configinstead ofAnyfors3_config.This keeps static type safety aligned with
s3_put(...)and prevents accidental misuse from slipping through.♻️ Proposed fix
from clp_py_utils.clp_config import ( Database, + S3Config, StorageEngine, StorageType, WorkerConfig, ) @@ def upload_results_to_s3( - task_results: QueryTaskResult, s3_config: Any, src_file: Path, dest_path: str + task_results: QueryTaskResult, s3_config: S3Config, src_file: Path, dest_path: str ):#!/bin/bash set -euo pipefail # Verify s3_put's expected type and current helper signature rg -n "def s3_put\(s3_config:\s*S3Config" components/clp-py-utils/clp_py_utils/s3_utils.py -A3 -B1 rg -n "def upload_results_to_s3\(" components/job-orchestration/job_orchestration/executor/query/fs_search_task.py -A3 -B1🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/job-orchestration/job_orchestration/executor/query/fs_search_task.py` around lines 193 - 194, The s3_config parameter in the function signature (task_results: QueryTaskResult, s3_config: Any, src_file: Path, dest_path: str) should use the concrete S3Config type to match s3_put and preserve static typing; update the parameter to s3_config: S3Config, add the corresponding import for S3Config from the module that defines it (the same module that provides s3_put), and verify any internal uses of s3_config still match S3Config attributes so callers and static checkers align with s3_put.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@components/job-orchestration/job_orchestration/executor/query/fs_search_task.py`:
- Around line 205-206: The code sets task_results.error_log_path =
str(os.getenv("CLP_WORKER_LOG_PATH")) which can produce the string "None" or
point to shared non-actionable locations like "/dev/stdout"; change this to read
the raw env var (val = os.getenv("CLP_WORKER_LOG_PATH")), and only assign
task_results.error_log_path when val is non-empty and not a shared sink (e.g.,
not "/dev/stdout" or "/dev/stderr"); otherwise leave task_results.error_log_path
as None (or unset). Update the assignment near where task_results.status =
QueryTaskStatus.FAILED so the check uses the actual env value and filters out
shared paths before assigning to task_results.error_log_path.
- Around line 203-207: The code currently unlinks src_file regardless of upload
success; change the control flow so src_file.unlink() only runs on successful
upload (i.e., when you set task_results.status to a success state) and is not
executed inside the except block that sets task_results.status =
QueryTaskStatus.FAILED and task_results.error_log_path; locate the upload/try
block around src_file and move or guard the src_file.unlink() call accordingly
so failed uploads keep the local artefact for retry/forensics.
---
Duplicate comments:
In
`@components/job-orchestration/job_orchestration/executor/query/fs_search_task.py`:
- Around line 193-194: The s3_config parameter in the function signature
(task_results: QueryTaskResult, s3_config: Any, src_file: Path, dest_path: str)
should use the concrete S3Config type to match s3_put and preserve static
typing; update the parameter to s3_config: S3Config, add the corresponding
import for S3Config from the module that defines it (the same module that
provides s3_put), and verify any internal uses of s3_config still match S3Config
attributes so callers and static checkers align with s3_put.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 2a6d1b5f-94c3-481d-a1ac-ccb5616ee518
📒 Files selected for processing (1)
components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
| except Exception as err: | ||
| logger.error(f"Failed to upload query results {dest_path}: {err}") | ||
| task_results.status = QueryTaskStatus.FAILED | ||
| task_results.error_log_path = str(os.getenv("CLP_WORKER_LOG_PATH")) | ||
| src_file.unlink() |
There was a problem hiding this comment.
Do not delete the local result file after a failed upload.
Line 207 unlinks the file even when Lines 203-206 already marked the task as failed. That removes the only artefact for retry/forensics after transient S3 errors.
🛡️ Proposed fix
except Exception as err:
logger.error(f"Failed to upload query results {dest_path}: {err}")
task_results.status = QueryTaskStatus.FAILED
task_results.error_log_path = str(os.getenv("CLP_WORKER_LOG_PATH"))
- src_file.unlink()
+ return
+ src_file.unlink(missing_ok=True)
return🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@components/job-orchestration/job_orchestration/executor/query/fs_search_task.py`
around lines 203 - 207, The code currently unlinks src_file regardless of upload
success; change the control flow so src_file.unlink() only runs on successful
upload (i.e., when you set task_results.status to a success state) and is not
executed inside the except block that sets task_results.status =
QueryTaskStatus.FAILED and task_results.error_log_path; locate the upload/try
block around src_file and move or guard the src_file.unlink() call accordingly
so failed uploads keep the local artefact for retry/forensics.
hoophalab
left a comment
There was a problem hiding this comment.
LGTM.
Validated that empty files are no longer uploaded to S3 and query results remain unchanged.
Description
This PR changes the flow for streaming results via S3 for the API server to avoid uploading empty results files to S3, which is fairly common for queries that are highly selective. This allows us to save a huge amount of unnecessary latency on both the upload and download side for these empty files.
The fix is simply to stat the results files and check whether the size of their contents is zero to skip upload to S3.
Checklist
breaking change.
Validation performed
Summary by CodeRabbit
Refactor
Bug Fix