
Reducing read_metadata output size in pyarrow/parquet #5391

Merged
mrocklin merged 7 commits into dask:master from rjzamora:pieces-fix
Sep 25, 2019

Conversation

@rjzamora (Member) commented Sep 10, 2019

This is a possible fix for the large-dask-graph problem raised in #5357. We avoid passing around a pq.ParquetDataset "piece", and instead pass around the path and partition_keys members of each piece. This dramatically reduces the amount of metadata stored in the task graph.
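The idea can be sketched roughly as follows (the `FakePiece` class and graph shapes below are illustrative stand-ins, not the actual dask or pyarrow internals): instead of embedding each heavyweight piece object in every task, each task carries only the lightweight `(path, partition_keys)` pair and the piece is reconstructed inside `read_partition`. A self-contained comparison of the serialized graph sizes:

```python
import pickle

# Stand-in for a pq.ParquetDataset piece: carries bulky per-file
# row-group metadata. (Illustrative only -- not the real pyarrow class.)
class FakePiece:
    def __init__(self, path, partition_keys):
        self.path = path
        self.partition_keys = partition_keys
        self.row_group_metadata = [{"stats": list(range(100))} for _ in range(30)]

pieces = [FakePiece(f"part.{i}.parquet", [("year", 2019)]) for i in range(365)]

# OLD: one task per partition, each embedding the whole piece object.
old_graph = {("read", i): ("read_partition", p) for i, p in enumerate(pieces)}

# NEW: each task carries only the path and partition keys; everything
# else is re-derived inside the read task.
new_graph = {
    ("read", i): ("read_partition", p.path, p.partition_keys)
    for i, p in enumerate(pieces)
}

old_size = len(pickle.dumps(old_graph))
new_size = len(pickle.dumps(new_graph))
print(old_size, new_size)
assert new_size < old_size  # the new graph is dramatically smaller
```

The trade-off, discussed below, is that each read task must now re-parse some metadata itself rather than receiving it pre-built.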

Since this solution does require the metadata to be parsed by each task in read_partition, I am including the results of a simple benchmark (performed on my local machine):

import dask
import dask.dataframe as dd
import cloudpickle

ddf = dask.datasets.timeseries(...)
ddf.to_parquet(file, engine="pyarrow")

read_df = dd.read_parquet(file, engine="pyarrow")
fun = list(read_df.__dask_graph__().values())[0]
print("Graph Size:", len(cloudpickle.dumps(fun)))
%timeit read_df.compute()

OUTPUT...

OLD ~365 partitions:

Graph Size: 206328
5.71 s ± 22.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

NEW ~365 partitions:

Graph Size: 2022
5.74 s ± 101 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

OLD ~36k partitions:

Graph Size: 2144624
16.5 s ± 648 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

NEW ~36k partitions:

Graph Size: 2022
16 s ± 1.24 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
  • Tests added / passed
  • Passes black dask / flake8 dask

@mrocklin (Member)

Since this solution does require the metadata to be parsed by each task in read_partition, I am including the results of a simple benchmark (performed on my local machine):

@rjzamora if you're willing to humor me, can you try one of these benchmarks also with the distributed scheduler? To do this you would add the following line of code sometime before you call .compute()

from dask.distributed import Client
client = Client()

@mrocklin (Member)

I would expect this to accentuate the slowdown of moving large graphs around.

@mrocklin (Member)

@birdsarah , this might help out some of your workloads (if you're still interested in engaging here). I suspect that this might have been slowing you down on many-partition workloads.

@rjzamora (Member, Author)

if you're willing to humor me, can you try one of these benchmarks also with the distributed scheduler? To do this you would add the following line of code sometime before you call .compute()

Absolutely - Sorry, should have done this the first time around :)

@rjzamora (Member, Author)

dask.distributed version of the benchmark above...

OLD ~365 partitions:

Graph Size: 206328
9.63 s ± 139 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

NEW ~365 partitions:

Graph Size: 2022
8.54 s ± 191 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

For dask.distributed, the ~36k-partition example takes ~13.6s for the new code, but does not work for the "old" code. I get the following output:

<details>
<summary>Click to Expand</summary>

```
/Users/rzamora/.local/lib/python3.7/site-packages/distributed/worker.py:2794: UserWarning: Large object of size 2.14 MB detected in task graph:
    future = client.submit(func, big_data)     # bad
    big_future = client.scatter(big_data)      # good
    future = client.submit(func, big_future)   # good
  % (format_bytes(len(b)), s))
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
tornado.application - ERROR - Exception in callback <bound method Nanny.memory_monitor of <Nanny: tcp://127.0.0.1:56387, threads: 3>>
Traceback (most recent call last):
  File "/anaconda3/envs/dask-parquet-dev/lib/python3.7/site-packages/psutil/_common.py", line 342, in wrapper
    ret = self._cache[fun]
AttributeError: 'Process' object has no attribute '_cache'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda3/envs/dask-parquet-dev/lib/python3.7/site-packages/psutil/_common.py", line 342, in wrapper
    ret = self._cache[fun]
AttributeError: _cache

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda3/envs/dask-parquet-dev/lib/python3.7/site-packages/psutil/_psosx.py", line 359, in catch_zombie
    yield
  File "/anaconda3/envs/dask-parquet-dev/lib/python3.7/site-packages/psutil/_psosx.py", line 400, in _get_pidtaskinfo
    ret = cext.proc_pidtaskinfo_oneshot(self.pid)
ProcessLookupError: [Errno 3] No such process

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda3/envs/dask-parquet-dev/lib/python3.7/site-packages/tornado/ioloop.py", line 907, in _run
    return self.callback()
  File "/Users/rzamora/.local/lib/python3.7/site-packages/distributed/nanny.py", line 266, in memory_monitor
    memory = proc.memory_info().rss
  File "/anaconda3/envs/dask-parquet-dev/lib/python3.7/site-packages/psutil/_common.py", line 345, in wrapper
    return fun(self)
  File "/anaconda3/envs/dask-parquet-dev/lib/python3.7/site-packages/psutil/__init__.py", line 1170, in memory_info
    return self._proc.memory_info()
  File "/anaconda3/envs/dask-parquet-dev/lib/python3.7/site-packages/psutil/_psosx.py", line 339, in wrapper
    return fun(self, *args, **kwargs)
  File "/anaconda3/envs/dask-parquet-dev/lib/python3.7/site-packages/psutil/_psosx.py", line 469, in memory_info
    rawtuple = self._get_pidtaskinfo()
  File "/anaconda3/envs/dask-parquet-dev/lib/python3.7/site-packages/psutil/_psosx.py", line 339, in wrapper
    return fun(self, *args, **kwargs)
  File "/anaconda3/envs/dask-parquet-dev/lib/python3.7/site-packages/psutil/_common.py", line 345, in wrapper
    return fun(self)
  File "/anaconda3/envs/dask-parquet-dev/lib/python3.7/site-packages/psutil/_psosx.py", line 400, in _get_pidtaskinfo
    ret = cext.proc_pidtaskinfo_oneshot(self.pid)
  File "/anaconda3/envs/dask-parquet-dev/lib/python3.7/contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)
  File "/anaconda3/envs/dask-parquet-dev/lib/python3.7/site-packages/psutil/_psosx.py", line 372, in catch_zombie
    raise AccessDenied(proc.pid, proc._name)
psutil.AccessDenied: psutil.AccessDenied (pid=6545)

[... the same memory_monitor traceback repeats several more times ...]

distributed.nanny - WARNING - Worker process 6545 was killed by signal 15
```

</details>


@mrocklin (Member)

Woot! Nice result.

@mrocklin (Member)

Thoughts on testing? Maybe we could test that the size of the serialized graph is small, like the following:

import pickle

df = ...  # some dataframe with lots of columns
df.to_parquet(tmppath)
df = dd.read_parquet(tmppath)
assert len(pickle.dumps(df.__dask_graph__())) < 10000  # or some other suitable number

@rjzamora (Member, Author)

Update: I revised the fastparquet engine to avoid passing around row-group and ParquetFile objects in the task graph (when possible). This was a bit trickier to do in fastparquet; I'm not sure if I implemented the best solution here.

@birdsarah (Contributor)

@rjzamora I'm testing out this PR. The failing test makes me nervous. Can you contextualize?

@birdsarah (Contributor)

Well, it turns out I can only offer quick feedback, as I can't actually use this branch due to s3fs problems, but I think this solves my large-partitions-with-metadata problem. Without this branch, on 2.4.0 I'm seeing "large item in task graph" warnings and an unending wait for the job to start. With it, work starts in a timely manner.

One small piece of feedback: reading is still slower, though. In this particular case I was reading 33 parquet files, each with hundreds to thousands of partitions, then repartitioning to a smaller number and writing out.

Reading the 33 separate days and concatenating takes ~40 s on dask 2.1.0 and ~1 min 30 s on dask 2.4.0 (it sped up from ~1 min 40 s to ~1 min 25 s when moving from dask 2.4.0 to this branch, but that might be a coincidence rather than this branch). Not the end of the world, and with other potential performance savings it's livable with; just an observation.

@rjzamora (Member, Author)

@birdsarah - Are you testing this PR with the fastparquet or pyarrow engine? Since the original issue was for pyarrow, and the fastparquet changes are causing CI failures, I was planning to revert the fastparquet changes and revisit them later (if needed).

If the fastparquet changes are useful to you, I will do my best to address the CI problem (and perhaps the performance problem) here.

@birdsarah (Contributor) commented Sep 18, 2019

I tested on fastparquet, and that's what I'm using for all my data. Unfortunately pyarrow did not support reading in the complex datatypes at the start of my pipeline, so I'm fastparquet all the way.

Do not rush on my account. The fsspec/s3fs problems mean that anything after dask 2.1.0 is completely unusable for me. Last night I was hoping that fsspec/s3fs#237 would have solved the s3fs problems but it hasn't. But without this PR dask after 2.1.0 is also unusable for me when a data source has metadata but also large n partitions. Ironically if there isn't metadata, which was my problem before, 2.4.0 is now speedy - thanks to gather_statistics=False.

@birdsarah (Contributor)

I take it back @rjzamora: do rush on my account!! I will shortly be done writing large volumes of data to s3, which means I should be able to use latest dask for analysis and, therefore upgrade. Let me know what / when you'd like me to test.

@rjzamora (Member, Author)

@birdsarah - I believe the CI problem has been addressed. Unfortunately, I am not sure how to test the performance regression from 2.1.0 to this branch; I am personally finding a degradation in performance when I move to 2.1.0. Can you provide a simple test case?

Note that I just tried:

ddf = dask.datasets.timeseries(
    start="2000-01-01",
    end="2000-12-31",
    freq="1S",
    partition_freq="1D",
    seed=42,
    id_lam=30,
)

Reading this dataset changed from ~1:06 to ~1:50 when I swapped out this branch for 2.1.0.

@birdsarah (Contributor)

Can you provide a simple test case?

No, sorry. I can only report what I see on my cluster when switching between environments. There are so many moving pieces between s3fs=0.2.0 and dask=2.1.0 (env a) and the latest s3fs and dask (env b) that providing a rigorous, isolated test case is something I simply don't have time for, especially given the limited value. I appreciate you trying to find one. The fact that we are seeing performance differences in opposite directions doesn't entirely surprise me, and these differences don't appear to be deal-breakers (compared to the fact that dask 2.4.0 can't read large datasets with _metadata right now).

@birdsarah (Contributor)

I believe the CI problem has been addressed.

Woot :D

@rjzamora (Member, Author)

@martindurant - Any chance you can take a look at this?

@TomAugspurger (Member) left a comment

This looks generally nice. Will give @martindurant a chance to look.

@TomAugspurger TomAugspurger changed the title [WIP] Reducing read_metadata output size in pyarrow/parquet Reducing read_metadata output size in pyarrow/parquet Sep 19, 2019
@mrocklin (Member)

@martindurant last chance :)

Merging this afternoon if there are no further comments

@martindurant (Member)

On a quick glance, this looks very reasonable. I left two comments.

@rjzamora (Member, Author)

Thanks for reviewing, @martindurant! Hopefully both of your suggestions have been addressed now.

@martindurant (Member)

I didn't look for long, but I think things are good.

@mrocklin mrocklin merged commit 6dfc8e9 into dask:master Sep 25, 2019
@mrocklin (Member)

Merging this in. Thanks @rjzamora for fixing this, and @birdsarah and @martindurant for reviewing.
