### Describe the bug
When sending torch tensors using the provided utils in `dora.cuda`, I find that the data are frequently incorrect on the receiver side. This typically happens when the tensor is composed of constituent tensors, e.g. built via `torch.stack` or `torch.cat`.
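For context (plain PyTorch behavior, not dora-specific): `torch.stack` always copies its inputs into a freshly allocated contiguous buffer, so the stacked tensor shares no storage with its constituents. A minimal CPU sketch of that allocation behavior:

```python
import torch

# Build two small tensors and stack them, mirroring the MWE below (on CPU for brevity).
tensor1 = torch.arange(6, dtype=torch.float32).reshape(2, 3)
tensor2 = torch.arange(6, dtype=torch.float32).reshape(2, 3) + 6.0
stacked = torch.stack([tensor1, tensor2], dim=0)

# torch.stack copies into a fresh allocation: the result is contiguous and
# shares no storage with either input.
assert stacked.is_contiguous()
assert stacked.data_ptr() != tensor1.data_ptr()
assert stacked.data_ptr() != tensor2.data_ptr()
```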
### To Reproduce
Simple MWE with a sender and receiver:
```python
# sender.py
import torch

from dora import Node
from dora.cuda import torch_to_ipc_buffer


def main():
    sender = Node(node_id="sender")
    tensor1 = torch.arange(6, dtype=torch.float32, device="cuda").reshape(2, 3)
    tensor2 = torch.arange(6, dtype=torch.float32, device="cuda").reshape(2, 3) + 6.0
    tensor = torch.stack([tensor1, tensor2], dim=0)
    ipc_buffer, metadata = torch_to_ipc_buffer(tensor)
    for _ in range(100):
        sender.send_output("tensor", ipc_buffer, metadata)


if __name__ == "__main__":
    main()
```

```python
# receiver.py
import pyarrow as pa
import torch

from dora import Node
from dora.cuda import cudabuffer_to_torch, ipc_buffer_to_ipc_handle


def main():
    receiver = Node(node_id="receiver")
    ctx = pa.cuda.Context()
    tensor1 = torch.arange(6, dtype=torch.float32, device="cuda").reshape(2, 3)
    tensor2 = torch.arange(6, dtype=torch.float32, device="cuda").reshape(2, 3) + 6.0
    expected_tensor = torch.stack([tensor1, tensor2], dim=0)
    while True:
        event = receiver.next(timeout=1e-3)
        if event["type"] == "INPUT":
            ipc_handle = ipc_buffer_to_ipc_handle(event["value"])
            cudabuffer = ctx.open_ipc_buffer(ipc_handle)
            tensor = cudabuffer_to_torch(cudabuffer, event["metadata"]).clone()
            assert torch.allclose(tensor, expected_tensor), (
                f"Received tensor: {tensor} does not match expected tensor: {expected_tensor}"
            )


if __name__ == "__main__":
    main()
```

```yaml
# mwe.yml
nodes:
  - id: sender
    path: /path/to/sender.py
    outputs:
      - tensor
  - id: receiver
    path: /path/to/receiver.py
    inputs:
      tensor:
        source: sender/tensor
        queue_size: 1
```

When I run

```shell
dora run mwe.yml
```

I get the following error:
```
Dataflow failed:
Node `receiver` failed: exited with code 1 with stderr output:
---------------------------------------------------------------------------------
[...]AssertionError: Received tensor: tensor([[[0., 1., 2.],
         [3., 4., 5.]],

        [[0., 0., 0.],
         [0., 0., 0.]]], device='cuda:0') does not match expected tensor: tensor([[[ 0.,  1.,  2.],
         [ 3.,  4.,  5.]],

        [[ 6.,  7.,  8.],
         [ 9., 10., 11.]]], device='cuda:0')
---------------------------------------------------------------------------------
```
### Environments (please complete the following information):

Output of `uname --all`:

```
Linux vulcan 6.11.0-21-generic #21~24.04.1-Ubuntu SMP PREEMPT_DYNAMIC Mon Feb 24 16:52:15 UTC 2 x86_64 x86_64 x86_64 GNU/Linux
```

Versions:

```
dora-cli 0.3.11

Name: dora-rs
Version: 0.3.11
Summary: `dora` goal is to be a low latency, composable, and distributed data flow.
Home-page:
Author:
Author-email:
License: MIT
Location: /home/albert/miniconda3/envs/judo/lib/python3.12/site-packages
Requires: pyarrow
Required-by: dora-rs-cli
```
### Additional context
The MWE above mirrors what happens in my application when I send a stack of camera images from a GPU buffer to another node for processing. If I send a single (unstacked) image straight out of the buffer, it is received correctly.
The following still produce the bug in my application:
- first cloning the tensors before sending
- calling `contiguous()` before sending
- converting the tensor to numpy and back to a torch tensor before sending
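One observation on the `contiguous()` workaround (a note about PyTorch semantics, not about dora itself): since `torch.stack` already produces a contiguous tensor, calling `contiguous()` on it is a no-op that returns the very same tensor backed by the same storage, so the exported buffer is unchanged. A small CPU sketch:

```python
import torch

# A stacked tensor is already contiguous, as in the MWE (on CPU for brevity).
a = torch.arange(6, dtype=torch.float32).reshape(2, 3)
b = a + 6.0
stacked = torch.stack([a, b], dim=0)

# contiguous() on an already-contiguous tensor returns the same tensor
# object, backed by the same storage -- no new buffer is allocated.
assert stacked.is_contiguous()
assert stacked.contiguous() is stacked
assert stacked.contiguous().data_ptr() == stacked.data_ptr()
```

This may explain why that particular workaround has no effect; it does not account for `clone()` or the numpy round-trip also failing, which allocate new storage.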