Implementing thread based PrefetcherIterDataPipe #770

Closed
VitalyFedyunin wants to merge 8 commits into gh/VitalyFedyunin/21/base from gh/VitalyFedyunin/21/head

VitalyFedyunin commented Sep 9, 2022

VitalyFedyunin added a commit that referenced this pull request Sep 9, 2022
ghstack-source-id: 30bea32
Pull Request resolved: #770
facebook-github-bot added the "CLA Signed" label Sep 9, 2022 (this label is managed by the Facebook bot; authors need to sign the CLA before a PR can be reviewed)
VitalyFedyunin added a commit that referenced this pull request Sep 9, 2022
ghstack-source-id: 25bee73
Pull Request resolved: #770
VitalyFedyunin added a commit that referenced this pull request Sep 9, 2022
ghstack-source-id: 635a985
Pull Request resolved: #770
Contributor

ejguan left a comment

A few comments about threading below.

    time.sleep(PRODUCER_SLEEP_INTERVAL)

def __iter__(self):
    self.reset()
Contributor

nit: reset can be omitted.

Comment on lines +24 to +25
# TODO: Potential optimization is changing buffer from list to dequeue
self.prefetch_buffer = []
Contributor

Yeah, I agree with changing to deque because it's thread-safe.
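
For illustration, a minimal sketch of the deque suggestion, assuming one producer thread and one consumer. The names `producer`, `prefetch`, and `POLL_INTERVAL` are hypothetical and not taken from the PR. In CPython, `collections.deque.append` and `popleft` are documented as thread-safe, and `popleft` is O(1) whereas `list.pop(0)` is O(n).

```python
import threading
import time
from collections import deque

POLL_INTERVAL = 0.001  # hypothetical polling interval, in seconds


def producer(source, buffer, buffer_size, done):
    # Runs in a background thread: copy items from source into the buffer,
    # waiting while the buffer is full.
    for item in source:
        while len(buffer) >= buffer_size:
            time.sleep(POLL_INTERVAL)
        buffer.append(item)
    done.set()


def prefetch(source, buffer_size=4):
    # Yield items from source while a background thread reads ahead.
    buffer = deque()
    done = threading.Event()
    thread = threading.Thread(
        target=producer, args=(source, buffer, buffer_size, done), daemon=True
    )
    thread.start()
    while True:
        if buffer:
            yield buffer.popleft()
        elif done.is_set():
            # The producer may have appended just before setting done,
            # so drain whatever is left before stopping.
            while buffer:
                yield buffer.popleft()
            break
        else:
            time.sleep(POLL_INTERVAL)
    thread.join()
```

This sketch does not handle a consumer that stops iterating early; the producer would keep waiting on a full buffer until the process exits.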


class _PrefetchData:
    def __init__(self, source_datapipe, buffer_size):
        self.run_prefetcher = True
Contributor

And, should we add a thread lock around run_prefetcher?

Contributor Author

My thread lock is the GIL =)

Contributor

Lol. Kind of makes sense.
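
A rough sketch of the two options discussed in this thread, assuming CPython. `FlagWorker` stops on a plain boolean attribute, which works in practice because a single attribute read or write is not interleaved with another thread under the GIL. `EventWorker` uses `threading.Event`, which states the intent explicitly and does not depend on interpreter details. Both class names and `run_and_stop` are hypothetical and do not appear in the PR.

```python
import threading
import time


class FlagWorker:
    """Stops when a plain boolean attribute is set to False."""

    def __init__(self):
        self.run = True
        self.ticks = 0

    def loop(self):
        while self.run:
            self.ticks += 1
            time.sleep(0.001)


class EventWorker:
    """Stops when a threading.Event is set."""

    def __init__(self):
        self.stop = threading.Event()
        self.ticks = 0

    def loop(self):
        # Event.wait(timeout) returns True once the event is set,
        # so it doubles as an interruptible sleep.
        while not self.stop.wait(0.001):
            self.ticks += 1


def run_and_stop(worker, request_stop):
    # Start the worker, let it run briefly, ask it to stop,
    # and report whether the thread actually exited.
    thread = threading.Thread(target=worker.loop, daemon=True)
    thread.start()
    time.sleep(0.02)
    request_stop()
    thread.join(timeout=1.0)
    return not thread.is_alive()
```

Neither variant needs an explicit lock here, because only one value is written and the reader only polls it.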

Comment on lines +48 to +51
except communication.iter.InvalidStateResetRequired:
    stop_iteration = True
except communication.iter.TerminateRequired:
    prefetch_data.run_prefetcher = False
Contributor

Do we need to handle those two exceptions? communication is more or less a submodule for MultiprocessingReadingService. I would personally prefer to remove those exceptions from a DataPipe.

Contributor Author

I have to do it here, because it is a separate thread that needs to be terminated gracefully in case the source DataPipe is out of commission.

Contributor

Oh, I see. This prefetch is not only attached to the main process but also to child processes.
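
To make the point of this thread concrete, here is a hedged sketch of a worker loop that shuts itself down when the source fails. It only loosely follows the fragment quoted above: `ResetRequired` and `TerminateRequired` are hypothetical stand-ins for the `communication.iter` exceptions, and `PrefetchState`, `worker`, and `failing_source` are illustrative names, not the PR's code.

```python
import threading
import time


class ResetRequired(Exception):
    """Hypothetical stand-in for communication.iter.InvalidStateResetRequired."""


class TerminateRequired(Exception):
    """Hypothetical stand-in for communication.iter.TerminateRequired."""


class PrefetchState:
    """State shared between the worker thread and the consumer."""

    def __init__(self, source, buffer_size):
        self.source = source
        self.buffer_size = buffer_size
        self.buffer = []
        self.run_prefetcher = True


def worker(state):
    it = iter(state.source)
    stop_iteration = False
    while state.run_prefetcher:
        if len(state.buffer) < state.buffer_size and not stop_iteration:
            try:
                state.buffer.append(next(it))
            except (StopIteration, ResetRequired):
                # No more items will arrive; keep running until the
                # consumer has emptied the buffer.
                stop_iteration = True
            except TerminateRequired:
                # The source is gone: stop the thread right away.
                state.run_prefetcher = False
        elif stop_iteration and not state.buffer:
            state.run_prefetcher = False
        else:
            # Buffer is full, or waiting for the consumer to drain it.
            time.sleep(0.001)


def failing_source():
    # Example source that yields two items and then asks for termination.
    yield 1
    yield 2
    raise TerminateRequired()
```

Without the `except` clauses, an exception raised by the source would end the thread with a traceback and leave the shared state in an undefined condition, which is the situation the author wants to avoid.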

VitalyFedyunin added a commit that referenced this pull request Sep 21, 2022
ghstack-source-id: 4fc437e
Pull Request resolved: #770
VitalyFedyunin added a commit that referenced this pull request Sep 21, 2022
ghstack-source-id: dd95cfd
Pull Request resolved: #770
VitalyFedyunin added a commit that referenced this pull request Sep 21, 2022
ghstack-source-id: 5fdc9ba
Pull Request resolved: #770
Contributor

ejguan left a comment

LGTM
I assume tests will be added after the corresponding changes are made to PrototypeMPRS.

Contributor

NivekT left a comment

Do we intend for this DataPipe to be user-facing? (I'm guessing not?) If it is, adding a docstring and adding it to torchdata.datapipes.iter.rst would be good.

Comment on lines +104 to +105
def reset_iterator(self):
    self.reset()
Contributor

Question: What is the expected behavior of reset_iterator in PrototypeRS? How is that different from the usual DataPipe reset?

Comment on lines +61 to +62
if self.buffer_size < 1:
    yield from self.source_datapipe
Contributor

This case should not be possible because of the check in __init__.
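
A hedged sketch of the kind of constructor check this comment refers to. The actual check is not shown in the quoted lines, so the class name `PrefetcherSketch`, the default value, and the error message are all assumptions. The point is only that validating `buffer_size` once in `__init__` makes a later `buffer_size < 1` branch unreachable.

```python
class PrefetcherSketch:
    """Hypothetical stand-in for the DataPipe under review."""

    def __init__(self, source_datapipe, buffer_size=10):
        # Reject non-positive sizes up front so that __iter__ never
        # has to handle them.
        if buffer_size <= 0:
            raise ValueError("'buffer_size' is required to be a positive integer.")
        self.source_datapipe = source_datapipe
        self.buffer_size = buffer_size
```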

Comment on lines +65 to +66
prefetch_data = _PrefetchData(self.source_datapipe, self.buffer_size)
self.prefetch_data = prefetch_data
Contributor

nit: Rename these to prefetcher_thread_worker or something?

Contributor Author

@VitalyFedyunin has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

VitalyFedyunin added a commit that referenced this pull request Sep 26, 2022
ghstack-source-id: 0bbf6ff
Pull Request resolved: #770
Contributor Author

@VitalyFedyunin has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

VitalyFedyunin added a commit that referenced this pull request Oct 3, 2022
ghstack-source-id: 6b9c54f
Pull Request resolved: #770
Contributor Author

@VitalyFedyunin has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

facebook-github-bot deleted the gh/VitalyFedyunin/21/head branch October 7, 2022 14:19
4 participants