Change iterator over multiple Queue wrappers to request all protocols simulteniously#769
Change iterator over multiple Queue wrappers to request all protocols simulteniously#769VitalyFedyunin wants to merge 6 commits intogh/VitalyFedyunin/20/basefrom
Conversation
… simulteniously [ghstack-poisoned]
…l protocols simulteniously" This is part of MPRS optimizations, changes are covered by the existing test_dataloader2.py test. [ghstack-poisoned]
|
|
||
| pass |
There was a problem hiding this comment.
BTW, It might be better to add an Error message in __init__.
super().__init__(msg).
| for idx in range(total_pipes): | ||
| self.datapipes[idx].protocol.request_next() |
There was a problem hiding this comment.
This is the main review comment, the rests are some nits.
Might be a noob question. We now request and receive data from protocol object. Then, do we still need QueueWrapper? We can directly let _IterateQueueDataPipes store a list of protocol clients.
There was a problem hiding this comment.
QueueWrapper handles terminations (and snapshotting in the future). Direct access to protocol here is only required to reorder traversals.
There was a problem hiding this comment.
However, I'm still considering the possibility of merging _IterateQueueDataPipes and QueueWrapper to make it one class that supports 1:M queues.
…l protocols simulteniously" This is part of MPRS optimizations, changes are covered by the existing test_dataloader2.py test. [ghstack-poisoned]
…l protocols simulteniously" This is part of MPRS optimizations, changes are covered by the existing test_dataloader2.py test. [ghstack-poisoned]
| if isinstance(response, communication.messages.InvalidStateResponse): | ||
| raise communication.iter.InvalidStateResetRequired | ||
| if isinstance(response, communication.messages.TerminateResponse): | ||
| raise communication.iter.TerminateRequired |
There was a problem hiding this comment.
Question: shouldn't these be caught by QueueWrapper's method nonblocking_next?
There was a problem hiding this comment.
No, because I'm not using next of QueueWrapper, but instead accessing protocols next directly.
|
@VitalyFedyunin has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
…l protocols simulteniously" This is part of MPRS optimizations, changes are covered by the existing test_dataloader2.py test. Differential Revision: [D39816752](https://our.internmc.facebook.com/intern/diff/D39816752) [ghstack-poisoned]
|
@VitalyFedyunin has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
…l protocols simulteniously" This is part of MPRS optimizations, changes are covered by the existing test_dataloader2.py test. Differential Revision: [D39816752](https://our.internmc.facebook.com/intern/diff/D39816752) [ghstack-poisoned]
|
@VitalyFedyunin has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
This is part of MPRS optimizations, changes are covered by the existing test_dataloader2.py test.
Stack from ghstack (oldest at bottom):
Differential Revision: D39816752