Conversation
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces asynchronous support for several file system operations like rm, cp, and filedirs. This is achieved by adding a new async_task module that manages a worker thread and a task queue. The implementation is robust, featuring object pooling for thread synchronization primitives and graceful shutdown. The new asynchronous capability is immediately put to good use by making temporary file deletions non-blocking, which should provide a nice performance improvement. The changes to the threading module to support serialization of thread objects are also well-implemented. Overall, this is a great feature addition.
| function async_task.cp(srcpath, dstpath, opt) | ||
| opt = opt or {} | ||
| local ok, errors = async_task._ensure_started() | ||
| if not ok then | ||
| return false, errors | ||
| end | ||
|
|
||
| -- post task | ||
| srcpath = path.absolute(tostring(srcpath)) | ||
| dstpath = path.absolute(tostring(dstpath)) | ||
|
|
||
| local cmd = {kind = "cp", srcpath = srcpath, dstpath = dstpath} | ||
| local cmd_event, cmd_result | ||
| local is_detach = opt.detach | ||
|
|
||
| -- create event and result for non-detach mode | ||
| if not is_detach then | ||
| cmd_event = _get_event() | ||
| cmd_result = _get_sharedata() | ||
|
|
||
| -- serialize thread objects for passing to worker thread | ||
| cmd.event_data = thread._serialize_object(cmd_event) | ||
| cmd.result_data = thread._serialize_object(cmd_result) | ||
| end | ||
|
|
||
| task_mutex:lock() | ||
| task_queue:push(cmd) | ||
| local queue_size = task_queue:size() | ||
| task_mutex:unlock() | ||
|
|
||
| if is_detach then | ||
| -- We cache some tasks before executing them to avoid frequent thread switching. | ||
| if queue_size > 10 then | ||
| task_event:post() | ||
| end | ||
| return true | ||
| else | ||
| -- wait for completion | ||
| task_event:post() | ||
| cmd_event:wait(-1) | ||
| local result = cmd_result:get() | ||
| _put_event(cmd_event) | ||
| _put_sharedata(cmd_result) | ||
| if result and result.ok then | ||
| return true | ||
| else | ||
| return false, result and result.errors or "unknown error" | ||
| end | ||
| end | ||
| end | ||
|
|
||
| -- remove files or directories | ||
| function async_task.rm(filepath, opt) | ||
| opt = opt or {} | ||
| local ok, errors = async_task._ensure_started() | ||
| if not ok then | ||
| return false, errors | ||
| end | ||
|
|
||
| -- post task | ||
| filepath = path.absolute(tostring(filepath)) | ||
|
|
||
| local cmd = {kind = "rm", filepath = filepath} | ||
| local cmd_event, cmd_result | ||
| local is_detach = opt.detach | ||
|
|
||
| -- create event and result for non-detach mode | ||
| if not is_detach then | ||
| cmd_event = _get_event() | ||
| cmd_result = _get_sharedata() | ||
|
|
||
| -- serialize thread objects for passing to worker thread | ||
| cmd.event_data = thread._serialize_object(cmd_event) | ||
| cmd.result_data = thread._serialize_object(cmd_result) | ||
| end | ||
|
|
||
| task_mutex:lock() | ||
| task_queue:push(cmd) | ||
| local queue_size = task_queue:size() | ||
| task_mutex:unlock() | ||
|
|
||
| if is_detach then | ||
| -- We cache some tasks before executing them to avoid frequent thread switching. | ||
| if queue_size > 10 then | ||
| task_event:post() | ||
| end | ||
| return true | ||
| else | ||
| -- wait for completion | ||
| task_event:post() | ||
| cmd_event:wait(-1) | ||
| local result = cmd_result:get() | ||
| _put_event(cmd_event) | ||
| _put_sharedata(cmd_result) | ||
| if result and result.ok then | ||
| return true | ||
| else | ||
| return false, result and result.errors or "unknown error" | ||
| end | ||
| end | ||
| end | ||
|
|
||
| -- remove directories | ||
| function async_task.rmdir(dir, opt) | ||
| opt = opt or {} | ||
| local ok, errors = async_task._ensure_started() | ||
| if not ok then | ||
| return false, errors | ||
| end | ||
|
|
||
| -- post task | ||
| dir = path.absolute(tostring(dir)) | ||
|
|
||
| local cmd = {kind = "rmdir", dir = dir} | ||
| local cmd_event, cmd_result | ||
| local is_detach = opt.detach | ||
|
|
||
| -- create event and result for non-detach mode | ||
| if not is_detach then | ||
| cmd_event = _get_event() | ||
| cmd_result = _get_sharedata() | ||
|
|
||
| -- serialize thread objects for passing to worker thread | ||
| cmd.event_data = thread._serialize_object(cmd_event) | ||
| cmd.result_data = thread._serialize_object(cmd_result) | ||
| end | ||
|
|
||
| task_mutex:lock() | ||
| task_queue:push(cmd) | ||
| local queue_size = task_queue:size() | ||
| task_mutex:unlock() | ||
|
|
||
| if is_detach then | ||
| -- We cache some tasks before executing them to avoid frequent thread switching. | ||
| if queue_size > 10 then | ||
| task_event:post() | ||
| end | ||
| return true | ||
| else | ||
| -- wait for completion | ||
| task_event:post() | ||
| cmd_event:wait(-1) | ||
| local result = cmd_result:get() | ||
| _put_event(cmd_event) | ||
| _put_sharedata(cmd_result) | ||
| if result and result.ok then | ||
| return true | ||
| else | ||
| return false, result and result.errors or "unknown error" | ||
| end | ||
| end | ||
| end |
There was a problem hiding this comment.
The functions async_task.cp, async_task.rm, and async_task.rmdir share a large amount of boilerplate code for posting a task to the worker thread and handling the response. This code duplication can make maintenance harder, as any change to the task posting logic needs to be replicated in multiple places.
Consider refactoring this common logic into a helper function. This function could handle task submission, synchronization for non-detached tasks, and result handling. Each of the public functions (cp, rm, etc.) would then become a thin wrapper that prepares the command table and calls this new helper function.
4980f45 to
72f8ed7
Compare
#6868
Supported APIs
async: block, it can be scheduled with other coroutine tasks
async + detach: non-block, It executes in a background thread, so we don't need to wait for the return value.