feat(clp-package): Add support for compressing unstructured text logs using clp-json.#1505
Conversation
WalkthroughAdds a new --unstructured CLI flag across compressors and adapters; propagates it into FsInputConfig/S3InputConfig and scheduler schemas; for CLP_S unstructured inputs runs a pre-compression log-converter producing converted inputs, forces timestamp_key="timestamp", switches compressor input to converted dir, and centralizes temporary-file cleanup and related warnings. Changes
Sequence DiagramsequenceDiagram
participant User
participant CLI
participant Config
participant CompressionTask
participant LogConverter
participant Compressor
User->>CLI: compress --unstructured <input>
CLI->>Config: Build FsInputConfig/S3InputConfig (unstructured=true)
Config->>CompressionTask: Start task with unstructured flag
alt CLP_S and unstructured
CompressionTask->>LogConverter: Run log-converter on input
alt conversion succeeds
LogConverter-->>CompressionTask: converted_inputs_dir
Note right of CompressionTask #DFF2E1: force timestamp_key="timestamp"
CompressionTask->>Compressor: Compress converted_inputs_dir
else conversion fails
LogConverter-->>CompressionTask: error (stderr)
CompressionTask->>CompressionTask: cleanup temp files
CompressionTask-->>User: return failure with error details
end
else non-unstructured or FS input
CompressionTask->>Compressor: Compress original inputs (logs list)
end
Compressor-->>CompressionTask: compression complete
CompressionTask->>CompressionTask: cleanup temp files
CompressionTask-->>User: return success output
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (5)
components/clp-package-utils/clp_package_utils/scripts/compress.py(3 hunks)components/clp-package-utils/clp_package_utils/scripts/native/compress.py(3 hunks)components/job-orchestration/job_orchestration/executor/compress/compression_task.py(4 hunks)components/job-orchestration/job_orchestration/scheduler/job_config.py(1 hunks)docs/src/user-docs/quick-start/clp-json.md(2 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-09-04T13:51:02.311Z
Learnt from: quinntaylormitchell
PR: y-scope/clp#1283
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:206-210
Timestamp: 2025-09-04T13:51:02.311Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/compress.py, an empty string ("") is a valid value for the --timestamp-key parameter, so validation should not treat empty strings as missing values.
Applied to files:
components/clp-package-utils/clp_package_utils/scripts/compress.py
📚 Learning: 2025-09-28T15:00:22.170Z
Learnt from: LinZhihao-723
PR: y-scope/clp#1340
File: components/job-orchestration/job_orchestration/executor/compress/compression_task.py:528-528
Timestamp: 2025-09-28T15:00:22.170Z
Learning: In components/job-orchestration/job_orchestration/executor/compress/compression_task.py, there is a suggestion to refactor from passing logger as a parameter through multiple functions to creating a ClpCompressor class that takes the logger as a class member, with current helper functions becoming private member functions.
Applied to files:
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
📚 Learning: 2025-01-16T16:58:43.190Z
Learnt from: haiqi96
PR: y-scope/clp#651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.
Applied to files:
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
🧬 Code graph analysis (2)
components/job-orchestration/job_orchestration/scheduler/job_config.py (1)
components/clp-py-utils/clp_py_utils/clp_config.py (1)
S3Config(418-422)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (2)
components/clp-py-utils/clp_py_utils/clp_config.py (1)
StorageEngine(94-96)components/job-orchestration/job_orchestration/scheduler/constants.py (1)
CompressionTaskStatus(40-45)
🪛 LanguageTool
docs/src/user-docs/quick-start/clp-json.md
[uncategorized] ~62-~62: Possible missing preposition found.
Context: ...ct-storage/index). ::: ### Compression unstructured text logs To compress some unstructure...
(AI_HYDRA_LEO_MISSING_OF)
🪛 markdownlint-cli2 (0.18.1)
docs/src/user-docs/quick-start/clp-json.md
6-6: Trailing spaces
Expected: 0 or 2; Actual: 1
(MD009, no-trailing-spaces)
🪛 Ruff (0.14.1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
409-409: Missing return type annotation for private function cleanup_temporary_files
Add return type annotation: None
(ANN202)
422-422: subprocess call: check for execution of untrusted input
(S603)
430-430: Logging statement uses f-string
(G004)
430-430: Use explicit conversion flag
Replace with conversion flag
(RUF010)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: package-image
- GitHub Check: lint-check (macos-15)
- GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (12)
docs/src/user-docs/quick-start/clp-json.md (1)
70-72: Clarify timestamp extraction behavior.The statement "attempt to extract the timestamp" could be more precise. Consider documenting what happens when timestamp extraction fails or when no timestamp is found in the log message.
Do you want to add details about:
- What happens if timestamp extraction fails?
- Whether logs without timestamps are still processed?
- The expected timestamp formats?
components/job-orchestration/job_orchestration/scheduler/job_config.py (1)
29-29: LGTM! Clean addition of the unstructured field.The
unstructuredboolean field is correctly added to bothFsInputConfigandS3InputConfigwith a safe default value ofFalse, ensuring backward compatibility.Also applies to: 37-37
components/clp-package-utils/clp_package_utils/scripts/native/compress.py (2)
231-235: LGTM! Clear argument definition.The
--unstructuredflag is well-defined with appropriate help text.
150-156: LGTM! Correct propagation of the unstructured flag.The
unstructuredparameter is correctly propagated from parsed arguments to bothFsInputConfigandS3InputConfig.Also applies to: 166-174
components/clp-package-utils/clp_package_utils/scripts/compress.py (3)
166-170: LGTM! Consistent argument definition.The
--unstructuredargument definition matches the native compress script.
94-95: LGTM! Correct conditional command generation.The
--unstructuredflag is correctly appended to the compress command when set.
213-223: LGTM! Proper validation logic for flag interaction.The validation correctly handles the interaction between
--timestamp-keyand--unstructured:
- Warns when timestamp-key is missing and unstructured is not set (preserving existing behavior)
- When both are provided, unstructured takes precedence with a clear warning
The logic respects the learning that empty strings are valid for timestamp-key.
Based on learnings
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (5)
5-5: LGTM! Required import for cleanup functionality.The
shutilmodule is correctly imported for removing the converted inputs directory.
302-307: LGTM! Correct conditional timestamp-key handling.The logic correctly:
- Forces
timestamp_keyto"timestamp"when unstructured mode is enabled- Falls back to the user-provided
timestamp_keyfor structured inputs- Preserves existing behavior when
timestamp_keyis NoneThis aligns with the documentation stating that unstructured logs always use
"timestamp"as the timestamp key.
392-407: LGTM! Clean conversion setup for unstructured inputs.The conversion workflow is correctly structured:
- Only activates for CLP_S with unstructured input
- Creates a temporary directory for converted files
- Builds the log-converter command with appropriate arguments
- Passes converted directory to compression instead of using
--files-fromThe conditional logic ensures backward compatibility for structured inputs.
419-437: LGTM! Robust conversion error handling.The conversion execution and error handling is well-structured:
- Runs conversion subprocess with proper logging
- Waits for completion and captures return code
- On failure: performs cleanup, logs error, and returns structured failure output
- Error message directs users to stderr log for debugging
The worker output structure matches the success case, ensuring consistent handling upstream.
537-537: LGTM! Proper cleanup after compression.The cleanup function is correctly called after compression completes, ensuring temporary files are removed regardless of compression success or failure.
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (3)
docs/src/user-docs/quick-start/clp-json.md (2)
6-6: Remove trailing space.There's a trailing space at the end of line 6 that should be removed for proper markdown formatting.
Apply this diff:
-If you're using a `clp-json` release, we only support ingesting unstructured text logs by +If you're using a `clp-json` release, we only support ingesting unstructured text logs by
62-62: Fix grammatical error in heading.The heading is missing a verb or preposition. It should be "Compressing unstructured text logs" or "Compression of unstructured text logs".
Apply this diff:
-### Compression unstructured text logs +### Compressing unstructured text logscomponents/job-orchestration/job_orchestration/executor/compress/compression_task.py (1)
409-413: Add return type annotation and use explicit None checks.Two issues flagged in previous reviews remain:
- Missing return type annotation for the function
- Using truthiness checks on Path objects, which are always truthy regardless of file existence
Apply this diff:
- def cleanup_temporary_files(): - if logs_list_path is not None: + def cleanup_temporary_files() -> None: + if logs_list_path is not None: logs_list_path.unlink() - if converted_inputs_path is not None: + if converted_inputs_path is not None: shutil.rmtree(converted_inputs_path)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (2)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py(4 hunks)docs/src/user-docs/quick-start/clp-json.md(2 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-28T15:00:22.170Z
Learnt from: LinZhihao-723
PR: y-scope/clp#1340
File: components/job-orchestration/job_orchestration/executor/compress/compression_task.py:528-528
Timestamp: 2025-09-28T15:00:22.170Z
Learning: In components/job-orchestration/job_orchestration/executor/compress/compression_task.py, there is a suggestion to refactor from passing logger as a parameter through multiple functions to creating a ClpCompressor class that takes the logger as a class member, with current helper functions becoming private member functions.
Applied to files:
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
📚 Learning: 2025-01-16T16:58:43.190Z
Learnt from: haiqi96
PR: y-scope/clp#651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.
Applied to files:
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
🧬 Code graph analysis (1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (2)
components/clp-py-utils/clp_py_utils/clp_config.py (1)
StorageEngine(94-96)components/job-orchestration/job_orchestration/scheduler/constants.py (1)
CompressionTaskStatus(40-45)
🪛 Ruff (0.14.1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
409-409: Missing return type annotation for private function cleanup_temporary_files
Add return type annotation: None
(ANN202)
422-422: subprocess call: check for execution of untrusted input
(S603)
430-430: Logging statement uses f-string
(G004)
430-430: Use explicit conversion flag
Replace with conversion flag
(RUF010)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: package-image
- GitHub Check: lint-check (ubuntu-24.04)
- GitHub Check: lint-check (macos-15)
🔇 Additional comments (5)
docs/src/user-docs/quick-start/clp-json.md (1)
64-80: Clear documentation of the unstructured mode.The documentation clearly explains the
--unstructuredflag, provides a concrete example of the log conversion, and notes the fixed timestamp key behaviour. This aligns well with the implementation changes.components/job-orchestration/job_orchestration/executor/compress/compression_task.py (4)
5-5: Appropriate import for cleanup operations.The
shutilimport is correctly added to support removal of the temporary converted inputs directory.
302-307: Correct timestamp-key handling for unstructured mode.The logic correctly forces the timestamp key to
"timestamp"when processing unstructured logs, aligning with the documented behaviour. For structured inputs, it preserves the existing logic.
419-437: Robust conversion error handling.The conversion execution properly checks the return code, performs cleanup on failure, and returns a structured error response with zeroed sizes. The early return pattern prevents unnecessary compression attempts when conversion fails.
537-537: Proper cleanup after compression.The cleanup call is correctly placed after compression completes, ensuring temporary files are removed in both success and failure scenarios.
| conversion_cmd = None | ||
| converted_inputs_path = None | ||
| if StorageEngine.CLP_S == clp_storage_engine and clp_config.input.unstructured: | ||
| converted_inputs_path = tmp_dir / f"{instance_id_str}-converted-tmp" | ||
| converted_inputs_path.mkdir() | ||
| conversion_cmd = [ | ||
| str(clp_home / "bin" / "log-converter"), | ||
| "--inputs-from", | ||
| str(logs_list_path), | ||
| "--output-dir", | ||
| str(converted_inputs_path), | ||
| ] | ||
| compression_cmd.append(str(converted_inputs_path)) | ||
| else: | ||
| compression_cmd.append("--files-from") | ||
| compression_cmd.append(str(logs_list_path)) |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Conversion setup logic is correct.
The conversion workflow is properly gated to CLP_S with unstructured input, creates a uniquely named temporary directory, and correctly switches the compression input source based on whether conversion is needed.
For defensive coding, consider adding exist_ok=True to the mkdir call:
- converted_inputs_path.mkdir()
+ converted_inputs_path.mkdir(exist_ok=True)While the unique instance_id_str makes collisions extremely unlikely, this would prevent unexpected failures in edge cases.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| conversion_cmd = None | |
| converted_inputs_path = None | |
| if StorageEngine.CLP_S == clp_storage_engine and clp_config.input.unstructured: | |
| converted_inputs_path = tmp_dir / f"{instance_id_str}-converted-tmp" | |
| converted_inputs_path.mkdir() | |
| conversion_cmd = [ | |
| str(clp_home / "bin" / "log-converter"), | |
| "--inputs-from", | |
| str(logs_list_path), | |
| "--output-dir", | |
| str(converted_inputs_path), | |
| ] | |
| compression_cmd.append(str(converted_inputs_path)) | |
| else: | |
| compression_cmd.append("--files-from") | |
| compression_cmd.append(str(logs_list_path)) | |
| conversion_cmd = None | |
| converted_inputs_path = None | |
| if StorageEngine.CLP_S == clp_storage_engine and clp_config.input.unstructured: | |
| converted_inputs_path = tmp_dir / f"{instance_id_str}-converted-tmp" | |
| converted_inputs_path.mkdir(exist_ok=True) | |
| conversion_cmd = [ | |
| str(clp_home / "bin" / "log-converter"), | |
| "--inputs-from", | |
| str(logs_list_path), | |
| "--output-dir", | |
| str(converted_inputs_path), | |
| ] | |
| compression_cmd.append(str(converted_inputs_path)) | |
| else: | |
| compression_cmd.append("--files-from") | |
| compression_cmd.append(str(logs_list_path)) |
🤖 Prompt for AI Agents
In
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
around lines 392 to 407, the temporary directory creation uses
converted_inputs_path.mkdir() and should be made defensive to avoid rare
race/collision failures; change the call to
converted_inputs_path.mkdir(exist_ok=True) (you may also add parents=True if
nested directories are possible) so mkdir does not raise if the directory
already exists.
LinZhihao-723
left a comment
There was a problem hiding this comment.
We've made changes to the compression script in #1476 and will merge it soon. compress-from-s3.py will also need to be updated, as we split the S3 compression from the original compress.py script.
There was a problem hiding this comment.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (4)
components/clp-package-utils/clp_package_utils/scripts/compress.py(3 hunks)components/clp-package-utils/clp_package_utils/scripts/compress_from_s3.py(3 hunks)components/clp-package-utils/clp_package_utils/scripts/native/compress.py(4 hunks)docs/src/user-docs/quick-start/clp-json.md(2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: lint-check (macos-15)
- GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (9)
components/clp-package-utils/clp_package_utils/scripts/native/compress.py (2)
343-347: LGTM: CLI argument correctly defined.The
--unstructuredflag is properly configured withaction="store_true"and includes clear help text.
160-160: LGTM: Unstructured flag consistently propagated.The
unstructuredparameter is correctly passed to all input config types (FsInputConfig and both S3InputConfig branches). The validation of the interaction between--timestamp-keyand--unstructuredis appropriately handled in the calling scripts before reaching this point.Also applies to: 184-184, 200-200
components/clp-package-utils/clp_package_utils/scripts/compress_from_s3.py (3)
183-187: LGTM: CLI argument correctly defined.The
--unstructuredflag matches the definition in other scripts, ensuring consistency across the codebase.
100-101: LGTM: Conditional command generation is correct.The
--unstructuredflag is appropriately appended to the compress command only when set.
263-273: LGTM: Validation logic correctly handles flag interaction.The implementation properly addresses the mutual exclusivity of
--timestamp-keyand--unstructured:
- Warns when neither is provided (line 263)
- Warns and nullifies timestamp_key when both are provided (lines 268-273)
This ensures clear user expectations and prevents conflicting configurations.
components/clp-package-utils/clp_package_utils/scripts/compress.py (3)
153-157: LGTM: CLI argument correctly defined.The
--unstructuredflag is consistent with the definitions in other scripts.
97-98: LGTM: Conditional command generation is correct.The
--unstructuredflag is appropriately appended to the compress command only when set.
208-218: LGTM: Validation logic correctly handles flag interaction.The validation logic is identical to
compress_from_s3.pyand correctly handles the mutual exclusivity of--timestamp-keyand--unstructured. The logic is appropriately scoped within the CLP_S storage engine check since timestamp handling is specific to that engine.Note: This validation logic is duplicated between this file and
compress_from_s3.py. The codebase already contains TODO comments acknowledging this duplication (lines 253-254 in compress_from_s3.py, lines 280-281 in this file, per the AI summary context).docs/src/user-docs/quick-start/clp-json.md (1)
6-7: LGTM: Caution text updated correctly.The revised caution text accurately reflects that clp-json supports ingesting unstructured text logs by converting them to JSON.
LinZhihao-723
left a comment
There was a problem hiding this comment.
Overall lgtm. Some small fixes + docs polishing
|
|
||
| if InputType.S3 == clp_config.input.type: | ||
| compression_env_vars = dict(os.environ) | ||
| compression_env_vars = dict(os.environ) |
There was a problem hiding this comment.
Might be worth to update the return annotation since it doens't return None now.
| compression_cmd.append("--files-from") | ||
| compression_cmd.append(str(logs_list_path)) | ||
| conversion_cmd = None | ||
| converted_inputs_path = None |
There was a problem hiding this comment.
How about converted_inputs_dir?
| # Wait for compression to finish | ||
| return_code = proc.wait() | ||
|
|
||
| cleanup_temporary_files() |
There was a problem hiding this comment.
Preferred to move inside else branch:
if 0 != return_code:
logger.error(f"Failed to compress, return_code={str(return_code)}")
else:
compression_successful = True
cleanup_temporary_files()As if the compression failed, we might want to keep the converted inputs and the logs list file for debugging.
There was a problem hiding this comment.
Sure, we can do that for now. It feels like something that's useful for us but not end users though.
| To compress some unstructured text logs, run: | ||
|
|
||
| ```bash | ||
| sbin/compress.sh --unstructured <path1> [<path2> ...] | ||
| ``` | ||
|
|
||
| The `--unstructured` flag will make clp-json treat your logs as unstructured text, and convert them | ||
| into JSON. The conversion process will attempt to extract the timestamp from the log message, and | ||
| store the timestamp and message in separate fields. | ||
| * E.g., if your log event looks like | ||
| `2024-01-01 00:01:02.345 ERROR...`, it will be converted to | ||
| `{"timestamp": "2024-01-01 00:01:02.345", "message": " ERROR..."}` |
There was a problem hiding this comment.
How about rewriting in the following way (I can't comment it through suggestion due to formatting issue):
clp-json supports compressing unstructured text logs by converting them into JSON format. To enable
this mode, run the compression script with the --unstructured flag:
sbin/compress.sh --unstructured <path1> [<path2> ...]When --unstructured is specified, clp-json will parse the unstructured text and convert each log
event into a JSON object. During this conversion, it attempts to extract the timestamp from each log
message and store it as a separate key-value pair. The resulting JSON object includes:
"timestamp": The extracted timestamp, stored as its original string representation to preserve
the format."message": The original log message.
For example, a log event like:
2024-01-01 00:01:02.345 ERROR...
will be converted into:
{
"timestamp": "2024-01-01 00:01:02.345",
"message": " ERROR..."
}There was a problem hiding this comment.
Yeah, this way of writing it reads very clearly -- will update with very slightly edited wording from what you put here.
| If you're using a `clp-json` release, you can only compress and search JSON logs. This limitation | ||
| will be addressed in a future version of CLP. | ||
| If you're using a `clp-json` release, we only support ingesting unstructured text logs by | ||
| converting them to JSON. This limitation will be addressed in a future version of CLP. |
There was a problem hiding this comment.
Do we still need this "caution" section? I think with the below rewriting, we can drop this section.
There was a problem hiding this comment.
I could maybe downgrade it to a note? I think its a somewhat notable limitation, so I want to make the limitation clear right at the start of the doc.
There was a problem hiding this comment.
Sure make sense. note should be good.
Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (2)
356-371: Update return description in docstring.The function returns (CompressionTaskStatus, worker_output), not “whether compression was successful”.
- :return: tuple -- (whether compression was successful, output messages) + :return: Tuple[CompressionTaskStatus, Dict[str, Any]] -- (status, worker_output)
343-371: Consider encapsulating flow in a class to manage state and cleanup.A ClpCompressor (holding logger, env, paths) would reduce parameter passing and centralize cleanup via context management. Based on learnings.
♻️ Duplicate comments (3)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (3)
423-437: Make temp dir creation defensive; confirm clp-s positional input path.
- Use exist_ok=True to avoid rare collisions or leftovers after retries.
- Please confirm clp-s expects the converted directory as a positional input (no flag). If it actually requires a flag, this will fail.
- converted_inputs_path.mkdir() + converted_inputs_path.mkdir(exist_ok=True)Would you confirm the CLI accepts
clp-s c <archive_dir> <converted_inputs_dir>without--files-from?
438-443: Add return type and make cleanup idempotent.Annotate return type and tolerate already-removed files/dirs.
- def cleanup_temporary_files(): - if logs_list_path is not None: - logs_list_path.unlink() - if converted_inputs_path is not None: - shutil.rmtree(converted_inputs_path) + def cleanup_temporary_files() -> None: + if logs_list_path is not None: + # Be tolerant if the file was already removed + try: + logs_list_path.unlink(missing_ok=True) + except TypeError: + if logs_list_path.exists(): + logs_list_path.unlink() + if converted_inputs_path is not None: + shutil.rmtree(converted_inputs_path, ignore_errors=True)
566-571: Only clean up on success; fix logging style.Retain artefacts on failure for debugging, and avoid f-string logging.
- cleanup_temporary_files() - if 0 != return_code: - logger.error(f"Failed to compress, return_code={str(return_code)}") - else: - compression_successful = True + if 0 != return_code: + logger.error("Failed to compress, return_code=%d", return_code) + else: + compression_successful = True + cleanup_temporary_files()
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py(4 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-28T15:00:22.170Z
Learnt from: LinZhihao-723
PR: y-scope/clp#1340
File: components/job-orchestration/job_orchestration/executor/compress/compression_task.py:528-528
Timestamp: 2025-09-28T15:00:22.170Z
Learning: In components/job-orchestration/job_orchestration/executor/compress/compression_task.py, there is a suggestion to refactor from passing logger as a parameter through multiple functions to creating a ClpCompressor class that takes the logger as a class member, with current helper functions becoming private member functions.
Applied to files:
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
🧬 Code graph analysis (1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (5)
components/job-orchestration/job_orchestration/scheduler/job_config.py (2)
InputType(11-13)ClpIoConfig(56-58)components/clp-py-utils/clp_py_utils/s3_utils.py (1)
get_credential_env_vars(61-99)components/clp-py-utils/clp_py_utils/clp_config.py (1)
StorageEngine(95-97)integration-tests/tests/utils/utils.py (1)
unlink(54-75)components/job-orchestration/job_orchestration/scheduler/constants.py (1)
CompressionTaskStatus(40-45)
🪛 Ruff (0.14.1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
438-438: Missing return type annotation for private function cleanup_temporary_files
Add return type annotation: None
(ANN202)
451-451: subprocess call: check for execution of untrusted input
(S603)
459-459: Logging statement uses f-string
(G004)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: package-image
- GitHub Check: lint-check (ubuntu-24.04)
- GitHub Check: lint-check (macos-15)
🔇 Additional comments (3)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (3)
291-296: S3 auth scoping for CLP_S structured inputs looks correct.Credentials only added when input is S3 and not unstructured; converter gets creds instead. Good separation.
300-305: Correct to force timestamp_key="timestamp" for unstructured inputs.Ensures consistent key post-conversion; preserves original behaviour for structured inputs.
451-454: Subprocess input trust — validate CLP_HOME provenance.You’re using exec-list form (good) and not shell=True. Please confirm CLP_HOME is controlled by the worker and not user-influenced; otherwise restrict/validate it to a trusted path.
| if conversion_return_code != 0: | ||
| cleanup_temporary_files() | ||
| logger.error( | ||
| f"Failed to convert unstructured log text, return_code={conversion_return_code}" | ||
| ) | ||
| worker_output = { | ||
| "total_uncompressed_size": 0, | ||
| "total_compressed_size": 0, | ||
| "error_message": f"Check logs in {stderr_log_path}", | ||
| } | ||
| return CompressionTaskStatus.FAILED, worker_output | ||
|
|
There was a problem hiding this comment.
Close stderr file on early conversion failure; switch to parameterized logging.
Avoids fd leak and aligns with logging best practices.
- if conversion_return_code != 0:
- cleanup_temporary_files()
- logger.error(
- f"Failed to convert unstructured log text, return_code={conversion_return_code}"
- )
- worker_output = {
- "total_uncompressed_size": 0,
- "total_compressed_size": 0,
- "error_message": f"Check logs in {stderr_log_path}",
- }
- return CompressionTaskStatus.FAILED, worker_output
+ if conversion_return_code != 0:
+ cleanup_temporary_files()
+ logger.error(
+ "Failed to convert unstructured log text, return_code=%d",
+ conversion_return_code,
+ )
+ worker_output = {
+ "total_uncompressed_size": 0,
+ "total_compressed_size": 0,
+ "error_message": f"Check logs in {stderr_log_path}",
+ }
+ stderr_log_file.close()
+ return CompressionTaskStatus.FAILED, worker_output📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if conversion_return_code != 0: | |
| cleanup_temporary_files() | |
| logger.error( | |
| f"Failed to convert unstructured log text, return_code={conversion_return_code}" | |
| ) | |
| worker_output = { | |
| "total_uncompressed_size": 0, | |
| "total_compressed_size": 0, | |
| "error_message": f"Check logs in {stderr_log_path}", | |
| } | |
| return CompressionTaskStatus.FAILED, worker_output | |
| if conversion_return_code != 0: | |
| cleanup_temporary_files() | |
| logger.error( | |
| "Failed to convert unstructured log text, return_code=%d", | |
| conversion_return_code, | |
| ) | |
| worker_output = { | |
| "total_uncompressed_size": 0, | |
| "total_compressed_size": 0, | |
| "error_message": f"Check logs in {stderr_log_path}", | |
| } | |
| stderr_log_file.close() | |
| return CompressionTaskStatus.FAILED, worker_output |
🧰 Tools
🪛 Ruff (0.14.1)
459-459: Logging statement uses f-string
(G004)
🤖 Prompt for AI Agents
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
around lines 456-467: when conversion_return_code != 0 you must close the stderr
file handle to avoid an fd leak and use parameterized logging instead of
f-strings; before calling cleanup_temporary_files() or returning, call the
stderr file's close method (e.g., stderr_file.close() or the actual variable
name used where stderr was opened), and replace logger.error(f"Failed to convert
unstructured log text, return_code={conversion_return_code}") with a
parameterized call like logger.error("Failed to convert unstructured log text,
return_code=%d", conversion_return_code) so logs are emitted safely.
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (2)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (2)
438-442: Add return type annotation and use explicitis not Nonechecks.The cleanup function has two issues already flagged in previous reviews:
- Missing return type annotation (line 438)
- Should use explicit
is not Nonechecks instead of truthiness checks (lines 439, 441)Since
pathlib.Pathobjects are always truthy, the current checks don't effectively distinguish betweenNoneand a Path object.Based on static analysis
Apply this diff:
- def cleanup_temporary_files(): - if logs_list_path: + def cleanup_temporary_files() -> None: + if logs_list_path is not None: logs_list_path.unlink() - if converted_inputs_dir: + if converted_inputs_dir is not None: shutil.rmtree(converted_inputs_dir)
448-467: Use parameterized logging for the error message.The stderr file close (line 466) correctly addresses the fd leak concern from previous reviews. However, the logging statement at line 459 still uses an f-string, which should be changed to parameterized logging for consistency and best practices.
Based on static analysis
Apply this diff:
logger.error( - f"Failed to convert unstructured log text, return_code={conversion_return_code}" + "Failed to convert unstructured log text, return_code=%d", + conversion_return_code, )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (2)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py(5 hunks)docs/src/user-docs/quick-start/clp-json.md(2 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-28T15:00:22.170Z
Learnt from: LinZhihao-723
PR: y-scope/clp#1340
File: components/job-orchestration/job_orchestration/executor/compress/compression_task.py:528-528
Timestamp: 2025-09-28T15:00:22.170Z
Learning: In components/job-orchestration/job_orchestration/executor/compress/compression_task.py, there is a suggestion to refactor from passing logger as a parameter through multiple functions to creating a ClpCompressor class that takes the logger as a class member, with current helper functions becoming private member functions.
Applied to files:
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
🧬 Code graph analysis (1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (4)
components/job-orchestration/job_orchestration/scheduler/job_config.py (2)
InputType(11-13)ClpIoConfig(56-58)components/clp-py-utils/clp_py_utils/s3_utils.py (1)
get_credential_env_vars(61-99)components/clp-py-utils/clp_py_utils/clp_config.py (1)
StorageEngine(95-97)components/job-orchestration/job_orchestration/scheduler/constants.py (1)
CompressionTaskStatus(40-45)
🪛 LanguageTool
docs/src/user-docs/quick-start/clp-json.md
[uncategorized] ~81-~81: Loose punctuation mark.
Context: ...g JSON object includes: * "timestamp": The extracted timestamp, stored as its ...
(UNLIKELY_OPENING_PUNCTUATION)
[uncategorized] ~83-~83: Loose punctuation mark.
Context: ...ve the timestamp format. * "message": The original log message. For example,...
(UNLIKELY_OPENING_PUNCTUATION)
🪛 Ruff (0.14.1)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
438-438: Missing return type annotation for private function cleanup_temporary_files
Add return type annotation: None
(ANN202)
451-451: subprocess call: check for execution of untrusted input
(S603)
459-459: Logging statement uses f-string
(G004)
568-568: Logging statement uses f-string
(G004)
568-568: Use explicit conversion flag
Replace with conversion flag
(RUF010)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: package-image
- GitHub Check: lint-check (ubuntu-24.04)
- GitHub Check: lint-check (macos-15)
🔇 Additional comments (7)
docs/src/user-docs/quick-start/clp-json.md (1)
67-103: Documentation for unstructured logs feature is clear and well-structured.The new section comprehensively documents the
--unstructuredflag, including usage syntax, behaviour, JSON transformation details, a concrete example, and the important note about the fixed timestamp key. The writing is clear and accessible to users.components/job-orchestration/job_orchestration/executor/compress/compression_task.py (6)
5-5: LGTM!The
shutilimport is necessary for the temporary directory cleanup functionality added later in the file.
270-270: LGTM!The return type annotation correctly reflects that
compression_env_varsis now always initialized (line 291), neverNone. This improves type safety and addresses the maintainer's feedback.
291-306: LGTM!The credential and timestamp-key logic correctly handles the unstructured input case:
- S3 credentials are excluded from compression environment for unstructured inputs (line 292) because the log-converter handles S3 access, while the compressor only reads the converted local files.
- Timestamp-key is forced to "timestamp" for unstructured inputs (lines 300-303), matching the documented behavior.
- Existing structured input behavior is preserved (lines 303-305).
310-340: LGTM!The log-converter command builder correctly constructs the command and environment variables, including S3 authentication when needed. The implementation follows the same pattern as
_make_clp_s_command_and_env.The maintainer's suggestions for docstring improvements are reasonable refinements but not critical.
423-436: LGTM!The conversion setup correctly gates the conversion workflow to CLP_S with unstructured input and appropriately switches between directory ingestion (line 433) and file-list ingestion (line 436).
Based on learnings
571-571: LGTM!The cleanup is correctly placed in the success path only, preserving temporary files for debugging when compression fails. This aligns with the maintainer's feedback and provides a better debugging experience.
LinZhihao-723
left a comment
There was a problem hiding this comment.
Some formatting updates. Otherwise lgtm. I will approve afterwards.
Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
docs/src/user-docs/quick-start/clp-json.md(2 hunks)
🧰 Additional context used
🪛 LanguageTool
docs/src/user-docs/quick-start/clp-json.md
[uncategorized] ~81-~81: Loose punctuation mark.
Context: ...g JSON object includes: * "timestamp": The extracted timestamp, stored as its ...
(UNLIKELY_OPENING_PUNCTUATION)
[uncategorized] ~83-~83: Loose punctuation mark.
Context: ...ve the timestamp format. * "message": The original log message. For example,...
(UNLIKELY_OPENING_PUNCTUATION)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: package-image
- GitHub Check: lint-check (macos-15)
- GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (1)
docs/src/user-docs/quick-start/clp-json.md (1)
67-103: New section is well-structured and clearly explains the feature.The documentation for unstructured text log compression is clear, practical, and includes a helpful example. The note about the forced
"timestamp"key provides important context for users.Note: Static analysis flags at lines 81 and 83 about "loose punctuation" are false positives—these are standard markdown backticks for inline code formatting (e.g.,
"timestamp"), which is correct.
LinZhihao-723
left a comment
There was a problem hiding this comment.
For the PR title, how about:
feat(clp-package): Add support for compressing unstructured text logs using clp-json.
… using clp-json. (y-scope#1505) Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com>
Description
This PR adds support for ingesting unstructured text logs into clp-json by leveraging the new
log-converterbinary.This feature is exposed via the new
--unstructuredflag in thecompress.pyscript. When this flag is used, the--timestamp-keyflag is ignored, the logs passed to the compression script are treated as unstructured, and the logs are converted bylog-converterin the compression task before by compressed using the timestamp key"timestamp".This PR also includes some docs updates to explain the feature.
Checklist
breaking change.
Validation performed
--unstructuredflag from both S3 and the local filesystem.Summary by CodeRabbit
New Features
Bug Fixes
Documentation