[DataPipe] Add as_tuple argument for CSVParserIterDataPipe#646
[DataPipe] Add as_tuple argument for CSVParserIterDataPipe#646ice-tong wants to merge 5 commits intometa-pytorch:mainfrom
Conversation
ejguan
left a comment
There was a problem hiding this comment.
Thank you, LGTM with one nit comment
|
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
| for data in stream: | ||
| yield path, data | ||
|
|
||
| def as_tuple(self, stream: Iterator[List]) -> Iterator[Union[List, Tuple]]: |
There was a problem hiding this comment.
I just realize we need to change the type hint here. The input stream might not be iterator[List]. We can use Iterator[D] -> Iterator[Union[D, Tuple]].
| if isinstance(data, list): | ||
| yield tuple(data) | ||
| else: | ||
| yield data |
There was a problem hiding this comment.
In terms of performance, we might want to directly yield tuple(data) without checking isinstance every single time when as_tuple is specified as True. In that case, Users should take responsibility to handle the case.
Could you please run benchmarking using these two implementation with a long csv file?
There was a problem hiding this comment.
Ok, I'll do this benchmarking.
There was a problem hiding this comment.
Hi @ejguan, I run the following benchmark:
import os
import timeit
import tempfile
from torchdata.datapipes.iter import IterableWrapper, FileOpener
from torchdata.datapipes.iter.util.plain_text_reader import PlainTextReaderHelper
def impl_a(self, stream):
if not self._as_tuple:
yield from stream
return
for data in stream:
if isinstance(data, list):
yield tuple(data)
else:
yield data
def impl_b(self, stream):
if not self._as_tuple:
yield from stream
return
for data in stream:
yield tuple(data)
fake_line = '1,2,3,4,5,6,7,8,9'
line_num = 100 * 10000
with tempfile.TemporaryDirectory() as temp_dir:
temp_fpath = os.path.join(temp_dir, 'temp.csv')
with open(temp_fpath, 'w') as f:
f.write('\n'.join([fake_line, ] * line_num))
datapipe1 = IterableWrapper([temp_fpath])
datapipe2 = FileOpener(datapipe1, mode="b")
csv_parser_dp = datapipe2.parse_csv(as_tuple=True)
PlainTextReaderHelper.as_tuple = impl_a
print("impl a: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10))
PlainTextReaderHelper.as_tuple = impl_b
print("impl b: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10))Got:
impl a: 13.500808166
impl b: 13.221415374999998
Any comments about this? ^_^
There was a problem hiding this comment.
Let's run a dummy test for the case that not isinstance(data, list). To test it, you can expose as_tuple to parse_csv_as_dict and turn it on. Thank you
There was a problem hiding this comment.
Hi @ejguan , I added a benchmark for parse_csv_as_dict:
import csv
import os
import timeit
import tempfile
from torchdata.datapipes.iter import IterableWrapper, FileOpener
from torchdata.datapipes.iter.util.plain_text_reader import PlainTextReaderHelper
from torchdata.datapipes.iter.util.plain_text_reader import _CSVBaseParserIterDataPipe
def impl_a(self, stream):
if not self._as_tuple:
yield from stream
return
for data in stream:
if isinstance(data, list):
yield tuple(data)
else:
yield data
def impl_b(self, stream):
if not self._as_tuple:
yield from stream
return
for data in stream:
yield tuple(data)
fake_line = '1,2,3,4,5,6,7,8,9'
line_num = 100 * 10000
with tempfile.TemporaryDirectory() as temp_dir:
temp_fpath = os.path.join(temp_dir, 'temp.csv')
with open(temp_fpath, 'w') as f:
f.write('\n'.join([fake_line, ] * line_num))
datapipe1 = IterableWrapper([temp_fpath])
datapipe2 = FileOpener(datapipe1, mode="b")
csv_parser_dp = datapipe2.parse_csv(as_tuple=True)
PlainTextReaderHelper.as_tuple = impl_a
print("impl a: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10))
PlainTextReaderHelper.as_tuple = impl_b
print("impl b: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10))
PlainTextReaderHelper.as_tuple = impl_a
csv_dict_parser_dp1 = _CSVBaseParserIterDataPipe(
datapipe2, csv.DictReader, decode=True, as_tuple=True)
csv_dict_parser_dp2 = _CSVBaseParserIterDataPipe(
datapipe2, csv.DictReader, decode=True, as_tuple=False)
print("impl a with dict as_tupe=True: ", timeit.timeit(stmt=lambda: list(csv_dict_parser_dp1), number=10))
print("impl a with dict as_tupe=False: ", timeit.timeit(stmt=lambda: list(csv_dict_parser_dp2), number=10))Got:
impl a: 13.746312875000001
impl b: 13.414058333999998
impl a with dict as_tupe=True: 25.277461583
impl a with dict as_tupe=False: 24.659475541
There was a problem hiding this comment.
How about impl b with dict?
There was a problem hiding this comment.
Hi, there are the result of impl_b with dict parser:
impl a: 13.375169375
impl b: 13.102719625
impl a with dict as_tupe=True: 24.521925125000003
impl a with dict as_tupe=False: 23.991537542000003
impl b with dict as_tupe=True: 24.552794458999998
impl b with dict as_tupe=False: 24.026250667
There was a problem hiding this comment.
Cool. Then, let's leave it as it is.
|
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
|
|
||
| # Functional Test: yield one row at time from each file as tuple instead of list, skipping over empty content | ||
| csv_parser_dp = datapipe3.parse_csv(as_tuple=True) | ||
| expected_res = [("key", "item"), ("a", "1"), ("b", "2"), ()] |
There was a problem hiding this comment.
Emmm. It weird that CI doesn't catch the Error. Could you please change the variable name from expected_res to something else?
It breaks another test at line 171
Edit: Actually, it's raised by CI.
There was a problem hiding this comment.
My bad, I didn't notice that expected_res has been reused in another test case. Fixed!
| stream = self._helper.skip_lines(file) | ||
| stream = self._helper.decode(stream) | ||
| stream = self._csv_reader(stream, **self.fmtparams) | ||
| stream = self._helper.as_tuple(stream) |
There was a problem hiding this comment.
mypy complains about it at this line. Could you please try to fix it? Otherwise, you can add a comment at the end of this line to bypass the mypy check with # type: ignore[assignment]
There was a problem hiding this comment.
Done! mypy doesn't suggest variable name reuse in one function body.
|
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
1 similar comment
|
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
Motivation
see pytorch/torcharrow#421
Changes
as_tupleargument for CSVParserIterDataPipeas_tuplein tests/test_local_io.py