feat(job-orchestration): Add abstract interface for compression job submission and result retrieval: #1354
Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com>
…to spider-job-executor
Walkthrough
Introduces a TaskManager abstraction for compression task orchestration, adds Celery and Spider implementations, updates scheduler flow to submit and poll via a ResultHandle, replaces previous Celery-specific handling, and relocates CompressionTaskResult to a new module. Adjusts data models to store a TaskManager.ResultHandle and updates imports accordingly.
Sequence Diagram(s)
sequenceDiagram
autonumber
actor Operator as Operator
participant Scheduler as CompressionScheduler
participant TM as TaskManager
participant RH as ResultHandle
participant Backend as Execution Backend
participant Workers as Workers
Operator->>Scheduler: main()/loop
Scheduler->>TM: submit(task_params)
activate TM
TM->>Backend: schedule batch
Backend->>Workers: dispatch tasks
TM-->>Scheduler: ResultHandle
deactivate TM
loop Poll until done/timeout
Scheduler->>RH: get_result(timeout)
alt results available
RH-->>Scheduler: list<CompressionTaskResult>
Scheduler->>Scheduler: update job states
else timeout/no results
RH-->>Scheduler: None
end
end
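The submit-then-poll flow in the diagram can be sketched in Python as follows. This is a minimal illustration under stated assumptions, not the PR's code: `FakeResultHandle`, `FakeTaskManager`, and `poll_until_done` are hypothetical names, the `"SUCCEEDED"` status string is a placeholder, and task results are plain dicts rather than `CompressionTaskResult` objects.

```python
import time
from typing import Any, Optional


class FakeResultHandle:
    """Hypothetical stand-in for a TaskManager.ResultHandle."""

    def __init__(self, results: list[dict[str, Any]], ready_after_polls: int) -> None:
        self._results = results
        self._polls_left = ready_after_polls

    def get_result(self, timeout: float = 0.1) -> Optional[list[dict[str, Any]]]:
        # Returns None while the batch is still "running", mirroring the
        # "timeout/no results" branch of the diagram.
        if self._polls_left > 0:
            self._polls_left -= 1
            time.sleep(timeout)
            return None
        return self._results


class FakeTaskManager:
    """Hypothetical backend that pretends every task succeeds after two polls."""

    def submit(self, task_params: list[dict[str, Any]]) -> FakeResultHandle:
        results = [{"task_id": p["task_id"], "status": "SUCCEEDED"} for p in task_params]
        return FakeResultHandle(results, ready_after_polls=2)


def poll_until_done(
    handle: FakeResultHandle, max_polls: int, timeout: float = 0.01
) -> Optional[list[dict[str, Any]]]:
    """Polls the handle until results arrive or the poll budget runs out."""
    for _ in range(max_polls):
        results = handle.get_result(timeout)
        if results is not None:
            return results
    return None
```

The scheduler only ever talks to the handle, so the same loop would work for any backend that returns `None` while results are pending.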
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
| def convert_from_str(string: str) -> list[spider_py.Int8]: | ||
| """ | ||
| Convert a string to a list of `Int8` as utf-8 bytes. | ||
| :param string: The string to convert. | ||
| :return: The list of `Int8` representing the string. | ||
| """ | ||
| return [spider_py.Int8(byte) for byte in string.encode("utf-8")] |
I will push the fixes directly in the next commit, but please check:
- This is an internal function, so its name should start with _.
- Convert -> Converts.
- Int8 -> spider_py.Int8. Int8 is not a valid symbol in this context.
- The list of -> A list of. Normally, we don't use The in a return statement as it's not referring to a particular object.
Before marking this PR ready for review, make sure you go through the other docstrings and fix obvious problems.
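For reference, a docstring written to these conventions might look like the sketch below. It uses the helper names that appear later in this thread (`utf8_str_to_int8_list`, `int8_list_to_utf8_str`); the `Int8` dataclass is a stand-in for `spider_py.Int8` so the example runs without Spider installed, and the body of the inverse helper is an assumption rather than the PR's implementation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Int8:
    """Stand-in for `spider_py.Int8` (assumption: it wraps a single integer)."""

    value: int


def utf8_str_to_int8_list(string: str) -> list[Int8]:
    """
    Converts a string to a list of `Int8` values holding its UTF-8 bytes.

    :param string: The string to convert.
    :return: A list of `Int8` representing the string.
    """
    return [Int8(byte) for byte in string.encode("utf-8")]


def int8_list_to_utf8_str(values: list[Int8]) -> str:
    """
    Converts a list of `Int8` values holding UTF-8 bytes back to a string.

    :param values: The list of `Int8` to convert.
    :return: A string decoded from the bytes.
    """
    return bytes(v.value for v in values).decode("utf-8")
```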
| job = CompressionJob( | ||
| id=job_id, start_time=start_time, async_task_result=tasks_group.apply_async() | ||
| ) | ||
| job = CompressionJob(id=job_id, start_time=start_time, async_task_result=result_handle) |
I'm not sure why CompressionJob still needs async_task_result. Can you check?
LinZhihao-723 left a comment
I don't have time to check everything. Please ensure the updated code can be executed correctly.
| results: list[CompressionTaskResult] = [] | ||
| for job in self._spider_jobs: | ||
| job_results = job.get_results() | ||
| if job_results is None: | ||
| return None | ||
| results.extend( | ||
| [ | ||
| CompressionTaskResult.validate_model_json(convert_to_str(result)) | ||
| for result in job_results | ||
| ] | ||
| ) | ||
| return results |
Please check whether this makes sense. I think what you implemented was wrong: the returned object is a sequence, so get_results can't be called on it.
| class Scheduler(ABC): | ||
| """Abstract base class for a scheduler framework.""" | ||
|
|
||
| class ResultHandle(ABC): | ||
| @abstractmethod | ||
| def get_result(self, timeout: float = 0.1) -> list[CompressionTaskResult] | None: | ||
| """ | ||
| Gets the result of a compression job. | ||
| :param timeout: The maximum time to wait for the result. Notice that some schedulers | ||
| ignore this parameter. | ||
| :return: A list of task results. | ||
| """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def compress(self, task_params: list[dict[str, Any]]) -> ResultHandle: | ||
| """ | ||
| Starts a batch of compression tasks as a job. | ||
| :param task_params: A list of dictionaries containing parameters for each compression task. | ||
| :return: A handle through which to get the result of the job. | ||
| """ | ||
| pass |
Add one more layer of abstraction: the result handle is now also an abstract class, and get_result is a method of ResultHandle.
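The two-level abstraction described here can be sketched as below, using the names the PR settles on later in this thread (`TaskManager`, `submit`). `InlineTaskManager` is a hypothetical backend invented for illustration, and results are plain values rather than `CompressionTaskResult` objects.

```python
from abc import ABC, abstractmethod
from typing import Any, Optional


class TaskManager(ABC):
    """Abstract base class for a backend that runs batches of compression tasks."""

    class ResultHandle(ABC):
        """Abstract handle through which the results of one batch are retrieved."""

        @abstractmethod
        def get_result(self, timeout: float = 0.1) -> Optional[list[Any]]:
            """Returns the task results, or None if they are not ready yet."""

    @abstractmethod
    def submit(self, task_params: list[dict[str, Any]]) -> "TaskManager.ResultHandle":
        """Starts a batch of tasks and returns a handle for its results."""


class InlineTaskManager(TaskManager):
    """Hypothetical backend that "runs" every task synchronously in-process."""

    class ResultHandle(TaskManager.ResultHandle):
        def __init__(self, results: list[Any]) -> None:
            self._results = results

        def get_result(self, timeout: float = 0.1) -> Optional[list[Any]]:
            # Results are already available, so the timeout is ignored.
            return self._results

    def submit(self, task_params: list[dict[str, Any]]) -> "TaskManager.ResultHandle":
        return InlineTaskManager.ResultHandle([p["task_id"] for p in task_params])
```

Nesting `ResultHandle` inside `TaskManager` keeps each backend's handle type next to the manager that produces it, so callers depend only on the two abstract classes.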
Actionable comments posted: 1
♻️ Duplicate comments (1)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (1)
37-45: Pass the metadata DB config string, not a decoded IO config object
json.loads(task_param["clp_io_config_json"]) returns a dict, so utf8_str_to_int8_list(...) immediately blows up because it calls .encode() on that dict. The Spider bridge also expects the sixth argument to be clp_metadata_db_connection_config_json, not the IO config. Please feed the metadata DB config string directly.
- utf8_str_to_int8_list(json.loads(task_param["clp_io_config_json"])),
+ utf8_str_to_int8_list(task_param["clp_metadata_db_connection_config_json"]),
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (2)
- components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (1 hunks)
- components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (4)
- components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (4): compress(24-30), TaskManager(9-30), ResultHandle(12-21), get_result(14-21)
- components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py (3): compress(24-27), ResultHandle(13-22), get_result(17-22)
- components/job-orchestration/job_orchestration/scheduler/scheduler_result.py (1): CompressionTaskResult(7-18)
- components/job-orchestration/job_orchestration/utils/spider_utils.py (1): utf8_str_to_int8_list(16-23)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (3)
- components/job-orchestration/job_orchestration/scheduler/scheduler_result.py (1): CompressionTaskResult(7-18)
- components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (3): ResultHandle(17-28), get_result(21-28), compress(33-49)
- components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py (3): ResultHandle(13-22), get_result(17-22), compress(24-27)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: package-image
- GitHub Check: lint-check (ubuntu-24.04)
- GitHub Check: rust-checks (ubuntu-24.04)
- GitHub Check: rust-checks (ubuntu-22.04)
Actionable comments posted: 2
👮 Files not reviewed due to content moderation or server errors (2)
- components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py
- components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: package-image
- GitHub Check: rust-checks (ubuntu-22.04)
- GitHub Check: rust-checks (ubuntu-24.04)
- GitHub Check: build (macos-15)
🔇 Additional comments (7)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (7)
1-15: LGTM!
The imports are appropriate, and the class correctly inherits from TaskManager.
30-31: LGTM!
The constructor correctly initializes the Spider driver with the provided storage URL.
37-47: LGTM! The metadata DB config parameter is now correctly handled.
Line 44 now correctly uses json.dumps(task_param["clp_metadata_db_connection_config"]), addressing the critical issue flagged in previous reviews. The parameter is properly serialized and passed to the compression task.
48-49: Confirm SpiderDriver.submit_jobs parameter format
itertools.chain(*task_params_list) flattens nested lists; verify SpiderDriver.submit_jobs expects a flat list of parameters versus a list of parameter lists by reviewing its API docs or implementation.
42-44: LGTM! The parameter passing issue has been resolved.
The 6th parameter now correctly serialises clp_metadata_db_connection_config to JSON before converting to an Int8 list, addressing the critical bug flagged in previous reviews.
48-49: Verify submit_jobs parameter flattening
Ensure Driver.submit_jobs expects a single flattened parameter list (using itertools.chain) rather than separate per-job parameter lists. Confirm the method signature in spider_py.Driver and adjust to pass task_params_list directly if it should receive one list per job.
33-49: Verify submit_jobs parameter structure
The code flattens all task parameters into one list (line 48), which may lose per-task boundaries. Confirm whether Driver.submit_jobs expects a single flattened list or a list of parameter lists (one per task) and adjust the call accordingly.
@CodeRabbit review
✅ Actions performed: Review triggered.
@CodeRabbit review
✅ Actions performed: Review triggered.
LinZhihao-723 left a comment
I will do another round of testing after these fixes.
| pass | ||
|
|
||
| @abstractmethod | ||
| def compress(self, task_params: list[dict[str, Any]]) -> ResultHandle: |
Looking at it after restructuring... I think it makes more sense to name it submit.
Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com>
Actionable comments posted: 2
♻️ Duplicate comments (3)
components/job-orchestration/job_orchestration/scheduler/task_result.py (1)
7-20: Consider using the enum type for stronger type safety.
The status field is typed as int (Line 9), which allows any integer at the type-checking level. Using status: CompressionTaskStatus would provide stronger type hints and catch invalid values during static analysis, not just at runtime via the validator.
Apply this diff to use the enum type:
 class CompressionTaskResult(BaseModel):
     task_id: int
-    status: int
+    status: CompressionTaskStatus
     duration: float
     error_message: Optional[str] = None
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (1)
9-30: Fix docstring indentation.
The multi-line docstring parameter description on Lines 17-18 has incorrect indentation. The continuation line should align with the text on the previous line.
Apply this diff to fix the indentation:
 def get_result(self, timeout: float = 0.1) -> list[CompressionTaskResult] | None:
     """
     Gets the result of a compression job.
-    :param timeout: Maximum time (in seconds) to wait for retrieving the result. Depending
-        on the implementation, this parameter may be ignored.
+    :param timeout: Maximum time (in seconds) to wait for retrieving the result.
+        Depending on the implementation, this parameter may be ignored.
     :return: A list of task results.
     """
     pass
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py (1)
17-22: Consider catching ValidationError for clearer error reporting.
If a Celery task returns malformed results, CompressionTaskResult.model_validate(res) on Line 20 will raise ValidationError, which would propagate to the caller in compression_scheduler.py with a generic error message. Consider catching ValidationError explicitly and returning None or raising a more descriptive exception.
Apply this diff to handle validation errors gracefully:
+from pydantic import ValidationError
+
 class CeleryTaskManager(TaskManager):
     class ResultHandle(TaskManager.ResultHandle):
         def __init__(self, celery_result: celery.result.GroupResult) -> None:
             self._celery_result: celery.result.GroupResult = celery_result
         def get_result(self, timeout: float = 0.1) -> list[CompressionTaskResult] | None:
             try:
                 results = self._celery_result.get(timeout=timeout)
                 return [CompressionTaskResult.model_validate(res) for res in results]
             except celery.exceptions.TimeoutError:
                 return None
+            except ValidationError as e:
+                raise RuntimeError(f"Failed to parse task results: {e}") from e
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (6)
- components/job-orchestration/job_orchestration/executor/compress/compression_task.py (1 hunks)
- components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (6 hunks)
- components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py (1 hunks)
- components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (1 hunks)
- components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (1 hunks)
- components/job-orchestration/job_orchestration/scheduler/task_result.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (6)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (1)
- components/job-orchestration/job_orchestration/scheduler/task_result.py (1): CompressionTaskResult(7-20)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (4)
- components/job-orchestration/job_orchestration/executor/compress/celery_compress.py (1): compress(17-34)
- components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py (3): CeleryTaskManager(11-27), submit(24-27), get_result(17-22)
- components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (3): TaskManager(9-30), submit(24-30), get_result(14-21)
- components/job-orchestration/job_orchestration/scheduler/scheduler_data.py (1): CompressionJob(22-28)
components/job-orchestration/job_orchestration/scheduler/task_result.py (1)
- components/job-orchestration/job_orchestration/scheduler/constants.py (1): CompressionTaskStatus(40-45)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py (3)
- components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (4): TaskManager(9-30), ResultHandle(12-21), get_result(14-21), submit(24-30)
- components/job-orchestration/job_orchestration/scheduler/task_result.py (1): CompressionTaskResult(7-20)
- components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (3): ResultHandle(16-27), get_result(20-27), submit(32-47)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (3)
- components/job-orchestration/job_orchestration/scheduler/task_result.py (1): CompressionTaskResult(7-20)
- components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py (3): ResultHandle(13-22), get_result(17-22), submit(24-27)
- components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (3): ResultHandle(16-27), get_result(20-27), submit(32-47)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (4)
- components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (4): TaskManager(9-30), ResultHandle(12-21), get_result(14-21), submit(24-30)
- components/job-orchestration/job_orchestration/scheduler/task_result.py (1): CompressionTaskResult(7-20)
- components/job-orchestration/job_orchestration/utils/spider_utils.py (2): int8_list_to_utf8_str(6-13), utf8_str_to_int8_list(16-23)
- components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py (3): ResultHandle(13-22), get_result(17-22), submit(24-27)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: rust-checks (ubuntu-24.04)
- GitHub Check: rust-checks (ubuntu-22.04)
- GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (5)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (1)
39-39: LGTM! Import path correctly updated.
The import path for CompressionTaskResult has been properly updated to reflect its new location in the task_result module, aligning with the PR's refactoring to centralize task result types.
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (4)
33-34: LGTM! TaskManager abstraction properly integrated.
The import changes correctly introduce the TaskManager abstraction and CeleryTaskManager implementation, replacing direct Celery dependencies.
214-228: LGTM! Function signature properly updated.
The function signature correctly adds the task_manager: TaskManager parameter, enabling dependency injection of the scheduling implementation.
372-374: LGTM! Task submission refactored to use TaskManager.
The code correctly replaces direct Celery group submission with the TaskManager.submit() abstraction, and CompressionJob now properly holds a result_handle instead of a Celery-specific async result.
400-400: LGTM! Result retrieval properly abstracted.
The code correctly uses job.result_handle.get_result() instead of directly accessing Celery result objects, maintaining consistency with the TaskManager abstraction.
| task_manager = CeleryTaskManager() | ||
|
|
🧹 Nitpick | 🔵 Trivial
Consider making TaskManager selection configurable.
The TaskManager implementation is currently hardcoded to CeleryTaskManager (Line 489). While this aligns with the PR objectives for compression scheduling, making this configurable via the CLP configuration file would support future scheduler implementations (e.g., Spider) without code changes.
Example approach:
scheduler_type = clp_config.compression_scheduler.get("scheduler_type", "celery")
if scheduler_type == "celery":
    task_manager = CeleryTaskManager()
elif scheduler_type == "spider":
    storage_url = clp_config.compression_scheduler.get("storage_url")
    task_manager = SpiderTaskManager(storage_url)
else:
    logger.error(f"Unsupported scheduler type: {scheduler_type}")
    return -1
| def submit(self, task_params: list[dict[str, Any]]) -> TaskManager.ResultHandle: | ||
| job = spider_py.group( | ||
| [compress for _ in range(len(task_params))], | ||
| ) | ||
| job_args = [] | ||
| for task_param in task_params: | ||
| job_args.append(spider_py.Int64(task_param["job_id"])) | ||
| job_args.append(spider_py.Int64(task_param["task_id"])) | ||
| job_args.append([spider_py.Int64(tag_id) for tag_id in task_param["tag_ids"]]) | ||
| job_args.append(utf8_str_to_int8_list(task_param["clp_io_config_json"])) | ||
| job_args.append(utf8_str_to_int8_list(task_param["paths_to_compress_json"])) | ||
| job_args.append( | ||
| utf8_str_to_int8_list(json.dumps(task_param["clp_metadata_db_connection_config"])) | ||
| ) | ||
| submitted_job = self._driver.submit_jobs([job], [job_args])[0] | ||
| return SpiderTaskManager.ResultHandle(submitted_job) |
🧹 Nitpick | 🔵 Trivial
Consider structuring job_args more clearly.
The current implementation appends all task parameters to a single flat job_args list (Lines 36-45). While functional, organizing this as a list of per-task argument lists would improve readability and make the parameter structure more explicit.
Consider this alternative structure:
 def submit(self, task_params: list[dict[str, Any]]) -> TaskManager.ResultHandle:
     job = spider_py.group(
         [compress for _ in range(len(task_params))],
     )
-    job_args = []
-    for task_param in task_params:
-        job_args.append(spider_py.Int64(task_param["job_id"]))
-        job_args.append(spider_py.Int64(task_param["task_id"]))
-        job_args.append([spider_py.Int64(tag_id) for tag_id in task_param["tag_ids"]])
-        job_args.append(utf8_str_to_int8_list(task_param["clp_io_config_json"]))
-        job_args.append(utf8_str_to_int8_list(task_param["paths_to_compress_json"]))
-        job_args.append(
-            utf8_str_to_int8_list(json.dumps(task_param["clp_metadata_db_connection_config"]))
-        )
+    job_args = [
+        [
+            spider_py.Int64(task_param["job_id"]),
+            spider_py.Int64(task_param["task_id"]),
+            [spider_py.Int64(tag_id) for tag_id in task_param["tag_ids"]],
+            utf8_str_to_int8_list(task_param["clp_io_config_json"]),
+            utf8_str_to_int8_list(task_param["paths_to_compress_json"]),
+            utf8_str_to_int8_list(json.dumps(task_param["clp_metadata_db_connection_config"])),
+        ]
+        for task_param in task_params
+    ]
     submitted_job = self._driver.submit_jobs([job], [job_args])[0]
     return SpiderTaskManager.ResultHandle(submitted_job)
🤖 Prompt for AI Agents
In
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py
around lines 32 to 47, the code builds a single flat job_args list by appending
all task parameters in sequence; instead, create job_args as a list of per-task
argument lists (for each task_param build a local args list containing the Int64
job_id, Int64 task_id, list of Int64 tag_ids, utf8_str_to_int8_list
clp_io_config_json, utf8_str_to_int8_list paths_to_compress_json, and
utf8_str_to_int8_list json.dumps(clp_metadata_db_connection_config), then append
that args list to job_args), and pass that per-task job_args into
self._driver.submit_jobs([job], [job_args]) so each task’s parameters are
clearly separated.
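To make the difference between the two argument shapes concrete, here is a small sketch that builds both from the same task parameters. It uses plain Python values in place of the Spider types, and `build_task_args`, `build_nested_args`, and `build_flat_args` are hypothetical helpers. Which shape `submit_jobs` actually expects is not confirmed in this thread and should be checked against the Spider API.

```python
import json
from itertools import chain
from typing import Any

ARGS_PER_TASK = 6  # Number of arguments each compression task receives.


def build_task_args(task_param: dict[str, Any]) -> list[Any]:
    """Builds the argument list for one task (plain values stand in for Spider types)."""
    return [
        task_param["job_id"],
        task_param["task_id"],
        list(task_param["tag_ids"]),
        task_param["clp_io_config_json"],
        task_param["paths_to_compress_json"],
        json.dumps(task_param["clp_metadata_db_connection_config"]),
    ]


def build_nested_args(task_params: list[dict[str, Any]]) -> list[list[Any]]:
    """One inner list per task: per-task boundaries stay explicit."""
    return [build_task_args(p) for p in task_params]


def build_flat_args(task_params: list[dict[str, Any]]) -> list[Any]:
    """A single list with every task's arguments back to back."""
    # chain.from_iterable flattens exactly one level, so each task's
    # tag_ids list is still passed as a single list-valued argument.
    return list(chain.from_iterable(build_nested_args(task_params)))
```

With the flat shape, task boundaries are recoverable only by knowing that every task takes `ARGS_PER_TASK` arguments, which is why the nested form reads more clearly even if the backend ultimately requires the flat one.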
LinZhihao-723 left a comment
Directly modified the PR title (this will be a multi-line commit message).
…ubmission and result retrieval: (y-scope#1354)
* Refactor Celery-based compression job management to use the new interface.
* Implement Spider-based compression job management.
Co-authored-by: LinZhihao-723 <zh.lin@mail.utoronto.ca>
Description
Add a scheduler interface in job-orchestration's scheduler. Add the CeleryScheduler and SpiderScheduler implementations, and use CeleryScheduler in the compression scheduler.
Checklist
breaking change.
Validation performed
job-orchestration compression succeeds end-to-end using Celery.
Summary by CodeRabbit
New Features
Refactor