perf(clp-package): Reduce batch scheduling jitter in the query scheduler.#1899
Walkthrough
The change optimizes polling behaviour in the query scheduler by introducing a timeout parameter to asynchronous task result retrieval and adjusting the polling loop to account for elapsed execution time, maintaining consistent polling intervals.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In
`@components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py`:
- Around line 1088-1096: The loop in handle_job_updates can compute a negative
sleep when handle_cancelling_search_jobs and check_job_status_and_update_db
exceed jobs_poll_delay; change the sleep calculation so the duration is clamped
to a non-negative value (e.g., compute remaining = jobs_poll_delay -
(interval_end_time - interval_start_time).total_seconds() and call
asyncio.sleep(max(0, remaining))). Update the sleep call in handle_job_updates
to use this guarded remaining value while keeping
interval_start_time/interval_end_time and jobs_poll_delay as the sources of
timing.
 async def handle_job_updates(db_conn_pool, results_cache_uri: str, jobs_poll_delay: float):
     while True:
+        interval_start_time = datetime.datetime.now()
         await handle_cancelling_search_jobs(db_conn_pool)
         await check_job_status_and_update_db(db_conn_pool, results_cache_uri)
-        await asyncio.sleep(jobs_poll_delay)
+        interval_end_time = datetime.datetime.now()
+        await asyncio.sleep(
+            jobs_poll_delay - (interval_end_time - interval_start_time).total_seconds()
+        )
🧹 Nitpick | 🔵 Trivial
Guard against negative sleep duration when work exceeds the polling interval.
If handle_cancelling_search_jobs and check_job_status_and_update_db take longer than jobs_poll_delay, the computed sleep duration becomes negative. While asyncio.sleep() treats negative values as 0, making this explicit improves clarity and intent.
💡 Suggested improvement
 async def handle_job_updates(db_conn_pool, results_cache_uri: str, jobs_poll_delay: float):
     while True:
         interval_start_time = datetime.datetime.now()
         await handle_cancelling_search_jobs(db_conn_pool)
         await check_job_status_and_update_db(db_conn_pool, results_cache_uri)
         interval_end_time = datetime.datetime.now()
+        remaining_time = jobs_poll_delay - (interval_end_time - interval_start_time).total_seconds()
         await asyncio.sleep(
-            jobs_poll_delay - (interval_end_time - interval_start_time).total_seconds()
+            max(0, remaining_time)
         )
🧰 Tools
🪛 Ruff (0.14.13)
1090-1090: datetime.datetime.now() called without a tz argument
(DTZ005)
1093-1093: datetime.datetime.now() called without a tz argument
(DTZ005)
The behaviour of asyncio.sleep for negative durations is well-documented, so handling this as a special case in our own code decreases clarity imo.
@gibber9809, that's a fair point. You're right that asyncio.sleep() with delay values of 0 or less is well-defined behavior, and since the function handles it consistently, keeping the code simpler without the explicit max() check is reasonable. Thanks for the clarification!
🧠 Learnings used
Learnt from: haiqi96
Repo: y-scope/clp PR: 569
File: components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py:380-392
Timestamp: 2024-11-15T20:07:22.256Z
Learning: The current implementation assumes single-threaded execution, so race conditions in functions like `is_target_extracted` in `components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py` are not currently a concern.
Learnt from: junhaoliao
Repo: y-scope/clp PR: 1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR `#1152` discussion.
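The thread above turns on what happens when the work inside the polling loop overruns `jobs_poll_delay`. A minimal sketch of the same pacing pattern, assuming stand-in work functions (`fast_work`, `slow_work`) and `time.monotonic()` in place of the PR's `datetime.datetime.now()`; none of these names come from the scheduler itself:

```python
import asyncio
import time


async def paced_loop(work, poll_delay: float, iterations: int) -> list[float]:
    """Runs `work` repeatedly, aiming to start iterations `poll_delay` seconds apart."""
    start_times = []
    for _ in range(iterations):
        interval_start = time.monotonic()
        start_times.append(interval_start)
        await work()
        elapsed = time.monotonic() - interval_start
        # The argument is negative when `work` overran the interval. No clamping is
        # applied here; asyncio.sleep returns promptly for delays <= 0.
        await asyncio.sleep(poll_delay - elapsed)
    return start_times


async def fast_work() -> None:
    # Hypothetical workload that fits inside the polling interval.
    await asyncio.sleep(0.01)


async def slow_work() -> None:
    # Hypothetical workload that overruns the polling interval.
    await asyncio.sleep(0.08)


def mean_period(start_times: list[float]) -> float:
    gaps = [later - earlier for earlier, later in zip(start_times, start_times[1:])]
    return sum(gaps) / len(gaps)


fast_period = mean_period(asyncio.run(paced_loop(fast_work, 0.05, 5)))
slow_period = mean_period(asyncio.run(paced_loop(slow_work, 0.05, 5)))
print(f"fast work: ~{fast_period:.3f}s per iteration")
print(f"slow work: ~{slow_period:.3f}s per iteration")
```

With the fast workload the loop settles near the configured delay; with the slow workload the period is set by the work itself and the negative sleep adds no extra wait.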
Doing one more experiment to see if I can get the jitter even lower -- noticed on a second look that the task retiring is in the polling loop here, but the task dispatching is in our other polling loop. Going to see if there are any easy improvements to that polling loop that further reduce jitter.
Yeah seems like there's potentially at least one or two more obvious improvements worth trying out. Going to temporarily put this PR in draft, then take it out once I've finished trying them.
Marking ready for review but removing "fixes #1897" since this PR doesn't completely remove the jitter, it just significantly reduces it.
     if not async_task_result.ready():
         return None
-    return async_task_result.get()
+    return async_task_result.get(interval=0.005)
I am a little confused about the interval. If the result is already ready, why do we need an interval?
Because Celery still polls for the result and, for some reason, never seems to retrieve it on the first request. The polling interval is 0.5 seconds by default, so previously this call would always wait 0.5 seconds and then retrieve the result -- you can see this in the breakdown shown in #1897.
As for why Celery works this way, I'm not sure -- some sources online imply that task state (running, failed, successful) is tracked separately from the task result payload, which could explain this behaviour.
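To make the effect of the `interval` argument concrete, here is a toy model of the behaviour described above. It is not Celery's implementation: `DelayedPayload`, `try_fetch`, and `get_with_interval` are hypothetical names, and the gap between "state is ready" and "payload is readable" is only the hypothesis from the comment. The slow case uses 0.1 seconds rather than Celery's 0.5-second default to keep the run short.

```python
import time


class DelayedPayload:
    """Toy result whose state reads as ready before its payload can be fetched."""

    def __init__(self, payload, visible_after: float):
        self._payload = payload
        self._visible_at = time.monotonic() + visible_after

    def ready(self) -> bool:
        # State is reported as finished immediately (assumption of this model).
        return True

    def try_fetch(self):
        # Returns (True, payload) once the payload is visible, else (False, None).
        if time.monotonic() >= self._visible_at:
            return True, self._payload
        return False, None


def get_with_interval(result: DelayedPayload, interval: float):
    """Polls for the payload, sleeping `interval` seconds between attempts."""
    while True:
        fetched, payload = result.try_fetch()
        if fetched:
            return payload
        time.sleep(interval)


def timed_get(interval: float) -> float:
    result = DelayedPayload("done", visible_after=0.001)
    begin = time.monotonic()
    assert result.ready()
    assert get_with_interval(result, interval) == "done"
    return time.monotonic() - begin


slow = timed_get(interval=0.1)
fast = timed_get(interval=0.005)
print(f"interval=0.1:   waited ~{slow:.3f}s")
print(f"interval=0.005: waited ~{fast:.3f}s")
```

In this model a missed first attempt costs one full interval, so shrinking the interval shrinks the wait even though `ready()` was already true.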
LinZhihao-723 left a comment
For the PR title, how about:
perf(clp-package): Reduce batch scheduling jitter in the query scheduler.
Otherwise, I think they all lgtm.

Description
The query scheduler has extreme jitter in the main task dispatch loop, making task dispatch frequency a direct function of this jitter as described in #1897.
This PR significantly reduces the jitter, bringing the pacing of the main scheduling loop much closer to the configured polling interval, and likewise bringing the maximum task dispatch rate closer to the rate that interval allows. The following chart shows the jitter and breakdown of time spent over a small time slice of a search query as batches of search tasks are completed.
When compared to the chart in #1897, you can see that the main source of the jitter (long waits for retrieving celery results) has been significantly reduced, and pacing for the main scheduling loop is much closer to the configured rate (configured to once every 0.1 seconds in this case).
Note that while the pacing still isn't perfect, the part of this diff that attempts to explicitly pace the scheduling loop does help (without this part of the diff, excursions from the configured rate are even larger). This means that there are likely other sources of jitter in the query scheduler that we could attempt to eliminate.
Also note that reducing jitter in this PR results directly in improved query performance in the package (since batches of tasks can now be dispatched more frequently). For some queries consisting of very short-running tasks, I measured a decrease in query time from ~1 minute to ~18 seconds in my single-machine docker-compose setup.
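As a rough back-of-the-envelope check on the two parts of the diff, the loop period can be modelled with simple arithmetic. This is an idealized sketch that assumes the per-iteration work is constant and dominated by the result-retrieval wait, which ignores the other jitter sources mentioned above; the function names are illustrative only.

```python
def period_fixed_sleep(work_s: float, poll_delay_s: float) -> float:
    """Loop period when the loop always sleeps the full delay after the work."""
    return work_s + poll_delay_s


def period_compensated(work_s: float, poll_delay_s: float) -> float:
    """Loop period when the sleep is shortened by the time the work took."""
    return max(work_s, poll_delay_s)


POLL_DELAY_S = 0.1  # configured rate from the description

# Work time taken from the review thread: a 0.5 s wait per result retrieval
# before the change, and a 0.005 s polling interval after it.
before = period_fixed_sleep(0.5, POLL_DELAY_S)
after_interval_only = period_fixed_sleep(0.005, POLL_DELAY_S)
after_both = period_compensated(0.005, POLL_DELAY_S)

print(f"before:                 {before:.3f}s per iteration")
print(f"shorter interval only:  {after_interval_only:.3f}s per iteration")
print(f"interval + pacing:      {after_both:.3f}s per iteration")
```

Under these assumptions the shorter retrieval interval accounts for most of the improvement, and the explicit pacing removes the remaining drift between the loop period and the configured delay.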
Checklist
breaking change.
Validation performed