Skip to content

feat(job-orchestration): Add abstract interface for compression job submission and result retrieval:#1354

Merged
LinZhihao-723 merged 56 commits into
y-scope:mainfrom
sitaowang1998:spider-job-scheduler
Oct 2, 2025
Merged

feat(job-orchestration): Add abstract interface for compression job submission and result retrieval:#1354
LinZhihao-723 merged 56 commits into
y-scope:mainfrom
sitaowang1998:spider-job-scheduler

Conversation

@sitaowang1998

@sitaowang1998 sitaowang1998 commented Sep 30, 2025

Copy link
Copy Markdown
Contributor
  • Refactor Celery-based compression job management to use the new interface.
  • Implement Spider-based compression job management.

Description

Add scheduler interface in job-orchestration's scheduler. Add the CeleryScheduler and SpiderScheduler implementation and the use of CeleryScheduler in compression scheduler.

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  • job-orchestration compression succeeds end-to-end using Celery.
  • GitHub workflows pass.

Summary by CodeRabbit

  • New Features

    • Introduced a pluggable task execution system for compression jobs, enabling multiple backends.
    • Added Spider- and Celery-based executors for improved flexibility and scalability.
    • Enhanced result and status reporting with clearer error messages.
  • Refactor

    • Reworked the compression scheduler to use a unified task manager abstraction.
    • Standardized how running jobs are tracked and polled via a consistent result handle.
    • Consolidated task result modelling to improve validation and reliability.

@coderabbitai

coderabbitai Bot commented Sep 30, 2025

Copy link
Copy Markdown
Contributor

Walkthrough

Introduces a TaskManager abstraction for compression task orchestration, adds Celery and Spider implementations, updates scheduler flow to submit and poll via a ResultHandle, replaces previous Celery-specific handling, and relocates CompressionTaskResult to a new module. Adjusts data models to store a TaskManager.ResultHandle and updates imports accordingly.

Changes

Cohort / File(s) Summary of changes
Executor import update
components/job-orchestration/job_orchestration/executor/compress/compression_task.py
Updated import of CompressionTaskResult to job_orchestration.scheduler.task_result. No logic changes.
Scheduler orchestration refactor
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
Replaced direct Celery orchestration with TaskManager. Functions accept/use a task_manager, schedule via submit(...), and poll via result_handle.get_result(). Adjusted imports and control flow.
TaskManager abstraction
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py
Added abstract TaskManager with nested ResultHandle and an API to submit batches and fetch results with optional timeout.
Celery implementation
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py
Added CeleryTaskManager implementing TaskManager. Wraps Celery groups, exposes ResultHandle.get_result(timeout) returning parsed CompressionTaskResult or None on timeout.
Spider implementation
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py
Added SpiderTaskManager implementing TaskManager. Submits spider jobs, converts args for transport, and exposes ResultHandle.get_result(timeout) producing CompressionTaskResult or None.
Scheduler data model changes
components/job-orchestration/job_orchestration/scheduler/scheduler_data.py
CompressionJob now holds result_handle: TaskManager.ResultHandle and allows arbitrary types via model_config. Removed CompressionTaskResult model and related imports.
Task result model relocation
components/job-orchestration/job_orchestration/scheduler/task_result.py
Added CompressionTaskResult Pydantic model with status validation against CompressionTaskStatus.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Operator as Operator
  participant Scheduler as CompressionScheduler
  participant TM as TaskManager
  participant RH as ResultHandle
  participant Backend as Execution Backend
  participant Workers as Workers

  Operator->>Scheduler: main()/loop
  Scheduler->>TM: submit(task_params)
  activate TM
  TM->>Backend: schedule batch
  Backend->>Workers: dispatch tasks
  TM-->>Scheduler: ResultHandle
  deactivate TM
  loop Poll until done/timeout
    Scheduler->>RH: get_result(timeout)
    alt results available
      RH-->>Scheduler: list<CompressionTaskResult>
      Scheduler->>Scheduler: update job states
    else timeout/no results
      RH-->>Scheduler: None
    end
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related issues

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 28.57% which is insufficient. The required threshold is 80.00%. You can run `@coderabbitai generate docstrings` to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit's high-level summary is enabled.
Title Check ✅ Passed The title clearly and concisely states the introduction of the abstract interface for compression job submission and result retrieval, which is the core change of the pull request, without including extraneous details.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@sitaowang1998 sitaowang1998 changed the title feat(job-orchestration: Add scheduler interface ; Add Spider scheduler. feat(job-orchestration): Add scheduler interface ; Add Spider scheduler. Sep 30, 2025
Comment on lines +12 to +18
def convert_from_str(string: str) -> list[spider_py.Int8]:
"""
Convert a string to a list of `Int8` as utf-8 bytes.
:param string: The string to convert.
:return: The list of `Int8` representing the string.
"""
return [spider_py.Int8(byte) for byte in string.encode("utf-8")]

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will directly push in the next commit but please check:

  • This is an internal function so it should start with _.
  • Convert -> Converts.
  • Int8 -> spider_py.Int8. Int8 is not a valid symbol in this context.
  • The list of -> A list of. Normally, we don't use The in a return statement as it's not referring to a particular object.

Before marking this PR ready for review, make sure u go through other docstrings and fix obvious problems.

job = CompressionJob(
id=job_id, start_time=start_time, async_task_result=tasks_group.apply_async()
)
job = CompressionJob(id=job_id, start_time=start_time, async_task_result=result_handle)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why CompressionJob still need async_task_result. Can you check?

@LinZhihao-723 LinZhihao-723 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have time to check everything. Please ensure the updated code can be executed correctly.

Comment on lines +20 to +31
results: list[CompressionTaskResult] = []
for job in self._spider_jobs:
job_results = job.get_results()
if job_results is None:
return None
results.extend(
[
CompressionTaskResult.validate_model_json(convert_to_str(result))
for result in job_results
]
)
return results

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check whether this makes sense. I think what u implemented were wrong: the returned object is a sequence and it can't be called with get_results.

Comment on lines +9 to +30
class Scheduler(ABC):
"""Abstract base class for a scheduler framework."""

class ResultHandle(ABC):
@abstractmethod
def get_result(self, timeout: float = 0.1) -> list[CompressionTaskResult] | None:
"""
Gets the result of a compression job.
:param timeout: The maximum time to wait for the result. Notice that some schedulers
ignore this parameter.
:return: A list of task results.
"""
pass

@abstractmethod
def compress(self, task_params: list[dict[str, Any]]) -> ResultHandle:
"""
Starts a batch of compression tasks as a job.
:param task_params: A list of dictionaries containing parameters for each compression task.
:return: A handle through which to get the result of the job.
"""
pass

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add one more layer of abstraction: the result handle now is also an abstract class, and get_result would be a method of ResultHandle.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (1)

37-45: Pass the metadata DB config string, not a decoded IO config object

json.loads(task_param["clp_io_config_json"]) returns a dict, so utf8_str_to_int8_list(...) immediately blows up because it calls .encode() on that dict. The Spider bridge also expects the sixth argument to be clp_metadata_db_connection_config_json, not the IO config. Please feed the metadata DB config string directly.

-                utf8_str_to_int8_list(json.loads(task_param["clp_io_config_json"])),
+                utf8_str_to_int8_list(task_param["clp_metadata_db_connection_config_json"]),
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d5edb10 and 3211d8a.

📒 Files selected for processing (2)
  • components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (1 hunks)
  • components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (4)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (4)
  • compress (24-30)
  • TaskManager (9-30)
  • ResultHandle (12-21)
  • get_result (14-21)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py (3)
  • compress (24-27)
  • ResultHandle (13-22)
  • get_result (17-22)
components/job-orchestration/job_orchestration/scheduler/scheduler_result.py (1)
  • CompressionTaskResult (7-18)
components/job-orchestration/job_orchestration/utils/spider_utils.py (1)
  • utf8_str_to_int8_list (16-23)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (3)
components/job-orchestration/job_orchestration/scheduler/scheduler_result.py (1)
  • CompressionTaskResult (7-18)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (3)
  • ResultHandle (17-28)
  • get_result (21-28)
  • compress (33-49)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py (3)
  • ResultHandle (13-22)
  • get_result (17-22)
  • compress (24-27)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: package-image
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: rust-checks (ubuntu-24.04)
  • GitHub Check: rust-checks (ubuntu-22.04)

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3211d8a and 7184d86.

📒 Files selected for processing (2)
  • components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (1 hunks)
  • components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (1 hunks)
👮 Files not reviewed due to content moderation or server errors (2)
  • components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py
  • components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py
🧰 Additional context used
🧬 Code graph analysis (2)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (4)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (4)
  • compress (24-30)
  • TaskManager (9-30)
  • ResultHandle (12-21)
  • get_result (14-21)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py (3)
  • compress (24-27)
  • ResultHandle (13-22)
  • get_result (17-22)
components/job-orchestration/job_orchestration/scheduler/scheduler_result.py (1)
  • CompressionTaskResult (7-18)
components/job-orchestration/job_orchestration/utils/spider_utils.py (1)
  • utf8_str_to_int8_list (16-23)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (3)
components/job-orchestration/job_orchestration/scheduler/scheduler_result.py (1)
  • CompressionTaskResult (7-18)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (3)
  • ResultHandle (17-28)
  • get_result (21-28)
  • compress (33-49)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py (3)
  • ResultHandle (13-22)
  • get_result (17-22)
  • compress (24-27)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: package-image
  • GitHub Check: rust-checks (ubuntu-22.04)
  • GitHub Check: rust-checks (ubuntu-24.04)
  • GitHub Check: build (macos-15)
🔇 Additional comments (7)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (7)

1-15: LGTM!

The imports are appropriate, and the class correctly inherits from TaskManager.


30-31: LGTM!

The constructor correctly initializes the Spider driver with the provided storage URL.


37-47: LGTM! The metadata DB config parameter is now correctly handled.

Line 44 now correctly uses json.dumps(task_param["clp_metadata_db_connection_config"]), addressing the critical issue flagged in previous reviews. The parameter is properly serialized and passed to the compression task.


48-49: Confirm SpiderDriver.submit_jobs parameter format
itertools.chain(*task_params_list) flattens nested lists; verify SpiderDriver.submit_jobs expects a flat list of parameters versus a list of parameter lists by reviewing its API docs or implementation.


42-44: LGTM! The parameter passing issue has been resolved.

The 6th parameter now correctly serialises clp_metadata_db_connection_config to JSON before converting to an Int8 list, addressing the critical bug flagged in previous reviews.


48-49: Verify submit_jobs parameter flattening
Ensure Driver.submit_jobs expects a single flattened parameter list (using itertools.chain) rather than separate per‐job parameter lists. Confirm the method signature in spider_py.Driver and adjust to pass task_params_list directly if it should receive one list per job.


33-49: Verify submit_jobs parameter structure
The code flattens all task parameters into one list (line 48), which may lose per-task boundaries. Confirm whether Driver.submit_jobs expects a single flattened list or a list of parameter lists (one per task) and adjust the call accordingly.

@sitaowang1998

Copy link
Copy Markdown
Contributor Author

@CodeRabbit review

@coderabbitai

coderabbitai Bot commented Oct 2, 2025

Copy link
Copy Markdown
Contributor
✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@sitaowang1998

Copy link
Copy Markdown
Contributor Author

@CodeRabbit review

@coderabbitai

coderabbitai Bot commented Oct 2, 2025

Copy link
Copy Markdown
Contributor
✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@LinZhihao-723 LinZhihao-723 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will do another round of testing after these fixes.

Comment thread components/job-orchestration/job_orchestration/scheduler/scheduler_result.py Outdated
Comment thread components/job-orchestration/job_orchestration/scheduler/scheduler_data.py Outdated
pass

@abstractmethod
def compress(self, task_params: list[dict[str, Any]]) -> ResultHandle:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at it after restructuring... I think it makes more sense to name it submit.

sitaowang1998 and others added 5 commits October 2, 2025 15:21

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (3)
components/job-orchestration/job_orchestration/scheduler/task_result.py (1)

7-20: Consider using the enum type for stronger type safety.

The status field is typed as int (Line 9), which allows any integer at the type-checking level. Using status: CompressionTaskStatus would provide stronger type hints and catch invalid values during static analysis, not just at runtime via the validator.

Apply this diff to use the enum type:

 class CompressionTaskResult(BaseModel):
     task_id: int
-    status: int
+    status: CompressionTaskStatus
     duration: float
     error_message: Optional[str] = None
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (1)

9-30: Fix docstring indentation.

The multi-line docstring parameter description on Lines 17-18 has incorrect indentation. The continuation line should align with the text on the previous line.

Apply this diff to fix the indentation:

         def get_result(self, timeout: float = 0.1) -> list[CompressionTaskResult] | None:
             """
             Gets the result of a compression job.
-            :param timeout: Maximum time (in seconds) to wait for retrieving the result. Depending
-                on the implementation, this parameter may be ignored.
+            :param timeout: Maximum time (in seconds) to wait for retrieving the result.
+                Depending on the implementation, this parameter may be ignored.
             :return: A list of task results.
             """
             pass
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py (1)

17-22: Consider catching ValidationError for clearer error reporting.

If a Celery task returns malformed results, CompressionTaskResult.model_validate(res) on Line 20 will raise ValidationError, which would propagate to the caller in compression_scheduler.py with a generic error message. Consider catching ValidationError explicitly and returning None or raising a more descriptive exception.

Apply this diff to handle validation errors gracefully:

+from pydantic import ValidationError
+
 class CeleryTaskManager(TaskManager):
 
     class ResultHandle(TaskManager.ResultHandle):
         def __init__(self, celery_result: celery.result.GroupResult) -> None:
             self._celery_result: celery.result.GroupResult = celery_result
 
         def get_result(self, timeout: float = 0.1) -> list[CompressionTaskResult] | None:
             try:
                 results = self._celery_result.get(timeout=timeout)
                 return [CompressionTaskResult.model_validate(res) for res in results]
             except celery.exceptions.TimeoutError:
                 return None
+            except ValidationError as e:
+                raise RuntimeError(f"Failed to parse task results: {e}") from e
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ce39db1 and 2f9ac95.

📒 Files selected for processing (6)
  • components/job-orchestration/job_orchestration/executor/compress/compression_task.py (1 hunks)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (6 hunks)
  • components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py (1 hunks)
  • components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (1 hunks)
  • components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (1 hunks)
  • components/job-orchestration/job_orchestration/scheduler/task_result.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (6)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (1)
components/job-orchestration/job_orchestration/scheduler/task_result.py (1)
  • CompressionTaskResult (7-20)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (4)
components/job-orchestration/job_orchestration/executor/compress/celery_compress.py (1)
  • compress (17-34)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py (3)
  • CeleryTaskManager (11-27)
  • submit (24-27)
  • get_result (17-22)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (3)
  • TaskManager (9-30)
  • submit (24-30)
  • get_result (14-21)
components/job-orchestration/job_orchestration/scheduler/scheduler_data.py (1)
  • CompressionJob (22-28)
components/job-orchestration/job_orchestration/scheduler/task_result.py (1)
components/job-orchestration/job_orchestration/scheduler/constants.py (1)
  • CompressionTaskStatus (40-45)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py (3)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (4)
  • TaskManager (9-30)
  • ResultHandle (12-21)
  • get_result (14-21)
  • submit (24-30)
components/job-orchestration/job_orchestration/scheduler/task_result.py (1)
  • CompressionTaskResult (7-20)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (3)
  • ResultHandle (16-27)
  • get_result (20-27)
  • submit (32-47)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (3)
components/job-orchestration/job_orchestration/scheduler/task_result.py (1)
  • CompressionTaskResult (7-20)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py (3)
  • ResultHandle (13-22)
  • get_result (17-22)
  • submit (24-27)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (3)
  • ResultHandle (16-27)
  • get_result (20-27)
  • submit (32-47)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py (4)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/task_manager.py (4)
  • TaskManager (9-30)
  • ResultHandle (12-21)
  • get_result (14-21)
  • submit (24-30)
components/job-orchestration/job_orchestration/scheduler/task_result.py (1)
  • CompressionTaskResult (7-20)
components/job-orchestration/job_orchestration/utils/spider_utils.py (2)
  • int8_list_to_utf8_str (6-13)
  • utf8_str_to_int8_list (16-23)
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/celery_task_manager.py (3)
  • ResultHandle (13-22)
  • get_result (17-22)
  • submit (24-27)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: rust-checks (ubuntu-24.04)
  • GitHub Check: rust-checks (ubuntu-22.04)
  • GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (5)
components/job-orchestration/job_orchestration/executor/compress/compression_task.py (1)

39-39: LGTM! Import path correctly updated.

The import path for CompressionTaskResult has been properly updated to reflect its new location in the task_result module, aligning with the PR's refactoring to centralize task result types.

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (4)

33-34: LGTM! TaskManager abstraction properly integrated.

The import changes correctly introduce the TaskManager abstraction and CeleryTaskManager implementation, replacing direct Celery dependencies.


214-228: LGTM! Function signature properly updated.

The function signature correctly adds the task_manager: TaskManager parameter, enabling dependency injection of the scheduling implementation.


372-374: LGTM! Task submission refactored to use TaskManager.

The code correctly replaces direct Celery group submission with the TaskManager.submit() abstraction, and CompressionJob now properly holds a result_handle instead of a Celery-specific async result.


400-400: LGTM! Result retrieval properly abstracted.

The code correctly uses job.result_handle.get_result() instead of directly accessing Celery result objects, maintaining consistency with the TaskManager abstraction.

Comment on lines +489 to +490
task_manager = CeleryTaskManager()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider making TaskManager selection configurable.

The TaskManager implementation is currently hardcoded to CeleryTaskManager (Line 489). While this aligns with the PR objectives for compression scheduling, making this configurable via the CLP configuration file would support future scheduler implementations (e.g., Spider) without code changes.

Example approach:

scheduler_type = clp_config.compression_scheduler.get("scheduler_type", "celery")
if scheduler_type == "celery":
    task_manager = CeleryTaskManager()
elif scheduler_type == "spider":
    storage_url = clp_config.compression_scheduler.get("storage_url")
    task_manager = SpiderTaskManager(storage_url)
else:
    logger.error(f"Unsupported scheduler type: {scheduler_type}")
    return -1

Comment on lines +32 to +47
def submit(self, task_params: list[dict[str, Any]]) -> TaskManager.ResultHandle:
job = spider_py.group(
[compress for _ in range(len(task_params))],
)
job_args = []
for task_param in task_params:
job_args.append(spider_py.Int64(task_param["job_id"]))
job_args.append(spider_py.Int64(task_param["task_id"]))
job_args.append([spider_py.Int64(tag_id) for tag_id in task_param["tag_ids"]])
job_args.append(utf8_str_to_int8_list(task_param["clp_io_config_json"]))
job_args.append(utf8_str_to_int8_list(task_param["paths_to_compress_json"]))
job_args.append(
utf8_str_to_int8_list(json.dumps(task_param["clp_metadata_db_connection_config"]))
)
submitted_job = self._driver.submit_jobs([job], [job_args])[0]
return SpiderTaskManager.ResultHandle(submitted_job)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider structuring job_args more clearly.

The current implementation appends all task parameters to a single flat job_args list (Lines 36-45). While functional, organizing this as a list of per-task argument lists would improve readability and make the parameter structure more explicit.

Consider this alternative structure:

     def submit(self, task_params: list[dict[str, Any]]) -> TaskManager.ResultHandle:
         job = spider_py.group(
             [compress for _ in range(len(task_params))],
         )
-        job_args = []
-        for task_param in task_params:
-            job_args.append(spider_py.Int64(task_param["job_id"]))
-            job_args.append(spider_py.Int64(task_param["task_id"]))
-            job_args.append([spider_py.Int64(tag_id) for tag_id in task_param["tag_ids"]])
-            job_args.append(utf8_str_to_int8_list(task_param["clp_io_config_json"]))
-            job_args.append(utf8_str_to_int8_list(task_param["paths_to_compress_json"]))
-            job_args.append(
-                utf8_str_to_int8_list(json.dumps(task_param["clp_metadata_db_connection_config"]))
-            )
+        job_args = [
+            [
+                spider_py.Int64(task_param["job_id"]),
+                spider_py.Int64(task_param["task_id"]),
+                [spider_py.Int64(tag_id) for tag_id in task_param["tag_ids"]],
+                utf8_str_to_int8_list(task_param["clp_io_config_json"]),
+                utf8_str_to_int8_list(task_param["paths_to_compress_json"]),
+                utf8_str_to_int8_list(json.dumps(task_param["clp_metadata_db_connection_config"])),
+            ]
+            for task_param in task_params
+        ]
         submitted_job = self._driver.submit_jobs([job], [job_args])[0]
         return SpiderTaskManager.ResultHandle(submitted_job)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def submit(self, task_params: list[dict[str, Any]]) -> TaskManager.ResultHandle:
job = spider_py.group(
[compress for _ in range(len(task_params))],
)
job_args = []
for task_param in task_params:
job_args.append(spider_py.Int64(task_param["job_id"]))
job_args.append(spider_py.Int64(task_param["task_id"]))
job_args.append([spider_py.Int64(tag_id) for tag_id in task_param["tag_ids"]])
job_args.append(utf8_str_to_int8_list(task_param["clp_io_config_json"]))
job_args.append(utf8_str_to_int8_list(task_param["paths_to_compress_json"]))
job_args.append(
utf8_str_to_int8_list(json.dumps(task_param["clp_metadata_db_connection_config"]))
)
submitted_job = self._driver.submit_jobs([job], [job_args])[0]
return SpiderTaskManager.ResultHandle(submitted_job)
def submit(self, task_params: list[dict[str, Any]]) -> TaskManager.ResultHandle:
job = spider_py.group(
[compress for _ in range(len(task_params))],
)
job_args = [
[
spider_py.Int64(task_param["job_id"]),
spider_py.Int64(task_param["task_id"]),
[spider_py.Int64(tag_id) for tag_id in task_param["tag_ids"]],
utf8_str_to_int8_list(task_param["clp_io_config_json"]),
utf8_str_to_int8_list(task_param["paths_to_compress_json"]),
utf8_str_to_int8_list(
json.dumps(task_param["clp_metadata_db_connection_config"])
),
]
for task_param in task_params
]
submitted_job = self._driver.submit_jobs([job], [job_args])[0]
return SpiderTaskManager.ResultHandle(submitted_job)
🤖 Prompt for AI Agents
In
components/job-orchestration/job_orchestration/scheduler/compress/task_manager/spider_task_manager.py
around lines 32 to 47, the code builds a single flat job_args list by appending
all task parameters in sequence; instead, create job_args as a list of per-task
argument lists (for each task_param build a local args list containing the Int64
job_id, Int64 task_id, list of Int64 tag_ids, utf8_str_to_int8_list
clp_io_config_json, utf8_str_to_int8_list paths_to_compress_json, and
utf8_str_to_int8_list json.dumps(clp_metadata_db_connection_config), then append
that args list to job_args), and pass that per-task job_args into
self._driver.submit_jobs([job], [job_args]) so each task’s parameters are
clearly separated.

@LinZhihao-723 LinZhihao-723 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Directly modified the PR title (this will be a multi-line commit message).

@LinZhihao-723 LinZhihao-723 changed the title feat(job-orchestration): Add scheduler interface ; Add Spider scheduler. feat(job-orchestration): Add abstract interface for compression job submission and result retrieval: Oct 2, 2025
@LinZhihao-723 LinZhihao-723 merged commit 92426ed into y-scope:main Oct 2, 2025
22 checks passed
@sitaowang1998 sitaowang1998 deleted the spider-job-scheduler branch October 3, 2025 17:53
junhaoliao pushed a commit to junhaoliao/clp that referenced this pull request May 17, 2026
…ubmission and result retrieval: (y-scope#1354)

* Refactor Celery-based compression job management to use the new interface.
* Implement Spider-based compression job management.
Co-authored-by: LinZhihao-723 <zh.lin@mail.utoronto.ca>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants