Improve runjobs to reduce the time spent on coroutine scheduling#6892

Merged
waruqi merged 12 commits into dev from runjobs on Oct 6, 2025
Conversation


@waruqi waruqi commented Oct 5, 2025

  • Reduce the time spent on coroutine scheduling
  • Add build.distcc.remote_only policy to enable build on only remote machines
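The new policy can be enabled from a project's xmake.lua. A minimal sketch, assuming the `build.distcc.remote_only` policy name from this PR and xmake's standard `set_policy` API (the target layout below is illustrative, not from this PR):

```lua
-- xmake.lua (sketch): restrict distcc builds to remote hosts only.
-- "build.distcc.remote_only" is the policy added in this PR; the
-- target/source layout here is a placeholder.
set_policy("build.distcc.remote_only", true)

target("demo")
    set_kind("binary")
    add_files("src/*.c")
```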

Before optimization

ruki:xmake ruki$ xmake l tests/benchmarks/async/runjobs.lua
runjobs(10000/1): 1678 ms, plain: 0 ms
runjobs(10000/10): 1853 ms, plain: 0 ms
runjobs(10000/100): 1904 ms, plain: 1 ms
runjobs_proc(1000/10): 63061 ms

After optimization

ruki:xmake ruki$ xmake l tests/benchmarks/async/runjobs.lua
runjobs(10000/1): 50 ms, plain: 0 ms
runjobs(10000/10): 51 ms, plain: 0 ms
runjobs(10000/100): 67 ms, plain: 0 ms
runjobs_proc(1000/10): 52977 ms

@waruqi waruqi added this to the v3.0.4 milestone Oct 5, 2025
@waruqi waruqi marked this pull request as ready for review October 5, 2025 15:47

waruqi commented Oct 5, 2025

/gemini review


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request significantly refactors the asynchronous job execution logic in runjobs.lua, moving to a much cleaner and more robust consumer-based model using semaphores. The changes also improve the semaphore implementation itself by switching from a hashset to a queue for waiting coroutines. While the overall design is a major improvement, I've identified a critical issue in the new semaphore post logic that could lead to a deadlock under error conditions. Additionally, there's a medium-severity issue in runjobs.lua regarding how job completion is counted, which leads to inaccurate progress reporting.

Comment on lines 55 to 76
function _semaphore:post(value)
    if self._POSTING then
        return self._VALUE
    end
    self._POSTING = true
    local new_value = self._VALUE + value
    self._VALUE = new_value
    if new_value > 0 then
        local waiting = self._WAITING
        local post_count = 0
        while not waiting:empty() do
            local item = waiting:pop()
            scheduler:co_resume(item)
            post_count = post_count + 1
            if post_count >= new_value then
                break
            end
        end
    end
    self._POSTING = false
    return new_value
end

critical

The _POSTING flag is not guaranteed to be reset to false if an error occurs within this function (e.g., if value is not a number, or if scheduler:co_resume is modified to throw an error). If an error is thrown, the function will exit without resetting self._POSTING, causing a permanent lock on the semaphore. All subsequent calls to :post() will then return early, effectively deadlocking any waiting coroutines.

To ensure the lock is always released, you should wrap the critical section in a construct that provides finally semantics, which can be emulated in Lua using pcall.

function _semaphore:post(value)
    if self._POSTING then
        return self._VALUE
    end
    self._POSTING = true
    local ok, res_or_err = pcall(function()
        local new_value = self._VALUE + (value or 1)
        self._VALUE = new_value
        if new_value > 0 then
            local waiting = self._WAITING
            local post_count = 0
            while not waiting:empty() do
                local item = waiting:pop()
                local resume_ok, resume_err = scheduler:co_resume(item)
                if not resume_ok then
                    error(resume_err, 0)
                end
                post_count = post_count + 1
                if post_count >= new_value then
                    break
                end
            end
        end
        return new_value
    end)
    self._POSTING = false
    if not ok then
        error(res_or_err, 0)
    end
    return res_or_err
end

Comment on lines +180 to +181
state.finished_count = state.finished_count + 1
job_func(job_index, total, {progress = state.progress_wrapper})

medium

Incrementing state.finished_count before the job function is called is problematic. It means the count represents jobs that have started, not finished. This has two main consequences:

  1. The variable name finished_count is misleading and harms maintainability. It should probably be dispatched_count or started_count.
  2. The progress reporting becomes inaccurate, as it will show progress based on job dispatch rather than completion. A long-running or failing job will appear as "finished" in the progress calculation.

A better approach is to increment a counter only after job_func completes successfully. This would require using a separate counter for dispatching versus completion and likely using a lock to safely increment the completion counter, as job_func can yield.
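One way the two counters could be separated is sketched below. This is not code from the PR: the `dispatched_count` name, the `co_lock`/`co_unlock` lock API, and the lock name are all assumptions used for illustration.

```lua
-- Sketch: count dispatch and completion separately so that progress
-- reflects jobs that actually finished, not jobs that merely started.
-- Assumes a scheduler-level lock API, since job_func may yield.
state.dispatched_count = state.dispatched_count + 1
job_func(job_index, total, {progress = state.progress_wrapper})

-- Only after job_func returns does the job count as finished; the
-- lock guards the shared counter across yielding coroutines.
scheduler:co_lock("runjobs/finished_count")
state.finished_count = state.finished_count + 1
scheduler:co_unlock("runjobs/finished_count")
```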

@waruqi waruqi changed the title from "Improve async runjobs" to "Improve runjobs to reduce the time spent on coroutine scheduling" Oct 5, 2025
@waruqi waruqi merged commit 273d936 into dev Oct 6, 2025
42 of 44 checks passed
@waruqi waruqi deleted the runjobs branch October 6, 2025 12:01