Use TCP to send AUX#2
Conversation
Summary of ChangesHello @maning00, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request refactors the auxiliary data transfer mechanism within the system, transitioning from RDMA to TCP for improved flexibility or compatibility. It introduces a dedicated codec for efficient data handling and updates the core KV manager to support the new TCP-based communication flow for auxiliary data. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request refactors the auxiliary data transfer to use TCP via ZMQ instead of RDMA. This involves adding an AuxDataCodec for data serialization and updating the MoriKVManager to handle TCP-based communication for auxiliary data. The overall logic seems sound. I've provided a high-severity comment regarding a potential memory corruption vulnerability due to a missing size check, and a medium-severity comment to improve maintainability by replacing magic numbers with named constants.
| dst_aux_ptr = kv_args.aux_data_ptrs[buffer_index] | ||
| item_len = kv_args.aux_item_lens[buffer_index] | ||
| dst_addr = dst_aux_ptr + item_len * aux_index | ||
| buffer = (ctypes.c_byte * len(data)).from_address(dst_addr) | ||
| buffer[:] = data | ||
| return |
There was a problem hiding this comment.
The current implementation does not validate the size of the received data against the allocated buffer size (item_len). If len(data) is greater than item_len, this could lead to a buffer overflow, writing past the intended memory region for the given aux_index. This can cause memory corruption.
To prevent this, a check should be added to ensure the received data size does not exceed the allocated item length.
| dst_aux_ptr = kv_args.aux_data_ptrs[buffer_index] | |
| item_len = kv_args.aux_item_lens[buffer_index] | |
| dst_addr = dst_aux_ptr + item_len * aux_index | |
| buffer = (ctypes.c_byte * len(data)).from_address(dst_addr) | |
| buffer[:] = data | |
| return | |
| dst_aux_ptr = kv_args.aux_data_ptrs[buffer_index] | |
| item_len = kv_args.aux_item_lens[buffer_index] | |
| if len(data) > item_len: | |
| logger.error( | |
| f"Received aux data size ({len(data)}) is larger than " | |
| f"the allocated item length ({item_len})." | |
| ) | |
| return | |
| dst_addr = dst_aux_ptr + item_len * aux_index | |
| buffer = (ctypes.c_byte * len(data)).from_address(dst_addr) | |
| buffer[:] = data |
| room = int(msg[1].decode("ascii")) | ||
| buffer_index = int(msg[2].decode("ascii")) | ||
| aux_index = int(msg[3].decode("ascii")) | ||
| data_length = struct.unpack(">I", msg[4])[0] | ||
| data = msg[5] |
There was a problem hiding this comment.
The indices for accessing parts of the msg list are hardcoded as magic numbers (e.g., msg[1], msg[2]). This makes the code harder to read and maintain. If the message format changes, these indices would need to be updated in multiple places, which is error-prone.
It's recommended to define these indices as constants within the MoriKVManager class. This would make the code more self-documenting and easier to refactor.
For example, you could add these constants to MoriKVManager (around line 172):
class MoriKVManager(CommonKVManager):
AUX_DATA_HEADER = b"AUX_DATA"
_AUX_MSG_IDX_ROOM = 1
_AUX_MSG_IDX_BUFFER_IDX = 2
_AUX_MSG_IDX_AUX_IDX = 3
_AUX_MSG_IDX_LEN = 4
_AUX_MSG_IDX_DATA = 5And then use them here and in send_aux_data_to_endpoint.
| room = int(msg[1].decode("ascii")) | |
| buffer_index = int(msg[2].decode("ascii")) | |
| aux_index = int(msg[3].decode("ascii")) | |
| data_length = struct.unpack(">I", msg[4])[0] | |
| data = msg[5] | |
| room = int(msg[self._AUX_MSG_IDX_ROOM].decode("ascii")) | |
| buffer_index = int(msg[self._AUX_MSG_IDX_BUFFER_IDX].decode("ascii")) | |
| aux_index = int(msg[self._AUX_MSG_IDX_AUX_IDX].decode("ascii")) | |
| data_length = struct.unpack(">I", msg[self._AUX_MSG_IDX_LEN])[0] | |
| data = msg[self._AUX_MSG_IDX_DATA] |
* add disable notif * send aux with tcp * remove unused log --------- Co-authored-by: cwortman-amd <cwortman@amd.com>
* add disable notif * send aux with tcp * remove unused log --------- Co-authored-by: cwortman-amd <cwortman@amd.com>
* add disable notif * send aux with tcp * remove unused log --------- Co-authored-by: cwortman-amd <cwortman@amd.com>
Motivation
Modifications
Accuracy Tests
Benchmarking and Profiling
Checklist