feat(clp-package): Add support for uploading extracted streams to S3.#662
Walkthrough
This pull request introduces a series of changes across multiple components, primarily focusing on the management of stream output configurations and command-line argument parsing. Key modifications include the refactoring of how the
Changes
Sequence Diagram
sequenceDiagram
participant CLI as Command Line Interface
participant Config as CLPConfig
participant Task as Extraction Task
participant Storage as Storage Handler
CLI->>Config: Set stream output configuration
Config->>Storage: Initialize storage (FS/S3)
Config->>Storage: Set directory path
CLI->>Task: Start extraction
Task->>Storage: Retrieve output directory
Task->>Task: Execute extraction
Task->>Storage: Upload results if S3 enabled
Task-->>CLI: Return extraction results
||
| if s3_error: | ||
| task_results.status = QueryTaskStatus.FAILED | ||
| task_results.error_log_path = "S3 Failed" |
There was a problem hiding this comment.
Any suggestions on how we should handle the error if the binaries succeed but the S3 upload fails?
Should we write the error to a local error log file and update error_log_path accordingly?
There was a problem hiding this comment.
Is there any way to get the logging destination from the logger object we get from Celery and use it to set error_log_path here? Since this Python code logs the errors there, it seems like the natural thing to do.
There was a problem hiding this comment.
It's a bit tricky because the destination is set through the -f option for the container.
I tried a few tricks I found online, but none of them worked. Maybe we can just pass the log path via an environment variable from start_clp.py and print it out at this line?
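A minimal sketch of the environment-variable idea, assuming the launcher exports the path under the name WORKER_LOG_PATH (the name that appears later in this thread). The helper name `resolve_error_log_path` and the fallback string are hypothetical and not part of the PR.

```python
import os
from typing import Mapping, Optional


def resolve_error_log_path(
    env: Optional[Mapping[str, str]] = None,
    var_name: str = "WORKER_LOG_PATH",  # assumed name, taken from later in this thread
    fallback: str = "<unknown worker log>",  # hypothetical placeholder
) -> str:
    """Return the worker log path exported by the launcher, or a fallback."""
    env = os.environ if env is None else env
    value = env.get(var_name, "")
    # Treat an unset and an empty variable the same way.
    return value if value else fallback
```

One reason to prefer an explicit fallback: `str(os.getenv("WORKER_LOG_PATH"))` evaluates to the literal string "None" when the variable is unset, which would then be stored as the error log path.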
| REDIS_COMPONENT_NAME, | ||
| REDUCER_COMPONENT_NAME, | ||
| RESULTS_CACHE_COMPONENT_NAME, | ||
| StorageType, |
There was a problem hiding this comment.
fixing a previous redundant import
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
Actionable comments posted: 1
🧹 Nitpick comments (3)
components/job-orchestration/job_orchestration/executor/query/utils.py (1)
50-50: Consider using UTF-8 encoding for better character support.
The current implementation uses ASCII decoding, which might fail on non-ASCII characters in the output. Consider using UTF-8 instead.
- return task_result, stdout_data.decode("ascii")
+ return task_result, stdout_data.decode("utf-8")
Also applies to: 64-64, 86-86, 113-113
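To illustrate the difference the suggestion is pointing at, here is a small sketch. The helper name `decode_output` and the choice of `errors="replace"` are assumptions for illustration; the PR itself only swaps the codec name.

```python
def decode_output(stdout_data: bytes) -> str:
    """Decode subprocess output as UTF-8, substituting U+FFFD for invalid bytes."""
    # errors="replace" is an illustrative choice so that a stray byte does not
    # abort the task; the suggested diff uses the default strict mode.
    return stdout_data.decode("utf-8", errors="replace")


# A stream path containing a non-ASCII character, as the binary might print it.
raw = "stream_é.json\n".encode("utf-8")

try:
    raw.decode("ascii")
    ascii_ok = True
except UnicodeDecodeError:
    # ASCII decoding rejects any byte >= 0x80.
    ascii_ok = False
```

With strict UTF-8 decoding, invalid input still raises `UnicodeDecodeError`, so whether to replace or fail is a separate decision from the codec switch.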
components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (1)
187-189: Add more detailed error information.
The current error message, "S3 Failed", is not descriptive enough for debugging.
- task_results.error_log_path = "S3 Failed"
+ task_results.error_log_path = f"S3 upload failed: {s3_error}"
components/core/src/clp/clo/clo.cpp (1)
216-222: Consider explicit JSON formatting settings.
The current implementation passes an indent of -1 (compact, single-line output) and uses error_handler_t::ignore, which silently drops invalid UTF-8 sequences. Consider:
- Using explicit indentation (e.g., 2 or 4 spaces) for consistent output
- Handling JSON formatting errors to catch potential issues

- std::cout << json_msg.dump(-1, ' ', true, nlohmann::json::error_handler_t::ignore)
+ std::cout << json_msg.dump(2, ' ', true) << std::endl;
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (14)
components/clp-package-utils/clp_package_utils/general.py(2 hunks)components/clp-package-utils/clp_package_utils/scripts/start_clp.py(2 hunks)components/clp-py-utils/clp_py_utils/clp_config.py(9 hunks)components/core/src/clp/clo/CommandLineArguments.cpp(1 hunks)components/core/src/clp/clo/CommandLineArguments.hpp(2 hunks)components/core/src/clp/clo/clo.cpp(2 hunks)components/core/src/clp_s/CommandLineArguments.cpp(2 hunks)components/core/src/clp_s/CommandLineArguments.hpp(2 hunks)components/core/src/clp_s/JsonConstructor.cpp(2 hunks)components/core/src/clp_s/JsonConstructor.hpp(1 hunks)components/core/src/clp_s/clp-s.cpp(1 hunks)components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py(7 hunks)components/job-orchestration/job_orchestration/executor/query/fs_search_task.py(2 hunks)components/job-orchestration/job_orchestration/executor/query/utils.py(5 hunks)
🧰 Additional context used
📓 Path-based instructions (8)
components/core/src/clp_s/JsonConstructor.hpp (1)
Pattern **/*.{cpp,hpp,java,js,jsx,ts,tsx}: - Prefer false == <expression> rather than !<expression>.
components/core/src/clp_s/JsonConstructor.cpp (1)
Pattern **/*.{cpp,hpp,java,js,jsx,ts,tsx}: - Prefer false == <expression> rather than !<expression>.
components/core/src/clp/clo/CommandLineArguments.cpp (1)
Pattern **/*.{cpp,hpp,java,js,jsx,ts,tsx}: - Prefer false == <expression> rather than !<expression>.
components/core/src/clp/clo/CommandLineArguments.hpp (1)
Pattern **/*.{cpp,hpp,java,js,jsx,ts,tsx}: - Prefer false == <expression> rather than !<expression>.
components/core/src/clp_s/clp-s.cpp (1)
Pattern **/*.{cpp,hpp,java,js,jsx,ts,tsx}: - Prefer false == <expression> rather than !<expression>.
components/core/src/clp/clo/clo.cpp (1)
Pattern **/*.{cpp,hpp,java,js,jsx,ts,tsx}: - Prefer false == <expression> rather than !<expression>.
components/core/src/clp_s/CommandLineArguments.hpp (1)
Pattern **/*.{cpp,hpp,java,js,jsx,ts,tsx}: - Prefer false == <expression> rather than !<expression>.
components/core/src/clp_s/CommandLineArguments.cpp (1)
Pattern **/*.{cpp,hpp,java,js,jsx,ts,tsx}: - Prefer false == <expression> rather than !<expression>.
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: centos-stream-9-static-linked-bins
- GitHub Check: ubuntu-focal-static-linked-bins
- GitHub Check: ubuntu-jammy-static-linked-bins
- GitHub Check: centos-stream-9-dynamic-linked-bins
- GitHub Check: ubuntu-focal-dynamic-linked-bins
- GitHub Check: ubuntu-jammy-dynamic-linked-bins
- GitHub Check: build-macos (macos-14, false)
- GitHub Check: build-macos (macos-14, true)
- GitHub Check: build-macos (macos-13, false)
- GitHub Check: build-macos (macos-13, true)
🔇 Additional comments (29)
components/clp-package-utils/clp_package_utils/general.py (2)
256-266: LGTM! Good encapsulation of stream output directory access.
The changes improve encapsulation by using getter/setter methods instead of direct attribute access, which is consistent with object-oriented principles.
278-278: LGTM! Proper handling of stream output configuration.
The change correctly copies the entire stream output configuration instead of just the directory, which is more robust and maintainable.
components/clp-py-utils/clp_py_utils/clp_config.py (8)
43-43: LGTM! Well-defined constant for default data directory.
The constant follows naming conventions and provides a centralized definition for the default data directory path.
344-344: LGTM! Improved type safety for directory field.
Using pathlib.Path instead of a generic type improves type safety and IDE support.
363-363: LGTM! Consistent type safety improvement for S3 staging directory.
Using pathlib.Path maintains consistency with FsStorage and improves type safety.
381-395: LGTM! Well-structured storage class hierarchy.
The new storage classes provide clear specialization for different storage types with appropriate default paths. The inheritance hierarchy is clean and logical.
397-417: LGTM! Well-designed helper functions for directory management.
The helper functions properly encapsulate storage type-specific logic with appropriate error handling. The private scope is correctly indicated with the underscore prefix.
420-420: LGTM! Improved type safety and encapsulation in ArchiveOutput.
The changes properly leverage the new specialized storage types and helper functions for better type safety and encapsulation.
Also applies to: 451-454
464-464: LGTM! Consistent implementation in StreamOutput.
The changes maintain consistency with ArchiveOutput, properly implementing the same pattern for directory management.
Also applies to: 473-477
552-552: LGTM! Proper adaptation of CLPConfig methods.
The changes correctly adapt the configuration methods to use the new directory access pattern while maintaining clear error messages.
Also applies to: 582-582
components/clp-package-utils/clp_package_utils/scripts/start_clp.py (2)
707-707: LGTM! Consistent use of directory getter method.
The change properly uses the new getter method for stream output directory, maintaining consistency with the updated storage configuration model.
925-925: LGTM! Proper integration with log viewer settings.
The change correctly uses the new getter method when configuring the log viewer's stream files directory.
components/core/src/clp_s/JsonConstructor.hpp (1)
34-34: LGTM! Clean addition of the stream stats flag.
The new boolean member follows the existing pattern and naming conventions.
components/job-orchestration/job_orchestration/executor/query/utils.py (1)
9-9: LGTM! Clean import addition.
The Tuple import is correctly added to support the new return type.
components/job-orchestration/job_orchestration/executor/query/fs_search_task.py (1)
Line range hint 160-170: LGTM! Clean handling of the new return type.
The changes correctly unpack the tuple and discard the unused stdout data.
Also applies to: 171-171
components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (3)
2-2: LGTM! Clean import organization.
The imports are logically grouped and properly ordered.
Also applies to: 9-11
32-32: LGTM! Consistent command-line flag handling.
The enable_s3_write flag is consistently used across both CLP and CLP-S commands.
Also applies to: 57-58, 78-79
131-139: LGTM! Clean S3 configuration setup.
The S3 configuration is properly initialized based on the storage type.
components/core/src/clp_s/JsonConstructor.cpp (2)
8-8: LGTM!
The include statement is correctly placed with other third-party includes and is necessary for the JSON output functionality.
133-139: ⚠️ Potential issue
Several issues in the JSON output implementation.
- The indent parameter -1 in json.dump() is invalid. Use a positive integer for pretty-printing or 0 for compact output.
- The error handler is set to ignore, which could silently hide JSON encoding errors.
- As per previous feedback, the "id" field might be unnecessary.
Apply this diff to fix the issues:
  if (m_option.print_ordered_stream_stats) {
      nlohmann::json json_msg;
      json_msg["stream_path"] = new_file_path.string();
-     json_msg["id"] = m_option.archive_id;
-     std::cout << json_msg.dump(-1, ' ', true, nlohmann::json::error_handler_t::ignore)
+     std::cout << json_msg.dump(2, ' ', true, nlohmann::json::error_handler_t::strict)
                << std::endl;
  }
Likely invalid or redundant comment.
components/core/src/clp_s/CommandLineArguments.hpp (2)
73-75: LGTM!
The method declaration follows the class's style and correctly uses [[nodiscard]] to prevent accidental value discarding.
198-198: LGTM!
The member variable is correctly initialized and follows the class's naming convention.
components/core/src/clp/clo/CommandLineArguments.hpp (2)
51-52: LGTM!
The method declaration follows the class's style and correctly uses [[nodiscard]] to prevent accidental value discarding.
185-185: LGTM!
The member variable is correctly initialized and follows the class's naming convention.
components/core/src/clp_s/clp-s.cpp (1)
309-309: LGTM!
The option assignment is correctly placed and follows the same style as other option assignments.
components/core/src/clp/clo/clo.cpp (1)
6-6: LGTM! The JSON library include is correctly placed.
The include is appropriately positioned with other external dependencies.
components/core/src/clp/clo/CommandLineArguments.cpp (1)
182-190: LGTM! The command-line option is well-implemented.
The new --print-stream-stats option follows the established patterns:
- Uses bool_switch for flag-style option
- Provides clear description
- Maintains consistent formatting with other options
components/core/src/clp_s/CommandLineArguments.cpp (2)
415-418: LGTM! The command-line option is well-implemented.
The new --print-ordered-stream-stats option follows the established patterns:
- Uses bool_switch for flag-style option
- Provides clear description
- Maintains consistent formatting with other options
499-504: LGTM! The validation logic is thorough and clear.
The validation ensures that --print-ordered-stream-stats is only used with the --ordered argument, providing a clear error message on misuse.
Actionable comments posted: 0
🧹 Nitpick comments (2)
components/core/src/clp/clo/clo.cpp (2)
216-221: Consider enhancing the JSON output implementation.
Several improvements could make the JSON output more robust and useful:
- Use a specific indentation value instead of -1 for better clarity
- Consider using a stricter error handler for UTF-8 validation
- Use relative paths to avoid exposing system information
- Include additional useful metadata (e.g., size, timestamp)
Here's a suggested implementation:
- json_msg["stream_path"] = dest_ir_path;
- std::cout << json_msg.dump(-1, ' ', true, nlohmann::json::error_handler_t::ignore)
-           << std::endl;
+ json_msg["stream_path"] = dest_ir_path.filename();
+ json_msg["size"] = std::filesystem::file_size(dest_ir_path);
+ json_msg["timestamp"] = std::filesystem::last_write_time(dest_ir_path);
+ std::cout << json_msg.dump(2, ' ', true, nlohmann::json::error_handler_t::strict)
+           << std::endl;
215-222: Consider abstracting the stream output handling.
The current implementation tightly couples the JSON output with filesystem operations. To better support the planned S3 integration and potential future storage backends, consider introducing an abstraction layer for stream output handling.
This would make it easier to:
- Support different storage backends (filesystem, S3, etc.)
- Standardize the JSON output format across backends
- Add new output formats without modifying the core extraction logic
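The abstraction layer suggested above could be sketched as follows. This is an illustration in Python, not the C++ code under review; every name here (`StreamSink`, `FsSink`, `ObjectStoreSink`, `report_stream`) is hypothetical, and the upload step is an injected callable standing in for a real S3 client.

```python
import json
import shutil
from pathlib import Path
from typing import Callable, Protocol


class StreamSink(Protocol):
    """Anything that can take a locally extracted stream and persist it."""

    def store(self, stream_path: Path) -> str:
        """Persist the stream and return the location to report."""
        ...


class FsSink:
    """Copies the stream into a directory on the local filesystem."""

    def __init__(self, output_dir: Path) -> None:
        self._output_dir = output_dir

    def store(self, stream_path: Path) -> str:
        dest = self._output_dir / stream_path.name
        shutil.copyfile(stream_path, dest)
        return str(dest)


class ObjectStoreSink:
    """Hands the stream to an injected upload callable (e.g. an S3 wrapper)."""

    def __init__(self, upload: Callable[[Path, str], None], key_prefix: str = "") -> None:
        self._upload = upload
        self._key_prefix = key_prefix

    def store(self, stream_path: Path) -> str:
        key = f"{self._key_prefix}{stream_path.name}"
        self._upload(stream_path, key)
        return key


def report_stream(sink: StreamSink, stream_path: Path) -> str:
    """Store one stream and return a single-line JSON record describing it."""
    return json.dumps({"stream_path": sink.store(stream_path)})
```

The extraction logic would only ever call `report_stream`, so the JSON record format stays the same regardless of which backend is configured.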
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
components/core/src/clp/clo/clo.cpp(2 hunks)components/core/src/clp_s/JsonConstructor.cpp(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- components/core/src/clp_s/JsonConstructor.cpp
🧰 Additional context used
📓 Path-based instructions (1)
components/core/src/clp/clo/clo.cpp (1)
Pattern **/*.{cpp,hpp,java,js,jsx,ts,tsx}: - Prefer false == <expression> rather than !<expression>.
🔇 Additional comments (1)
components/core/src/clp/clo/clo.cpp (1)
6-6: LGTM! Good choice of JSON library.
The nlohmann/json library is a well-tested, header-only JSON library for C++. The single-include version is used, which is good for compilation performance.
Actionable comments posted: 0
🧹 Nitpick comments (1)
components/clp-py-utils/clp_py_utils/clp_config.py (1)
397-417: Add docstrings to helper functions.
The helper functions _get_directory_from_storage_config and _set_directory_for_storage_config would benefit from docstrings describing their purpose, parameters, return values, and possible exceptions.
  def _get_directory_from_storage_config(storage_config: Union[FsStorage, S3Storage]) -> pathlib.Path:
+     """
+     Get the appropriate directory path from a storage configuration.
+
+     Args:
+         storage_config: The storage configuration object
+
+     Returns:
+         pathlib.Path: The directory path for the storage
+
+     Raises:
+         NotImplementedError: If the storage type is not supported
+     """
      storage_type = storage_config.type

  def _set_directory_for_storage_config(
      storage_config: Union[FsStorage, S3Storage], directory
  ) -> None:
+     """
+     Set the appropriate directory path in a storage configuration.
+
+     Args:
+         storage_config: The storage configuration object
+         directory: The directory path to set
+
+     Raises:
+         NotImplementedError: If the storage type is not supported
+     """
      storage_type = storage_config.type
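For readers without the file open, a self-contained sketch of what such a getter might look like is given below. It uses plain dataclasses instead of the project's pydantic models, and the field name `staging_directory` and the enum values are assumptions based on how this thread describes the S3 storage class; the real helper may differ.

```python
import pathlib
from dataclasses import dataclass
from enum import Enum


class StorageType(str, Enum):
    # Values assumed for illustration.
    FS = "fs"
    S3 = "s3"


@dataclass
class FsStorage:
    directory: pathlib.Path
    type: StorageType = StorageType.FS


@dataclass
class S3Storage:
    staging_directory: pathlib.Path  # assumed field name
    type: StorageType = StorageType.S3


def get_directory_from_storage_config(storage_config) -> pathlib.Path:
    """
    Return the local directory associated with a storage configuration.

    Raises:
        NotImplementedError: If the storage type is not supported.
    """
    storage_type = storage_config.type
    if StorageType.FS == storage_type:
        return storage_config.directory
    if StorageType.S3 == storage_type:
        # For S3, the local directory is where files are staged before upload.
        return storage_config.staging_directory
    raise NotImplementedError(f"storage.type {storage_type} is not supported")
```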
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
components/clp-py-utils/clp_py_utils/clp_config.py(9 hunks)components/package-template/src/etc/clp-config.yml(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (12)
- GitHub Check: ubuntu-focal-static-linked-bins
- GitHub Check: centos-stream-9-static-linked-bins
- GitHub Check: ubuntu-jammy-static-linked-bins
- GitHub Check: ubuntu-focal-dynamic-linked-bins
- GitHub Check: centos-stream-9-dynamic-linked-bins
- GitHub Check: ubuntu-jammy-dynamic-linked-bins
- GitHub Check: build-macos (macos-14, false)
- GitHub Check: build-macos (macos-14, true)
- GitHub Check: build-macos (macos-13, false)
- GitHub Check: lint-check (ubuntu-latest)
- GitHub Check: lint-check (macos-latest)
- GitHub Check: build-macos (macos-13, true)
🔇 Additional comments (4)
components/package-template/src/etc/clp-config.yml (1)
89-91: LGTM! The storage configuration structure is well-defined.
The new structure for stream_output.storage aligns with the PR objectives and maintains consistency with the existing archive_output configuration.
components/clp-py-utils/clp_py_utils/clp_config.py (3)
43-43: LGTM! Well-structured base classes with proper type hints and validation.
The addition of the CLP_DEFAULT_DATA_DIRECTORY_PATH constant and updates to storage base classes provide a solid foundation for the storage configuration system.
Also applies to: 344-344, 363-363
381-394: LGTM! Well-designed storage class hierarchy.
The specialized storage classes (ArchiveFsStorage, StreamFsStorage, ArchiveS3Storage, StreamS3Storage) provide clear separation of concerns and appropriate default paths.
420-420: LGTM! Configuration classes are well-structured with proper validation.
The updates to ArchiveOutput and StreamOutput classes provide:
- Correct type hints for specialized storage classes
- Proper encapsulation of directory operations
- Consistent validation of paths
Also applies to: 451-454, 458-458, 464-464, 473-477, 481-482, 552-552, 582-582, 668-668, 677-677
Actionable comments posted: 0
🧹 Nitpick comments (1)
components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (1)
167-189: Enhance error logging for S3 upload failures.
The current error message, "S3 Failed", is not informative enough. Consider including more details about the failure.
- task_results.error_log_path = "S3 Failed"
+ task_results.error_log_path = f"S3 upload failed: {s3_error}"
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py(7 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: build-macos (macos-13, true)
- GitHub Check: build (macos-latest)
🔇 Additional comments (4)
components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (4)
9-11: LGTM! Import changes and parameter addition are well-structured.
The new imports and parameter addition properly support the S3 upload functionality.
Also applies to: 31-32
57-58: LGTM! Consistent implementation of stream stats printing.
The print_stream_stats flag is handled consistently for both storage engines.
Also applies to: 78-79
131-139: LGTM! Clean S3 configuration setup.
The S3 configuration initialization and enable flag handling is well-structured.
Also applies to: 146-146
193-193: LGTM! Clean task results handling.
The dictionary conversion of task results maintains consistency with the rest of the codebase.
| ); | ||
| options_ir_extraction.add_options()( | ||
| "target-size", | ||
| po::value<size_t>(&m_ir_target_size)->value_name("SIZE"), |
There was a problem hiding this comment.
| po::value<size_t>(&m_ir_target_size)->value_name("SIZE"), | |
| po::value<size_t>(&m_ir_target_size) | |
| ->value_name("SIZE") | |
| ->default_value(m_ir_target_size), |
Probably want a default value?
Co-authored-by: Devin Gibson <gibber9809@users.noreply.github.com>
Actionable comments posted: 0
🔭 Outside diff range comments (1)
components/clp-py-utils/clp_py_utils/clp_config.py (1)
Line range hint 687-696: Consider adding stream output validation.
The WorkerConfig class should validate that the stream_output configuration is compatible with the package storage engine, similar to how archive_output is validated in CLPConfig.
  class WorkerConfig(BaseModel):
      def dump_to_primitive_dict(self):
          d = self.dict()
          d["archive_output"] = self.archive_output.dump_to_primitive_dict()
+         if (
+             StorageType.S3 == self.stream_output.storage.type
+             and StorageEngine.CLP_S != self.package.storage_engine
+         ):
+             raise ValueError(
+                 f"stream_output.storage.type = 's3' is only supported with package.storage_engine"
+                 f" = '{StorageEngine.CLP_S}'"
+             )
          d["stream_output"] = self.stream_output.dump_to_primitive_dict()
          return d
🧹 Nitpick comments (5)
components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (2)
190-195: Consider using more specific exception handling for S3 operations
Instead of catching a generic Exception, consider catching specific S3-related exceptions to provide more detailed error messages and handling.
- try:
-     s3_put(s3_config, stream_path, stream_name)
-     logger.info(f"Finished uploading stream {stream_name} to S3.")
- except Exception as err:
-     logger.error(f"Failed to upload stream {stream_name}: {err}")
-     upload_error = True
+ try:
+     s3_put(s3_config, stream_path, stream_name)
+     logger.info(f"Finished uploading stream {stream_name} to S3.")
+ except (S3UploadError, S3ConnectionError) as err:
+     logger.error(f"S3 error uploading stream {stream_name}: {err}")
+     upload_error = True
+ except Exception as err:
+     logger.error(f"Unexpected error uploading stream {stream_name}: {err}")
+     upload_error = True
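A runnable sketch of the two-tier handling proposed in this comment follows. The exception names in the diff (S3UploadError, S3ConnectionError) are not confirmed anywhere in this thread, so the sketch defines its own hypothetical `UploadError` and takes the upload function as a parameter rather than calling `s3_put` directly.

```python
import logging
from pathlib import Path
from typing import Callable, Iterable, List

logger = logging.getLogger(__name__)


class UploadError(Exception):
    """Hypothetical stand-in for a storage-specific upload failure."""


def upload_streams(
    stream_paths: Iterable[Path],
    upload: Callable[[Path, str], None],
) -> List[str]:
    """Upload every stream and return the names of those that failed."""
    failed: List[str] = []
    for stream_path in stream_paths:
        stream_name = stream_path.name
        try:
            upload(stream_path, stream_name)
        except UploadError as err:
            # Known storage failure: log with the storage-specific context.
            logger.error("S3 error uploading stream %s: %s", stream_name, err)
            failed.append(stream_name)
        except Exception as err:
            # Anything else is unexpected but must still fail the task.
            logger.error("Unexpected error uploading stream %s: %s", stream_name, err)
            failed.append(stream_name)
        else:
            logger.info("Finished uploading stream %s to S3.", stream_name)
    return failed
```

Returning the list of failed names, rather than a single boolean, would also give the task something concrete to put in its error report.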
199-203: Consider providing more detailed task status updates
The current implementation only updates the task status to FAILED. Consider adding more granular status updates to help with debugging and monitoring.
  if upload_error:
      task_results.status = QueryTaskStatus.FAILED
      task_results.error_log_path = str(os.getenv("WORKER_LOG_PATH"))
+     task_results.error_message = "Failed to upload one or more streams to S3"
  else:
      logger.info(f"Finished uploading streams.")
components/clp-py-utils/clp_py_utils/clp_config.py (3)
313-327: Enhance S3 credentials validation.
While the empty string validation is good, consider adding additional validation for AWS access key format:
- Access key IDs typically follow the pattern ^[A-Z0-9]{20}$
- Secret access keys typically follow the pattern ^[A-Za-z0-9/+=]{40}$

  @validator("access_key_id")
  def validate_access_key_id(cls, field):
      if field == "":
          raise ValueError("access_key_id cannot be empty")
+     if not re.match(r"^[A-Z0-9]{20}$", field):
+         raise ValueError("access_key_id format is invalid")
      return field

  @validator("secret_access_key")
  def validate_secret_access_key(cls, field):
      if field == "":
          raise ValueError("secret_access_key cannot be empty")
+     if not re.match(r"^[A-Za-z0-9/+=]{40}$", field):
+         raise ValueError("secret_access_key format is invalid")
      return field
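If format checks like these were adopted, a stricter variant might look like the sketch below. The patterns are the reviewer's and are only described here as typical; the function names are hypothetical. The sketch uses `re.fullmatch` because `re.match` with a trailing `$` also accepts a value that ends in a newline.

```python
import re

# Patterns taken from the review comment; treat them as heuristics, since
# S3-compatible stores may issue keys in other shapes.
ACCESS_KEY_ID_PATTERN = re.compile(r"[A-Z0-9]{20}")
SECRET_ACCESS_KEY_PATTERN = re.compile(r"[A-Za-z0-9/+=]{40}")


def looks_like_access_key_id(value: str) -> bool:
    """True if the whole string is 20 uppercase alphanumeric characters."""
    return ACCESS_KEY_ID_PATTERN.fullmatch(value) is not None


def looks_like_secret_access_key(value: str) -> bool:
    """True if the whole string is 40 characters from the base64-like set."""
    return SECRET_ACCESS_KEY_PATTERN.fullmatch(value) is not None


# re.match with "$" tolerates one trailing newline; fullmatch does not.
candidate_with_newline = "A" * 20 + "\n"
match_accepts_newline = re.match(r"^[A-Z0-9]{20}$", candidate_with_newline) is not None
fullmatch_accepts_newline = looks_like_access_key_id(candidate_with_newline)
```

Whether rejecting nonconforming keys is desirable is a judgment call: a strict check catches copy-paste mistakes early but could also reject credentials for non-AWS endpoints.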
357-361: Consider adding credential rotation support.
The get_credentials method could be enhanced to support credential rotation by adding a method to refresh credentials from external sources (e.g., AWS IAM roles, environment variables).
418-438: Consider adding path validation in directory management helpers.
The helper functions _get_directory_from_storage_config and _set_directory_for_storage_config should validate that the directory paths are absolute when setting them.
  def _set_directory_for_storage_config(
      storage_config: Union[FsStorage, S3Storage], directory
  ) -> None:
+     if not directory.is_absolute():
+         raise ValueError("Directory path must be absolute")
      storage_type = storage_config.type
      if StorageType.FS == storage_type:
          storage_config.directory = directory
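The check in this suggestion relies on `pathlib`'s `is_absolute()`. A tiny sketch of its behavior is shown below; `require_absolute` is a hypothetical name, and `PurePosixPath` is used so the example behaves the same on any platform.

```python
from pathlib import PurePath, PurePosixPath


def require_absolute(directory: PurePath) -> PurePath:
    """Return the path unchanged, or raise if it is not absolute."""
    if not directory.is_absolute():
        raise ValueError(f"Directory path must be absolute: {directory}")
    return directory


absolute_ok = require_absolute(PurePosixPath("/var/data/streams"))

try:
    require_absolute(PurePosixPath("var/data/streams"))
    relative_rejected = False
except ValueError:
    relative_rejected = True
```

One caveat, offered as an assumption rather than a finding: this thread also mentions make_config_paths_absolute, which suggests relative paths may be legitimate in the configuration before that step runs, so a check in the setter could be too early.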
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
components/clp-py-utils/clp_py_utils/clp_config.py(11 hunks)components/clp-py-utils/clp_py_utils/s3_utils.py(1 hunks)components/core/src/clp_s/JsonConstructor.cpp(2 hunks)components/core/src/clp_s/clp-s.cpp(1 hunks)components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py(7 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- components/core/src/clp_s/JsonConstructor.cpp
- components/core/src/clp_s/clp-s.cpp
⏰ Context from checks skipped due to timeout of 90000ms (12)
- GitHub Check: ubuntu-jammy-static-linked-bins
- GitHub Check: ubuntu-jammy-dynamic-linked-bins
- GitHub Check: ubuntu-focal-static-linked-bins
- GitHub Check: ubuntu-focal-dynamic-linked-bins
- GitHub Check: centos-stream-9-static-linked-bins
- GitHub Check: centos-stream-9-dynamic-linked-bins
- GitHub Check: build-macos (macos-14, false)
- GitHub Check: build-macos (macos-14, true)
- GitHub Check: lint-check (ubuntu-latest)
- GitHub Check: build-macos (macos-13, false)
- GitHub Check: build-macos (macos-13, true)
- GitHub Check: lint-check (macos-latest)
🔇 Additional comments (7)
components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (2)
9-11: LGTM: Command modifications for stream statistics printing
The changes to support printing stream statistics are well-integrated into both IR and JSON extraction paths.
Also applies to: 31-32, 57-58, 78-79
131-138: LGTM: S3 configuration initialization
The S3 configuration setup is clean and follows proper validation patterns.
components/clp-py-utils/clp_py_utils/clp_config.py (3)
3-3: LGTM! Clean import addition and constant definition.
The new import of Tuple and the constant CLP_DEFAULT_DATA_DIRECTORY_PATH are well-placed and follow the project's conventions.
Also applies to: 43-43
402-416: LGTM! Well-structured storage class hierarchy.
The specialized storage classes (ArchiveFsStorage, StreamFsStorage, ArchiveS3Storage, StreamS3Storage) provide good separation of concerns and default paths.
571-571: LGTM! Proper path validation in configuration.
The updates to make_config_paths_absolute and validate_stream_output_dir ensure proper path handling and validation.
Also applies to: 601-601
components/clp-py-utils/clp_py_utils/s3_utils.py (2)
164-164: 🛠️ Refactor suggestion
Verify error handling for credential retrieval
The get_credentials() method call should be wrapped in a try-except block to handle potential credential retrieval failures gracefully.
- aws_access_key_id, aws_secret_access_key = s3_config.get_credentials()
+ try:
+     aws_access_key_id, aws_secret_access_key = s3_config.get_credentials()
+ except Exception as e:
+     raise ValueError(f"Failed to retrieve AWS credentials: {str(e)}")
Let's verify the error handling in the get_credentials() method:
169-170: Consider using AWS SDK's default credential chain
Instead of explicitly passing credentials, consider leveraging boto3's default credential chain, which supports multiple credential sources (environment variables, IAM roles, etc.) and follows AWS security best practices.
Let's check if the codebase already uses the default credential chain elsewhere:
Actionable comments posted: 0
🔭 Outside diff range comments (1)
components/clp-py-utils/clp_py_utils/clp_config.py (1)
Consider adding environment variable fallback for AWS credentials
The codebase already uses AWS environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) in other components. Adding environment variable fallback in the S3Config class would align with existing patterns and AWS best practices.
components/clp-py-utils/clp_py_utils/clp_config.py: Consider modifying get_credentials() to check environment variables when self.credentials is None.
🔗 Analysis chain
Line range hint 313-361: Well-structured credentials management with proper validation.
The implementation properly encapsulates S3 credentials and includes appropriate validation. Consider adding a check for credential environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) as a fallback.
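The fallback described here could be sketched roughly as below. This is a standalone function for illustration, not the actual `get_credentials()` method; the parameter names are hypothetical, and only the two environment variable names come from the comment itself.

```python
import os
from typing import Mapping, Optional, Tuple


def get_credentials_with_fallback(
    configured: Optional[Tuple[str, str]],
    env: Optional[Mapping[str, str]] = None,
) -> Tuple[Optional[str], Optional[str]]:
    """
    Return (access_key_id, secret_access_key).

    Explicitly configured credentials win; otherwise fall back to the
    standard AWS environment variables. Either element may be None if
    nothing is configured and the variables are unset.
    """
    if configured is not None:
        return configured
    env = os.environ if env is None else env
    return env.get("AWS_ACCESS_KEY_ID"), env.get("AWS_SECRET_ACCESS_KEY")
```

Passing the environment as a mapping keeps the function easy to test without mutating `os.environ`.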
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check if the codebase handles AWS environment variables
rg -l "AWS_(?:ACCESS_KEY_ID|SECRET_ACCESS_KEY)"
Length of output: 227
Script:
#!/bin/bash
# Check the implementation details around AWS credentials
echo "=== InputConfig.hpp ==="
rg -A 5 -B 5 "AWS_(?:ACCESS_KEY_ID|SECRET_ACCESS_KEY)" "components/core/src/clp_s/InputConfig.hpp"
echo -e "\n=== CommandLineArguments.cpp ==="
rg -A 5 -B 5 "AWS_(?:ACCESS_KEY_ID|SECRET_ACCESS_KEY)" "components/core/src/clp_s/CommandLineArguments.cpp"
echo -e "\n=== compression_task.py ==="
rg -A 5 -B 5 "AWS_(?:ACCESS_KEY_ID|SECRET_ACCESS_KEY)" "components/job-orchestration/job_orchestration/executor/compress/compression_task.py"
Length of output: 2814
🧹 Nitpick comments (2)
components/clp-py-utils/clp_py_utils/clp_config.py (2)
402-416: Well-organized storage class hierarchy with clear defaults.
The specialized storage classes provide good separation of concerns. Consider adding docstrings to explain the purpose and usage of each storage class.
Add docstrings to explain the purpose of each storage class:
  class ArchiveFsStorage(FsStorage):
+     """Filesystem storage configuration for archives with default path under CLP_DEFAULT_DATA_DIRECTORY_PATH."""
      directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "archives"
418-438: Good abstraction of storage directory operations.
The helper functions effectively reduce code duplication. Consider adding type hints for the directory parameter in _set_directory_for_storage_config.
Add type hints for better code clarity:
- def _set_directory_for_storage_config(
-     storage_config: Union[FsStorage, S3Storage], directory
- ) -> None:
+ def _set_directory_for_storage_config(
+     storage_config: Union[FsStorage, S3Storage], directory: pathlib.Path
+ ) -> None:
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
components/clp-py-utils/clp_py_utils/clp_config.py(11 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (12)
- GitHub Check: centos-stream-9-static-linked-bins
- GitHub Check: centos-stream-9-dynamic-linked-bins
- GitHub Check: ubuntu-jammy-static-linked-bins
- GitHub Check: ubuntu-jammy-dynamic-linked-bins
- GitHub Check: ubuntu-focal-static-linked-bins
- GitHub Check: ubuntu-focal-dynamic-linked-bins
- GitHub Check: build-macos (macos-14, false)
- GitHub Check: build-macos (macos-14, true)
- GitHub Check: lint-check (ubuntu-latest)
- GitHub Check: build-macos (macos-13, false)
- GitHub Check: build (macos-latest)
- GitHub Check: build-macos (macos-13, true)
🔇 Additional comments (3)
components/clp-py-utils/clp_py_utils/clp_config.py (3)
3-3: LGTM! Clean addition of imports and constants.
The new imports and constant definition are well-placed and follow the existing pattern.
Also applies to: 43-43
365-365: Good removal of default values for storage paths.
Requiring explicit directory paths improves configuration clarity and prevents ambiguous defaults.
Also applies to: 384-384
441-441: Clean integration of the new storage architecture.
The storage configuration changes are consistently applied across all relevant classes, maintaining type safety and proper validation.
Also applies to: 472-475, 484-484, 493-497, 571-571, 601-601, 687-687, 696-696
gibber9809
left a comment
LGTM, though I think @kirkrodrigues might also be planning to take a look.
"-e", f"CLP_CONFIG_PATH={container_clp_config.logs_directory / container_config_filename}",
"-e", f"CLP_LOGS_DIR={container_logs_dir}",
"-e", f"CLP_LOGGING_LEVEL={worker_config.logging_level}",
"-e", f"WORKER_LOG_PATH={container_worker_log_path}",
- Prefix with CLP_ to avoid conflicts with other env vars?
- I think we discussed in a previous PR that we don't really need to have WORKER in the name, but feel free to disagree in this case.
I feel this is a special case where the worker has two logs, one for the clp executable and one for the worker itself. I would keep WORKER in the name in this case so it's clear which log the path refers to.
Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com>
Actionable comments posted: 0
🧹 Nitpick comments (4)
components/clp-py-utils/clp_py_utils/clp_config.py (3)
313-328: Enhance validation error messages.
The validation error messages could be more descriptive to help users understand the requirements better.
- raise ValueError("access_key_id cannot be empty")
+ raise ValueError("S3 access_key_id cannot be empty. Please provide valid AWS credentials.")
- raise ValueError("secret_access_key cannot be empty")
+ raise ValueError("S3 secret_access_key cannot be empty. Please provide valid AWS credentials.")
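As a rough illustration of the suggestion above, the following sketch rejects empty credentials with a message that names the offending field. The class name `S3CredentialsSketch` and the use of a plain dataclass are assumptions made for this example; the PR's actual model and validator mechanism may differ.

```python
from dataclasses import dataclass


@dataclass
class S3CredentialsSketch:
    """Hypothetical stand-in for the credentials model; not the PR's class."""

    access_key_id: str
    secret_access_key: str

    def __post_init__(self) -> None:
        # Reject empty values early and name the offending field in the message.
        for field_name in ("access_key_id", "secret_access_key"):
            if not getattr(self, field_name):
                raise ValueError(
                    f"S3 {field_name} cannot be empty. "
                    "Please provide valid AWS credentials."
                )
```

Constructing `S3CredentialsSketch("", "secret")` raises a `ValueError` that mentions `access_key_id`, so a misconfigured deployment fails at load time rather than at upload time.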
418-438: Add docstrings to helper functions.
These utility functions would benefit from docstrings describing their purpose, parameters, and return values.
 def _get_directory_from_storage_config(storage_config: Union[FsStorage, S3Storage]) -> pathlib.Path:
+    """
+    Get the appropriate directory path from a storage configuration.
+
+    Args:
+        storage_config: Either FsStorage or S3Storage configuration
+
+    Returns:
+        pathlib.Path: The directory path for the given storage type
+
+    Raises:
+        NotImplementedError: If the storage type is not supported
+    """
601-603: Enhance error message clarity.
The error message could be more specific about what makes the directory invalid.
- raise ValueError(f"stream_output.storage's directory is invalid: {ex}")
+ raise ValueError(f"stream_output.storage's directory '{self.stream_output.get_directory()}' is invalid: {ex}")
components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (1)
166-200: Enhance error messages for better debugging.
The stream upload implementation is solid, but error messages could be more descriptive.
Consider enhancing error messages with more context:
- logger.exception(f"`{line}` cannot be decoded as JSON")
+ logger.exception(f"Failed to decode line as JSON. Line content: `{line}`")
- logger.error(f"`path` is not a valid key in `{line}`")
+ logger.error(f"Missing required 'path' key in stream stats. Content: `{line}`")
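The two log messages above imply a loop that reads one JSON object per line of stream statistics and pulls out a `path` field. A minimal sketch of that parsing step follows. The function name `collect_stream_paths` is hypothetical, and skipping bad lines instead of aborting the task is an assumption; the PR's task may handle failures differently.

```python
import json
import logging
from typing import Iterable, List

logger = logging.getLogger(__name__)


def collect_stream_paths(stats_lines: Iterable[str]) -> List[str]:
    """Extract the `path` value from each NDJSON stats line (hypothetical helper)."""
    paths: List[str] = []
    for raw_line in stats_lines:
        line = raw_line.strip()
        if not line:
            # Ignore blank lines between records.
            continue
        try:
            stats = json.loads(line)
        except json.JSONDecodeError:
            logger.exception(f"Failed to decode line as JSON. Line content: `{line}`")
            continue
        if not isinstance(stats, dict) or "path" not in stats:
            logger.error(f"Missing required 'path' key in stream stats. Content: `{line}`")
            continue
        paths.append(stats["path"])
    return paths
```

Each returned path could then be handed to whatever upload routine the task uses; the upload itself is left out here because its signature is not shown on this page.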
📒 Files selected for processing (3)
components/clp-py-utils/clp_py_utils/clp_config.py (11 hunks)
components/core/src/clp/clo/CommandLineArguments.cpp (1 hunks)
components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (7 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
components/core/src/clp/clo/CommandLineArguments.cpp (1)
Pattern **/*.{cpp,hpp,java,js,jsx,ts,tsx}: - Prefer false == <expression> rather than !<expression>.
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: build-macos (macos-14, false)
- GitHub Check: build-macos (macos-13, false)
- GitHub Check: build-macos (macos-13, true)
- GitHub Check: lint-check (ubuntu-latest)
- GitHub Check: build (macos-latest)
- GitHub Check: lint-check (macos-latest)
🔇 Additional comments (7)
components/core/src/clp/clo/CommandLineArguments.cpp (2)
182-188: LGTM! Well-structured command-line option.
The implementation follows Boost.Program_options conventions, and the description clearly indicates the unit (B) for the target size.
189-191: LGTM! Clear and consistent option implementation.
The boolean switch follows the codebase conventions, and the description clearly indicates the output format (NDJSON).
components/clp-py-utils/clp_py_utils/clp_config.py (2)
43-43: LGTM! Well-defined constant.
The constant follows the established naming convention and maintains consistency with other path constants in the file.
402-416: Consider using hyphens in directory names.
Based on previous feedback, the team prefers using hyphens instead of underscores in directory names.
- directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged_archives"
+ directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged-archives"
- staging_directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged_streams"
+ staging_directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged-streams"
components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py (3)
2-2: LGTM! Clean imports and parameter addition.
The new imports for S3 functionality and the addition of the print_stream_stats parameter are well-organized and properly integrated.
Also applies to: 9-11, 32-32
131-138: LGTM! Clean S3 configuration setup.
The S3 configuration initialization and flag handling is well-structured. Good reuse of the enable_s3_upload flag for controlling both S3 upload and stream statistics printing.
Also applies to: 145-145
202-207: Verify error log path consistency.
The error log path is set from the WORKER_LOG_PATH environment variable, but this might not align with the logging destination set through the -f option for the container, as discussed in previous comments.
Run this script to check logging configuration consistency:
✅ Verification successful
Logging configuration is properly aligned
The error log path using WORKER_LOG_PATH is correctly configured and aligns with the container setup. Both the WORKER_LOG_PATH and CLP_LOGS_DIR environment variables are properly set and mounted in the worker containers, serving different logging purposes.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check logging configuration consistency
# Look for logging configuration in start_clp.py and container setup
rg -A 5 "WORKER_LOG_PATH|CLP_LOGS_DIR" --type py

Length of output: 7414
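For readers following the WORKER_LOG_PATH discussion, a task could resolve that variable along the lines of the sketch below. The helper name `resolve_worker_log_path` and the choice to return `None` when the variable is unset or empty are assumptions made for illustration; only the variable name comes from the diff on this page.

```python
import os
import pathlib
from typing import Mapping, Optional


def resolve_worker_log_path(
    env: Mapping[str, str] = os.environ,
) -> Optional[pathlib.Path]:
    """Return the worker log path from the environment, or None if unset (hypothetical)."""
    raw_path = env.get("WORKER_LOG_PATH")
    if not raw_path:
        # Unset or empty: let the caller decide on a fallback destination.
        return None
    return pathlib.Path(raw_path)
```

Passing the environment as a parameter keeps the helper easy to test without mutating the real process environment.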
…y-scope#662) Co-authored-by: Devin Gibson <gibber9809@users.noreply.github.com> Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com>
Description
This PR adds support for uploading JSON chunks and IR streams to S3.
The proposed flow is as follows:
A limitation of the current implementation:
The PR includes the following changes:
Validation performed
Triggered a stream extraction job from the web UI and verified that streams are properly written to S3.
Also validated different configuration inputs and confirmed that the storage type and directory value are as expected:
Case:
Summary by CodeRabbit
Release Notes
New Features
Improvements
Technical Enhancements