Refactor data ingest and read_csv #1116
Conversation
I've added compression through …
dask/dataframe/tests/test_csv.py (outdated)

    files2 = valmap(compress, files)
    with filetexts(files2, mode='b'):
        df = read_csv('2014-01-*.csv', compression=fmt)
        eq(df, expected, check_dtype=False)
@wabu can you try this implementation against your dataset and compare performance? I would actually be pretty interested in the result of profiling. I might do this in the following way.
import dask
from dask.dataframe.csv import read_csv

df = read_csv(path, compression='xz')

I would do this in the IPython console and then use the %prun magic:

%prun df.some_column.sum().compute(get=dask.get)  # single-threaded scheduler
It looks like …
@martindurant this could use your review if you have time sometime next week. This is an attempt to refactor all of the location/compression/format work we prototyped in …
Force-pushed from 748079f to 985d6d2
I've now updated this to support …
Playing with a small xz file:

In [1]: from dask.bag.text import read_text

In [2]: %time read_text('trip_data_1_00.csv.xz').count().compute()  # single core streams through file sequentially
CPU times: user 25.7 ms, sys: 17.2 ms, total: 42.9 ms
Wall time: 2.47 s
Out[2]: 1000000

In [3]: %time read_text('trip_data_1_00.csv.xz', blocksize=1000000).count().compute()  # multiple cores take random access chunks off one at a time
CPU times: user 61.9 ms, sys: 25.9 ms, total: 87.8 ms
Wall time: 4.15 s
Out[3]: 126523

In [4]: %time read_text('trip_data_1_00.csv.xz', blocksize=10000000).count().compute()  # multiple cores take random access chunks off one at a time
CPU times: user 21 ms, sys: 13.1 ms, total: 34 ms
Wall time: 1.41 s
Out[4]: 180746

In [5]: !du trip_data_1_00.csv.xz
20328 trip_data_1_00.csv.xz

In [6]: !du trip_data_1_00.csv.xz -hs
20M trip_data_1_00.csv.xz

Presumably this would work better for a smaller xz blocksize to dask.bag blocksize ratio.
Hrm, noting now that I'm getting different outputs, which is concerning.
Ah, I was handling …
@wabu do you have a dataset I can play with or a nice way to produce a similar dataset? Also, are you confident that the python lzma module supports random access? Do you know of ways to produce xz files with small blocks using the command line?
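For reference, a rough command-line sketch of inspecting a file's blocks and rebuilding it with small ones (untested here; it assumes an xz build new enough to support --block-size, since by default xz writes one big block that can't be split):

xz --list trip_data_1_00.csv.xz                   # how many independent blocks does the stream contain?
unxz --keep trip_data_1_00.csv.xz                 # recover the plain CSV, keeping the .xz around
xz --keep --block-size=1MiB trip_data_1_00.csv    # recompress into 1 MiB blocks for finer random access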
@updiversity regarding #1115 can you try the solution in this branch?

import dask.bag as db
from dask.bag.text import read_text

b = read_text('s3://bucket/keys.*.txt', **s3_params)
b.take(0)
For this I think you need the lzmaffi module (as seen in this comment: #1096 (comment)).
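A quick way to sanity-check the random-access question (a sketch, assuming lzmaffi really is the drop-in replacement for the stdlib lzma module that it advertises itself as):

import lzmaffi  # drop-in for lzma, with seeks served from the xz block index

with lzmaffi.open('trip_data_1_00.csv.xz', 'rb') as f:
    f.seek(10000000)          # jump into the middle of the decompressed stream
    chunk = f.read(1000000)   # only cheap if the file was written with multiple blocks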
Successfully replaced both …
Getting some odd errors in S3 tests like this: https://travis-ci.org/dask/dask/jobs/125713226. Some possible causes: …
CCing @martindurant |
Yeah, something fishy is definitely going on.

(Pdb) pp compute(values[0][0])
('{"amount": 100, "name": "Alice"}\n{"amount": 200, "name": "Bob"}\n{"amount": 300, "name": "Charlie"}\n{"amount": 400, "name": "Dennis"}\n',)
(Pdb) pp compute(values[1][0])
('{"amount": 500, "name": "Alice"}\n{"amount": 600, "name": "Bob"}\n{"amount": 700, "name": "Charlie"}\n{"amount": 800, "name": "Dennis"}\n',)
(Pdb) pp compute(values[0][0], values[1][0])
('{"amount": 500, "name": "Alice"}\n{"amount": 600, "name": "Bob"}\n{"amount": 700, "name": "Charlie"}\n{"amount": 800, "name": "Dennis"}\n',
'{"amount": 500, "name": "Alice"}\n{"amount": 600, "name": "Bob"}\n{"amount": 700, "name": "Charlie"}\n{"amount": 800, "name": "Dennis"}\n')(Pdb) pp values[0][0].dask
{'read_block_from_s3-dee321a7-acb2-4fbb-a142-03cf788ab0a8': (<function read_block_from_s3 at 0x7f6cfd5c6c08>,
'test/test/accounts.1.json',
0,
134217728,
(<type 'dict'>,
[]),
None,
None)}
(Pdb) pp values[1][0].dask
{'read_block_from_s3-4aca7fe2-75d1-432a-ad82-be2eed056a19': (<function read_block_from_s3 at 0x7f6cfd5c6c08>,
'test/test/accounts.2.json',
0,
134217728,
(<type 'dict'>,
[]),
None,
None)}
Turns out moto isn't threadsafe (getmoto/moto#313). Switching to a single thread. All is well. Removing the WIP label.
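For context, switching to a single thread just means computing with the synchronous scheduler, as in the %prun example earlier (a minimal sketch, not the actual test code):

import dask
import dask.bag as db

b = db.from_sequence([1, 2, 3, 4])
total = b.sum().compute(get=dask.get)  # runs entirely in the calling thread, so the moto mock is never hit concurrently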
Using the …
def read_text(path, blocksize=None, compression='infer',
              encoding=system_encoding, errors='strict',
              linedelimiter=os.linesep, collection=True, **kwargs):
Isn't '\n' more likely to be the line delimiter for any non-local file?
That's a good point. Maybe we keep this as None and let the bytes backends use their own defaults.
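A minimal sketch of that idea (the '://' protocol check and the utf-8 default are illustrative assumptions, not the real implementation):

import os

def read_text(path, blocksize=None, compression='infer',
              encoding='utf-8', errors='strict',
              linedelimiter=None, collection=True, **kwargs):
    if linedelimiter is None:
        # remote backends (s3, hdfs, ...) default to '\n'; local files keep os.linesep
        linedelimiter = '\n' if '://' in path else os.linesep
    ...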
Alright, merging.
We refactor how data gets ingested from different sources.

Locations

Systems like local, s3, and hdfs (will be in the distributed repo) implement the functions read_bytes, open_files and, if available, open_text_files. There is a bytes.core.{read_bytes, open_files, open_text_files} function that intelligently dispatches to the correct location based on protocol. dd.read_csv('s3://bucket/myfiles.*.csv') correctly dispatches to bytes.s3.read_bytes.
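Roughly, the dispatch amounts to something like this (a sketch of the idea only; the registry name and the 'file' fallback are assumptions, not the actual bytes.core code):

# protocol prefix -> backend module implementing read_bytes / open_files / open_text_files
_backends = {}  # e.g. {'s3': dask.bytes.s3, 'file': dask.bytes.local}

def read_bytes(path, **kwargs):
    protocol = path.split('://', 1)[0] if '://' in path else 'file'
    return _backends[protocol].read_bytes(path, **kwargs)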
Compression

We have a variety of compression formats filled out and rely heavily on the file-like objects found in those libraries. If such a file-like object is registered then all of the read_bytes and open_files functions support that compression automatically. If that file object is seekable then we can use blocksize= options to split data. Again, this all happens automatically; there is no need for downstream code to care about it.
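A downstream call under this design might then look like the following (illustrative only, reusing names from earlier in the thread; the exact keywords are not guaranteed by this PR):

import dask.dataframe as dd
from dask.bag.text import read_text

# gzip is not seekable, so each compressed file stays a single partition
df = dd.read_csv('s3://bucket/myfiles.*.csv.gz', compression='gzip')

# xz via a seekable reader (e.g. lzmaffi) can additionally be split into blocksize chunks
b = read_text('trip_data_1_00.csv.xz', blocksize=10000000)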
Formats

We've hooked this in to back both dd.read_csv and db.read_text with good success. They get extra locations and compression for free.

cc @jcrist cc @wabu I'd love review on this before I move forward much more. @wabu in particular, if you can tear holes in this I'd love to know sooner rather than later.