feat(clp-json): Skip uploading empty query results files to S3. #2135

Merged
gibber9809 merged 3 commits into y-scope:main from gibber9809:skip-upload-empty-results on Apr 7, 2026

Conversation

@gibber9809 gibber9809 commented Mar 26, 2026

Description

This PR changes the API server's flow for streaming results via S3 so that empty results files are no longer uploaded. Empty results files are fairly common for highly selective queries, so skipping them avoids a large amount of unnecessary latency on both the upload and download sides.

The fix is simply to stat each results file and skip the upload to S3 when its size is zero.
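
A minimal sketch of the size check described above, assuming the results file is addressed by a `pathlib.Path`. The helper name `is_empty_results_file` is hypothetical and is not taken from the PR.

```python
from pathlib import Path


def is_empty_results_file(path: Path) -> bool:
    """Return True when the file at `path` has a size of zero bytes."""
    # Path.stat() raises FileNotFoundError for a missing file; this sketch
    # lets that propagate rather than deciding how the caller should react.
    return path.stat().st_size == 0
```

A caller would skip the S3 upload whenever this returns `True` and upload as before otherwise.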

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  1. Started CLP package with staged streams configured to S3 output
  2. Ingested a file that produces one archive
  3. Issued one search with some results, retrieved the results correctly, and observed one output stream file uploaded to S3
  4. Issued one search with no results and observed no output files in S3, and no results returned
  5. Checked API server and query worker logs and observed no errors

Summary by CodeRabbit

  • Refactor

    • Consolidated cloud-result upload logic to improve reliability and consistency during query execution.
  • Bug Fix

    • Skips empty result files, ensures temporary files are removed, and records failures with logs when uploads fail—leading to clearer failure visibility and cleaner runtime state.

coderabbitai Bot commented Mar 26, 2026

Walkthrough

A new helper function upload_results_to_s3() centralizes S3 upload logic for query result files: it skips zero-byte/missing files, wraps s3_put with start/finish logging, handles upload exceptions by marking task_results.status as QueryTaskStatus.FAILED and setting task_results.error_log_path, and always removes the local source file. search() now calls this helper.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| S3 Upload Refactoring: components/job-orchestration/job_orchestration/executor/query/fs_search_task.py | Extracted inline S3 upload logic into upload_results_to_s3() that validates file size, performs s3_put with start/finish logs, handles exceptions by updating task_results and setting error_log_path, and unlinks the source file. search() was simplified to call the helper under the same S3/write-to-file/succeeded conditions. |

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately describes the main change: adding functionality to skip uploading empty query results files to S3, which is the core objective of the PR. |

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 Ruff (0.15.7)
components/job-orchestration/job_orchestration/executor/query/fs_search_task.py

Unexpected Ruff issue shape at index 6


@gibber9809 gibber9809 marked this pull request as ready for review March 26, 2026 18:17
@gibber9809 gibber9809 requested a review from a team as a code owner March 26, 2026 18:17

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@components/job-orchestration/job_orchestration/executor/query/fs_search_task.py`:
- Around line 192-194: The upload_results_to_s3 function currently types
s3_config as Any; change it to use the concrete S3Config type to restore type
safety and match s3_put's expected parameter: update the function signature of
upload_results_to_s3 to accept s3_config: S3Config and add S3Config to the
module imports so the name resolves wherever upload_results_to_s3 and s3_put are
used.
- Around line 195-198: The code calls src_file.stat() without ensuring the file
exists which can raise FileNotFoundError; update the block in fs_search_task.py
to first check src_file.exists() (or src_file.is_file()) before calling stat(),
and if the file is missing log an appropriate error/warning via logger
(including the path) and set the task/result state accordingly instead of
letting the exception bubble (e.g., skip unlink and return or mark failed).
Ensure you still handle the empty-file case (src_file.stat().st_size == 0) only
after confirming existence and only call src_file.unlink() when the file was
verified to exist.
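
As a hedged illustration of the second finding above: the comment proposes calling `exists()` before `stat()`, but an alternative that avoids a gap between the check and the `stat()` call is to catch `FileNotFoundError` directly. The helper name `results_file_size` is hypothetical and does not appear in the PR.

```python
from pathlib import Path
from typing import Optional


def results_file_size(path: Path) -> Optional[int]:
    """Return the size of `path` in bytes, or None if the file does not exist."""
    try:
        return path.stat().st_size
    except FileNotFoundError:
        # The file was never written or has already been removed; the caller
        # decides whether to log a warning, skip the upload, or fail the task.
        return None
```

With this shape, `None` and `0` can both be treated as "nothing to upload", while only a non-`None` result implies there is a file to unlink.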

📥 Commits

Reviewing files that changed from the base of the PR and between 7eba9fe and ca676e1.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/executor/query/fs_search_task.py

Comment on lines +192 to +194
def upload_results_to_s3(
    task_results: QueryTaskResult, s3_config: Any, src_file: Path, dest_path: str
):

🧹 Nitpick | 🔵 Trivial

Consider using S3Config type hint instead of Any.

Using Any for s3_config loses type safety. The s3_put function expects S3Config (per context snippet 2).

♻️ Proposed type hint improvement

Add S3Config to imports:

 from clp_py_utils.clp_config import (
     Database,
+    S3Config,
     StorageEngine,
     StorageType,
     WorkerConfig,
 )

Then update the function signature:

 def upload_results_to_s3(
-    task_results: QueryTaskResult, s3_config: Any, src_file: Path, dest_path: str
+    task_results: QueryTaskResult, s3_config: S3Config, src_file: Path, dest_path: str
 ):

Comment thread components/job-orchestration/job_orchestration/executor/query/fs_search_task.py Outdated
@junhaoliao junhaoliao added this to the March 2026 milestone Mar 26, 2026

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (1)
components/job-orchestration/job_orchestration/executor/query/fs_search_task.py (1)

193-194: 🛠️ Refactor suggestion | 🟠 Major

Use S3Config instead of Any for s3_config.

This keeps static type safety aligned with s3_put(...) and prevents accidental misuse from slipping through.

♻️ Proposed fix
 from clp_py_utils.clp_config import (
     Database,
+    S3Config,
     StorageEngine,
     StorageType,
     WorkerConfig,
 )
@@
 def upload_results_to_s3(
-    task_results: QueryTaskResult, s3_config: Any, src_file: Path, dest_path: str
+    task_results: QueryTaskResult, s3_config: S3Config, src_file: Path, dest_path: str
 ):
#!/bin/bash
set -euo pipefail

# Verify s3_put's expected type and current helper signature
rg -n "def s3_put\(s3_config:\s*S3Config" components/clp-py-utils/clp_py_utils/s3_utils.py -A3 -B1
rg -n "def upload_results_to_s3\(" components/job-orchestration/job_orchestration/executor/query/fs_search_task.py -A3 -B1
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@components/job-orchestration/job_orchestration/executor/query/fs_search_task.py`
around lines 193 - 194, The s3_config parameter in the function signature
(task_results: QueryTaskResult, s3_config: Any, src_file: Path, dest_path: str)
should use the concrete S3Config type to match s3_put and preserve static
typing; update the parameter to s3_config: S3Config, add the corresponding
import for S3Config from the module that defines it (the same module that
provides s3_put), and verify any internal uses of s3_config still match S3Config
attributes so callers and static checkers align with s3_put.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@components/job-orchestration/job_orchestration/executor/query/fs_search_task.py`:
- Around line 205-206: The code sets task_results.error_log_path =
str(os.getenv("CLP_WORKER_LOG_PATH")) which can produce the string "None" or
point to shared non-actionable locations like "/dev/stdout"; change this to read
the raw env var (val = os.getenv("CLP_WORKER_LOG_PATH")), and only assign
task_results.error_log_path when val is non-empty and not a shared sink (e.g.,
not "/dev/stdout" or "/dev/stderr"); otherwise leave task_results.error_log_path
as None (or unset). Update the assignment near where task_results.status =
QueryTaskStatus.FAILED so the check uses the actual env value and filters out
shared paths before assigning to task_results.error_log_path.
- Around line 203-207: The code currently unlinks src_file regardless of upload
success; change the control flow so src_file.unlink() only runs on successful
upload (i.e., when you set task_results.status to a success state) and is not
executed inside the except block that sets task_results.status =
QueryTaskStatus.FAILED and task_results.error_log_path; locate the upload/try
block around src_file and move or guard the src_file.unlink() call accordingly
so failed uploads keep the local artefact for retry/forensics.
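
The first finding above can be sketched as follows. `str(os.getenv(...))` turns an unset variable into the literal string `"None"`, so this sketch reads the raw value and filters it. The helper name `resolve_error_log_path` and the `SHARED_LOG_SINKS` set are assumptions for illustration; the review only names `/dev/stdout` and `/dev/stderr` as examples.

```python
import os
from typing import Optional

# Assumption: the shared sinks worth filtering out are exactly these two paths.
SHARED_LOG_SINKS = {"/dev/stdout", "/dev/stderr"}


def resolve_error_log_path(env_var: str = "CLP_WORKER_LOG_PATH") -> Optional[str]:
    """Return a usable log path from the environment, or None if there is none."""
    value = os.getenv(env_var)  # None when the variable is unset
    if not value or value in SHARED_LOG_SINKS:
        # Unset, empty, or pointing at a shared stream: nothing actionable.
        return None
    return value
```

The caller would then assign the result to `task_results.error_log_path` only when it is not `None`.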


📥 Commits

Reviewing files that changed from the base of the PR and between ca676e1 and 4634639.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/executor/query/fs_search_task.py

Comment on lines +203 to +207
    except Exception as err:
        logger.error(f"Failed to upload query results {dest_path}: {err}")
        task_results.status = QueryTaskStatus.FAILED
        task_results.error_log_path = str(os.getenv("CLP_WORKER_LOG_PATH"))
    src_file.unlink()

⚠️ Potential issue | 🟠 Major

Do not delete the local result file after a failed upload.

Line 207 unlinks the file even when Lines 203-206 already marked the task as failed. That removes the only artefact for retry/forensics after transient S3 errors.

🛡️ Proposed fix
     except Exception as err:
         logger.error(f"Failed to upload query results {dest_path}: {err}")
         task_results.status = QueryTaskStatus.FAILED
         task_results.error_log_path = str(os.getenv("CLP_WORKER_LOG_PATH"))
-    src_file.unlink()
+        return
+    src_file.unlink(missing_ok=True)
     return
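
The control flow this suggestion asks for can be sketched in isolation. This is an illustration of the reviewer's proposal, not the code that was merged. The `upload` callable stands in for the real S3 call, and the function name `upload_then_cleanup` is hypothetical.

```python
import logging
from pathlib import Path
from typing import Callable

logger = logging.getLogger(__name__)


def upload_then_cleanup(src_file: Path, upload: Callable[[Path], None]) -> bool:
    """Upload `src_file` and delete it only if the upload succeeded."""
    try:
        upload(src_file)
    except Exception as err:
        # Keep the local file so a retry or a post-mortem can still read it.
        logger.error(f"Failed to upload {src_file}: {err}")
        return False
    # Reached only on success; missing_ok (Python 3.8+) tolerates a file
    # that has already been removed.
    src_file.unlink(missing_ok=True)
    return True
```

The trade-off is that failed uploads leave files behind, so something else has to clean them up eventually.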

@junhaoliao junhaoliao requested a review from hoophalab April 1, 2026 00:05
@junhaoliao junhaoliao modified the milestones: March 2026, Mid-April 2026 Apr 1, 2026

@hoophalab hoophalab left a comment

LGTM.

Validated that empty files are no longer uploaded to S3 and query results remain unchanged.

@gibber9809 gibber9809 merged commit ad31b89 into y-scope:main Apr 7, 2026
21 checks passed
@gibber9809 gibber9809 deleted the skip-upload-empty-results branch April 7, 2026 18:45
4 participants