Properly cleanup processes and queues for MPRS and Fix pause for prefetch#1075
Properly cleanup processes and queues for MPRS and Fix pause for prefetch#1075ejguan wants to merge 1 commit intometa-pytorch:mainfrom
pause for prefetch#1075Conversation
|
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
|
TBH, I hope we can cherrypick this PR to release branch, but probably not feasible as the final RC has been cut. |
|
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
|
Converting back to draft as this PR seems breaking some tests |
|
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
pause for prefetch
|
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
| res_queue.cancel_join_thread() | ||
| process.daemon = True | ||
| process.start() | ||
| self._dispatch_process = (process, req_queues, res_queues) |
There was a problem hiding this comment.
non-blocking note: Maybe we should refactor out the dispatching code at some point... It is getting too much
There was a problem hiding this comment.
Yeah. We should spin off it and worker launch to a separate file
| process_name=f"worker process {worker_id}", | ||
| call_on_process_init=call_on_process_init, | ||
| ) | ||
| req_queue.cancel_join_thread() |
There was a problem hiding this comment.
IIUC, this allows join_thread to be non-blocking?
There was a problem hiding this comment.
Emmm, not exactly. This would make sure req_queue not being joined by gc when the main process wants to exit. Then, it would guarantee that worker process can still access the TerminationRequest from req_queue rather than any corrupted data from a closed queue
| response = self.res_buffers[idx].popleft() | ||
| else: | ||
| response = self.datapipes[idx].protocol.get_response_next(block=True) | ||
| while not self._terminated: |
There was a problem hiding this comment.
Question: so we are guaranteeing termination message will be sent?
There was a problem hiding this comment.
This is guarantee if there is a prefetch attached after this DataPipe the worker thread would be able to know it's terminated. If we keep the blocking request here, the worker thread will be stuck at the blocking call even though we have already terminated this DataPipe -> hanging at the end of thread.join() in prefetch.
| @@ -29,12 +32,60 @@ def _add_one(x: int) -> int: | |||
| dp_parametrize = parametrize("dp", test_dps) | |||
|
|
|||
|
|
|||
There was a problem hiding this comment.
Did none of the existing pause tests break? Amazing
There was a problem hiding this comment.
Lots of tests are broken before I make pause blocking for Prefetcher. So, I have to make the change.
|
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
|
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
1 similar comment
|
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
|
This pull request was exported from Phabricator. Differential Revision: D43994511 |
…efetch (meta-pytorch#1075) Summary: Fixes issue about `MPRS.finalize` when `dataloader2.shutdown()` is called ### Changes - DataLoader2 should always clean up `datapipe_iter` at shutdown - Guard `MPRS` to finalize once - Fix the problem of `ConnectionError` when DataLoader early exits - This is caused by `queue` is joined when main/worker/dispatching process exits. No more request/response can be passed across processes. - Consumer process shouldn't join the `req_queue` at exit to make sure producer process can still access the remaining request. And, consumer will close `req_queue` after clean up to prevent any further request sent to queue. - Produce process shouldn't join the `res_queue` at exit to make sure consumer process can still access response. And, producer will close `res_queue` after clean up to prevent any further response sent to queue. - Main (Consumer) <-> Worker (Producer) - Worker (Consumer) -> Dispatching (Producer) - Fix `pause` API for DataLoader2 - Invoke `pause` lazily until the `limit+1` iteration is reached to align with python's iterator behavior. - Make `prefetch.pause` blocking unless there might be potential racing issue. Main thread is paused but prefetch worker is still trying to fetch data from `iter`. - Add tests to validate Pull Request resolved: meta-pytorch#1075 Reviewed By: NivekT Differential Revision: D43994511 Pulled By: ejguan fbshipit-source-id: 51dbecd5f68f6e34987235ba8f00461d3b0f4431
…efetch (meta-pytorch#1075) Summary: Fixes issue about `MPRS.finalize` when `dataloader2.shutdown()` is called ### Changes - DataLoader2 should always clean up `datapipe_iter` at shutdown - Guard `MPRS` to finalize once - Fix the problem of `ConnectionError` when DataLoader early exits - This is caused by `queue` is joined when main/worker/dispatching process exits. No more request/response can be passed across processes. - Consumer process shouldn't join the `req_queue` at exit to make sure producer process can still access the remaining request. And, consumer will close `req_queue` after clean up to prevent any further request sent to queue. - Produce process shouldn't join the `res_queue` at exit to make sure consumer process can still access response. And, producer will close `res_queue` after clean up to prevent any further response sent to queue. - Main (Consumer) <-> Worker (Producer) - Worker (Consumer) -> Dispatching (Producer) - Fix `pause` API for DataLoader2 - Invoke `pause` lazily until the `limit+1` iteration is reached to align with python's iterator behavior. - Make `prefetch.pause` blocking unless there might be potential racing issue. Main thread is paused but prefetch worker is still trying to fetch data from `iter`. - Add tests to validate Pull Request resolved: meta-pytorch#1075 Reviewed By: NivekT Differential Revision: D43994511 Pulled By: ejguan fbshipit-source-id: db077e09f62b96de2fff71635c46dc42c839c9ec
|
This pull request was exported from Phabricator. Differential Revision: D43994511 |
|
This pull request has been reverted by 7d97475. |
…efetch (meta-pytorch#1075) Summary: Fixes issue about `MPRS.finalize` when `dataloader2.shutdown()` is called ### Changes - DataLoader2 should always clean up `datapipe_iter` at shutdown - Guard `MPRS` to finalize once - Fix the problem of `ConnectionError` when DataLoader early exits - This is caused by `queue` is joined when main/worker/dispatching process exits. No more request/response can be passed across processes. - Consumer process shouldn't join the `req_queue` at exit to make sure producer process can still access the remaining request. And, consumer will close `req_queue` after clean up to prevent any further request sent to queue. - Produce process shouldn't join the `res_queue` at exit to make sure consumer process can still access response. And, producer will close `res_queue` after clean up to prevent any further response sent to queue. - Main (Consumer) <-> Worker (Producer) - Worker (Consumer) -> Dispatching (Producer) - Fix `pause` API for DataLoader2 - Invoke `pause` lazily until the `limit+1` iteration is reached to align with python's iterator behavior. - Make `prefetch.pause` blocking unless there might be potential racing issue. Main thread is paused but prefetch worker is still trying to fetch data from `iter`. - Add tests to validate Pull Request resolved: meta-pytorch#1075 Differential Revision: D44168655 Pulled By: ejguan fbshipit-source-id: 4fa8c34438414cad7f46527355eb0d51866e6b15
…efetch (#1096) Summary: Pull Request resolved: #1096 Fixes issue about `MPRS.finalize` when `dataloader2.shutdown()` is called ### Changes - DataLoader2 should always clean up `datapipe_iter` at shutdown - Guard `MPRS` to finalize once - Fix the problem of `ConnectionError` when DataLoader early exits - This is caused by `queue` is joined when main/worker/dispatching process exits. No more request/response can be passed across processes. - Consumer process shouldn't join the `req_queue` at exit to make sure producer process can still access the remaining request. And, consumer will close `req_queue` after clean up to prevent any further request sent to queue. - Produce process shouldn't join the `res_queue` at exit to make sure consumer process can still access response. And, producer will close `res_queue` after clean up to prevent any further response sent to queue. - Main (Consumer) <-> Worker (Producer) - Worker (Consumer) -> Dispatching (Producer) - Fix `pause` API for DataLoader2 - Invoke `pause` lazily until the `limit+1` iteration is reached to align with python's iterator behavior. - Make `prefetch.pause` blocking unless there might be potential racing issue. Main thread is paused but prefetch worker is still trying to fetch data from `iter`. - Add tests to validate Pull Request resolved: #1075 Reviewed By: NivekT Differential Revision: D44168655 Pulled By: ejguan fbshipit-source-id: fdfee5c27b512b5c0d5308e53a81b1cb2db70a43
…efetch (#1096) Summary: Pull Request resolved: #1096 Fixes issue about `MPRS.finalize` when `dataloader2.shutdown()` is called ### Changes - DataLoader2 should always clean up `datapipe_iter` at shutdown - Guard `MPRS` to finalize once - Fix the problem of `ConnectionError` when DataLoader early exits - This is caused by `queue` is joined when main/worker/dispatching process exits. No more request/response can be passed across processes. - Consumer process shouldn't join the `req_queue` at exit to make sure producer process can still access the remaining request. And, consumer will close `req_queue` after clean up to prevent any further request sent to queue. - Produce process shouldn't join the `res_queue` at exit to make sure consumer process can still access response. And, producer will close `res_queue` after clean up to prevent any further response sent to queue. - Main (Consumer) <-> Worker (Producer) - Worker (Consumer) -> Dispatching (Producer) - Fix `pause` API for DataLoader2 - Invoke `pause` lazily until the `limit+1` iteration is reached to align with python's iterator behavior. - Make `prefetch.pause` blocking unless there might be potential racing issue. Main thread is paused but prefetch worker is still trying to fetch data from `iter`. - Add tests to validate Pull Request resolved: #1075 Reviewed By: NivekT Differential Revision: D44168655 Pulled By: ejguan fbshipit-source-id: fdfee5c27b512b5c0d5308e53a81b1cb2db70a43
…efetch (#1096) Summary: Pull Request resolved: #1096 Fixes issue about `MPRS.finalize` when `dataloader2.shutdown()` is called ### Changes - DataLoader2 should always clean up `datapipe_iter` at shutdown - Guard `MPRS` to finalize once - Fix the problem of `ConnectionError` when DataLoader early exits - This is caused by `queue` is joined when main/worker/dispatching process exits. No more request/response can be passed across processes. - Consumer process shouldn't join the `req_queue` at exit to make sure producer process can still access the remaining request. And, consumer will close `req_queue` after clean up to prevent any further request sent to queue. - Produce process shouldn't join the `res_queue` at exit to make sure consumer process can still access response. And, producer will close `res_queue` after clean up to prevent any further response sent to queue. - Main (Consumer) <-> Worker (Producer) - Worker (Consumer) -> Dispatching (Producer) - Fix `pause` API for DataLoader2 - Invoke `pause` lazily until the `limit+1` iteration is reached to align with python's iterator behavior. - Make `prefetch.pause` blocking unless there might be potential racing issue. Main thread is paused but prefetch worker is still trying to fetch data from `iter`. - Add tests to validate Pull Request resolved: #1075 Reviewed By: NivekT Differential Revision: D44168655 Pulled By: ejguan fbshipit-source-id: fdfee5c27b512b5c0d5308e53a81b1cb2db70a43
Fixes issue about
MPRS.finalizewhendataloader2.shutdown()is calledChanges
datapipe_iterat shutdownMPRSto finalize onceConnectionErrorwhen DataLoader early exitsqueueis joined when main/worker/dispatching process exits. No more request/response can be passed across processes.req_queueat exit to make sure producer process can still access the remaining request. And, consumer will closereq_queueafter clean up to prevent any further request sent to queue.res_queueat exit to make sure consumer process can still access response. And, producer will closeres_queueafter clean up to prevent any further response sent to queue.pauseAPI for DataLoader2pauselazily until thelimit+1iteration is reached to align with python's iterator behavior.prefetch.pauseblocking unless there might be potential racing issue. Main thread is paused but prefetch worker is still trying to fetch data fromiter.