Properly cleanup processes and queues for MPRS and Fix pause for prefetch #1075

Closed
ejguan wants to merge 1 commit into meta-pytorch:main from ejguan:exit_mprs

Conversation


@ejguan ejguan commented Mar 10, 2023

Fixes an issue with MPRS.finalize when dataloader2.shutdown() is called

Changes

  • DataLoader2 should always clean up datapipe_iter at shutdown
  • Guard MPRS so it is finalized only once
  • Fix the ConnectionError raised when DataLoader exits early
    • This is caused by the queue being joined when the main/worker/dispatching process exits, after which no more requests/responses can be passed across processes.
      • The consumer process shouldn't join req_queue at exit, so that the producer process can still read any remaining requests. The consumer closes req_queue after cleanup to prevent further requests from being sent to the queue.
      • The producer process shouldn't join res_queue at exit, so that the consumer process can still read responses. The producer closes res_queue after cleanup to prevent further responses from being sent to the queue.
        • Main (Consumer) <-> Worker (Producer)
        • Worker (Consumer) -> Dispatching (Producer)
  • Fix the pause API for DataLoader2
    • Invoke pause lazily, only once the limit+1 iteration is reached, to align with Python's iterator behavior.
    • Make prefetch.pause blocking; otherwise there is a potential race where the main thread is paused but the prefetch worker is still trying to fetch data from iter.
  • Add tests to validate the above
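The queue-cleanup protocol in the bullets above can be sketched with plain multiprocessing primitives. This is a minimal illustration, not the torchdata implementation: `_worker`, `run_round_trip`, and the `TERMINATE` string are invented names, and a platform with the `fork` start method (e.g. Linux) is assumed.

```python
import multiprocessing as mp

TERMINATE = "TerminationRequest"  # stand-in for torchdata's termination message

def _worker(req_queue, res_queue):
    # Producer of responses: never join res_queue at exit, so the consumer
    # can still drain any pending responses.
    res_queue.cancel_join_thread()
    while True:
        req = req_queue.get()
        if req == TERMINATE:
            break
        res_queue.put(req * 2)  # pretend "work"
    res_queue.close()  # no further responses after cleanup

def run_round_trip():
    ctx = mp.get_context("fork")  # assumes fork is available
    req_queue, res_queue = ctx.Queue(), ctx.Queue()
    # Consumer side: never join req_queue at exit, so the worker can still
    # read the pending TERMINATE message instead of a closed/corrupted queue.
    req_queue.cancel_join_thread()
    proc = ctx.Process(target=_worker, args=(req_queue, res_queue), daemon=True)
    proc.start()
    req_queue.put(3)
    response = res_queue.get(timeout=10)
    req_queue.put(TERMINATE)
    proc.join(timeout=10)
    req_queue.close()  # no further requests after cleanup
    return response
```

Each side calls cancel_join_thread() on the queue it produces into before exit, and close() only after its own cleanup, mirroring the Main <-> Worker and Worker -> Dispatching pairings above.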

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Mar 10, 2023
@facebook-github-bot

@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.


ejguan commented Mar 10, 2023

TBH, I hope we can cherry-pick this PR to the release branch, but it's probably not feasible since the final RC has been cut.

@ejguan ejguan requested a review from NivekT March 10, 2023 22:47


ejguan commented Mar 13, 2023

Converting back to draft as this PR seems to break some tests

@ejguan ejguan marked this pull request as draft March 13, 2023 16:42
@ejguan ejguan removed the request for review from NivekT March 13, 2023 16:42

@ejguan ejguan changed the title from "Properly cleanup processes and queues for MPRS" to "Properly cleanup processes and queues for MPRS and Fix pause for prefetch" Mar 15, 2023
@ejguan ejguan marked this pull request as ready for review March 15, 2023 22:17
@ejguan ejguan requested a review from NivekT March 15, 2023 22:17

@NivekT NivekT left a comment

LGTM!

res_queue.cancel_join_thread()
process.daemon = True
process.start()
self._dispatch_process = (process, req_queues, res_queues)
Contributor

non-blocking note: Maybe we should refactor out the dispatching code at some point... It is getting too large

Contributor Author

Yeah. We should spin it off, together with the worker launch, into a separate file

process_name=f"worker process {worker_id}",
call_on_process_init=call_on_process_init,
)
req_queue.cancel_join_thread()
Contributor

IIUC, this allows join_thread to be non-blocking?

Contributor Author

Emmm, not exactly. This makes sure req_queue is not joined by GC when the main process wants to exit. That guarantees the worker process can still read the TerminationRequest from req_queue, rather than corrupted data from a closed queue
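The semantics in question can be shown in a few lines. Per the multiprocessing docs, a process that has put items on a queue implicitly joins the queue's feeder thread at exit; cancel_join_thread() skips that join. `send_termination` below is a hypothetical helper, not torchdata code.

```python
import multiprocessing as mp

def send_termination(req_queue):
    """Hypothetical helper: enqueue a shutdown message without letting
    queue cleanup block process exit."""
    req_queue.put("TerminationRequest")
    # By default, a process that has put data on a queue implicitly joins
    # the queue's feeder thread at exit, blocking until the buffer is
    # flushed. cancel_join_thread() skips that join, so exit proceeds even
    # if the other side never drains the queue (buffered data may be lost,
    # which is acceptable for a final shutdown message).
    req_queue.cancel_join_thread()
```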

response = self.res_buffers[idx].popleft()
else:
response = self.datapipes[idx].protocol.get_response_next(block=True)
while not self._terminated:
Contributor

Question: so we are guaranteeing termination message will be sent?

Contributor Author

This guarantees that, if a prefetch is attached after this DataPipe, the worker thread is able to learn that it has been terminated. If we kept the blocking request here, the worker thread would be stuck at the blocking call even after this DataPipe had already been terminated, hanging at the final thread.join() in prefetch.

@@ -29,12 +32,60 @@ def _add_one(x: int) -> int:
dp_parametrize = parametrize("dp", test_dps)


Contributor

Did none of the existing pause tests break? Amazing

Contributor Author

Lots of tests were broken before I made pause blocking for Prefetcher, so I had to make the change.
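A toy model of the blocking-pause handshake (invented names; threading.Event stands in for the real protocol, and a real implementation would also need resume and re-entrancy handling):

```python
import threading
import time

class BlockingPausePrefetcher:
    """Toy prefetcher showing why pause() must block until acknowledged."""

    def __init__(self, source_iter):
        self._it = source_iter
        self.buffer = []
        self._pause_requested = threading.Event()
        self._acknowledged = threading.Event()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        for item in self._it:
            while self._pause_requested.is_set() and not self._stop.is_set():
                self._acknowledged.set()   # tell the caller we stopped fetching
                time.sleep(0.01)
            if self._stop.is_set():
                return
            self.buffer.append(item)
        self._acknowledged.set()           # exhausted: nothing left to fetch

    def pause(self):
        self._pause_requested.set()
        # Block until the worker confirms; a non-blocking pause would let
        # the worker keep fetching while the caller believes it is paused.
        self._acknowledged.wait()

    def shutdown(self):
        self._stop.set()
        self._thread.join(timeout=5)
```

After pause() returns, the worker is guaranteed to be idle, so no item is fetched behind the paused caller's back.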



@facebook-github-bot

This pull request was exported from Phabricator. Differential Revision: D43994511

ejguan added a commit to ejguan/data that referenced this pull request Mar 17, 2023
…efetch (meta-pytorch#1075)

Summary:
Fixes an issue with `MPRS.finalize` when `dataloader2.shutdown()` is called

### Changes

- DataLoader2 should always clean up `datapipe_iter` at shutdown
- Guard `MPRS` so it is finalized only once
- Fix the `ConnectionError` raised when DataLoader exits early
  - This is caused by the `queue` being joined when the main/worker/dispatching process exits, after which no more requests/responses can be passed across processes.
    - The consumer process shouldn't join `req_queue` at exit, so that the producer process can still read any remaining requests. The consumer closes `req_queue` after cleanup to prevent further requests from being sent to the queue.
    - The producer process shouldn't join `res_queue` at exit, so that the consumer process can still read responses. The producer closes `res_queue` after cleanup to prevent further responses from being sent to the queue.
       - Main (Consumer) <-> Worker (Producer)
       - Worker (Consumer) -> Dispatching (Producer)
- Fix the `pause` API for DataLoader2
    - Invoke `pause` lazily, only once the `limit+1` iteration is reached, to align with Python's iterator behavior.
    - Make `prefetch.pause` blocking; otherwise there is a potential race where the main thread is paused but the prefetch worker is still trying to fetch data from `iter`.
- Add tests to validate the above

Pull Request resolved: meta-pytorch#1075

Reviewed By: NivekT

Differential Revision: D43994511

Pulled By: ejguan

fbshipit-source-id: 51dbecd5f68f6e34987235ba8f00461d3b0f4431
…efetch (meta-pytorch#1075)

Summary:
Fixes `MPRS.finalize` when `dataloader2.shutdown()` is called (same changes as described above)

Pull Request resolved: meta-pytorch#1075

Reviewed By: NivekT

Differential Revision: D43994511

Pulled By: ejguan

fbshipit-source-id: db077e09f62b96de2fff71635c46dc42c839c9ec

@facebook-github-bot

@ejguan merged this pull request in 8eecb77.

@facebook-github-bot

This pull request has been reverted by 7d97475.

ejguan added a commit to ejguan/data that referenced this pull request Mar 17, 2023
…efetch (meta-pytorch#1075)

Summary:
Fixes `MPRS.finalize` when `dataloader2.shutdown()` is called (same changes as described above)

Pull Request resolved: meta-pytorch#1075

Differential Revision: D44168655

Pulled By: ejguan

fbshipit-source-id: 4fa8c34438414cad7f46527355eb0d51866e6b15
facebook-github-bot pushed a commit that referenced this pull request Mar 18, 2023
…efetch (#1096)

Summary:
Pull Request resolved: #1096

Fixes `MPRS.finalize` when `dataloader2.shutdown()` is called (same changes as described above)

Pull Request resolved: #1075

Reviewed By: NivekT

Differential Revision: D44168655

Pulled By: ejguan

fbshipit-source-id: fdfee5c27b512b5c0d5308e53a81b1cb2db70a43
NivekT pushed a commit that referenced this pull request Apr 19, 2023
…efetch (#1096)

Summary:
Pull Request resolved: #1096

Fixes `MPRS.finalize` when `dataloader2.shutdown()` is called (same changes as described above)

Pull Request resolved: #1075

Reviewed By: NivekT

Differential Revision: D44168655

Pulled By: ejguan

fbshipit-source-id: fdfee5c27b512b5c0d5308e53a81b1cb2db70a43
ejguan added a commit that referenced this pull request Apr 20, 2023
…efetch (#1096)

Summary:
Pull Request resolved: #1096

Fixes `MPRS.finalize` when `dataloader2.shutdown()` is called (same changes as described above)

Pull Request resolved: #1075

Reviewed By: NivekT

Differential Revision: D44168655

Pulled By: ejguan

fbshipit-source-id: fdfee5c27b512b5c0d5308e53a81b1cb2db70a43