[MP][Bugfix] fixing race condition for zmq output notifier#2808
[MP][Bugfix] fixing race condition for zmq output notifier#2808ApostaC merged 4 commits intoLMCache:devfrom
Conversation
Signed-off-by: ApostaC <yihua98@uchicago.edu>
|
Codex usage limits have been reached for code reviews. Please check with the admins of this repo to increase the limits by adding credits. |
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request resolves a critical race condition within the Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request effectively resolves a race condition in the MessageQueueServer._call_blocking_handler by replacing the non-thread-safe ZMQ PUSH/PULL inproc socket pair with os.eventfd. This change ensures thread-safe notification for output tasks, improving the stability and correctness of the message queue. The implementation correctly integrates os.eventfd for creation, signaling, polling, and resource cleanup, aligning with the described solution and leveraging an established pattern within the codebase.
maobaolong
left a comment
There was a problem hiding this comment.
Thanks for this fix, LGTM. left nit only.
| self.output_notifier, self.output_waiter = prepare_internal_push_pull_sockets( | ||
| self.ctx | ||
| ) | ||
| self._output_efd = os.eventfd(0, os.EFD_NONBLOCK | os.EFD_CLOEXEC) |
There was a problem hiding this comment.
nit: Worth adding a brief comment explaining why eventfd is used instead of the previous ZMQ socket, for future readers who won't have the PR context.
|
@cursor review |
1 similar comment
|
@cursor review |
|
@codex review |
Signed-off-by: ApostaC <yihua98@uchicago.edu>
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 4054524b5f
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| if self.worker_thread.is_alive(): | ||
| self.worker_thread.join() | ||
| self.socket.close() | ||
| os.close(self._output_efd) |
There was a problem hiding this comment.
Do not close the eventfd before blocking callbacks finish
If close() runs while a blocking request is still executing, the callback registered in _call_blocking_handler() can fire after os.close(self._output_efd). Because thread_pool.shutdown(wait=False) does not wait for those callbacks, the late os.eventfd_write() is aimed at a closed integer fd; once that fd number is reused, Python will write eight bytes into the new resource instead of failing. I reproduced this by closing the server, reopening the same fd with os.open(), and observing b"\x01\0..." written into the new file while the client future timed out. This regression only needs an in-flight blocking handler during shutdown, which is possible from the HTTP server lifespan shutdown path.
Useful? React with 👍 / 👎.
Signed-off-by: baoloongmao <baoloongmao@tencent.com> LMCache#2808
Signed-off-by: ApostaC <yihua98@uchicago.edu>
What this PR does / why we need it:
Fixes a race condition in
MessageQueueServer._call_blocking_handlerwhere theoutput_notifierzmq PUSH socket was called fromThreadPoolExecutorcallback threads. ZMQ sockets are not thread-safe, so concurrentsend()calls from multiple worker threads could corrupt state.Replaces the zmq PUSH/PULL inproc socket pair with a single
os.eventfd.eventfd_write()is an atomic kernel syscall, making it inherently thread-safe for cross-thread notification. This also simplifies the drain logic —eventfd_read()atomically resets the counter in one call vs. a recv loop.If applicable: