Reducing read_metadata output size in pyarrow/parquet #5391

mrocklin merged 7 commits into dask:master
Conversation
@rjzamora if you're willing to humor me, can you try one of these benchmarks also with the distributed scheduler? To do this you would add the following lines of code sometime before you run the benchmark:

```python
from dask.distributed import Client

client = Client()
```

I would expect this to accentuate the slowdown of moving large graphs around.
@birdsarah, this might help out some of your workloads (if you're still interested in engaging here). I suspect that this might have been slowing you down on many-partition workloads.
Absolutely - sorry, should have done this the first time around :)
OLD ~365 partitions / NEW ~365 partitions (click to expand):

```
/Users/rzamora/.local/lib/python3.7/site-packages/distributed/worker.py:2794: UserWarning: Large object of size 2.14 MB detected in task graph: (...)
    future = client.submit(func, big_data)     # bad
    big_future = client.scatter(big_data)      # good
    future = client.submit(func, big_future)   # good
  % (format_bytes(len(b)), s))
```

(The rest of the captured output was many repetitions of "During handling of the above exception, another exception occurred: Traceback (most recent call last):", omitted here.)
Woot! Nice result.
Thoughts on testing? Maybe we could test that the size of the serialized graph is small, like the following:

```python
df = ...  # some dataframe with lots of columns
df.to_parquet(tmppath)
df = dd.read_parquet(tmppath)
assert len(pickle.dumps(df.__dask_graph__())) < 10000  # or some other suitable number
```
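To make the intent of that bound concrete: a dask graph is essentially a dict of tasks, and embedding a large metadata object in every task inflates its pickled size. A stdlib-only sketch of the effect (the `heavy_graph`/`light_graph` names and sizes are illustrative, not taken from this PR):

```python
import pickle

# Stand-in for parsed Parquet metadata (~100 kB) that used to ride along
# in every task; the fix stores only a cheap path string per task instead.
big_metadata = b"x" * 100_000
heavy_graph = {("read", i): ("read_partition", big_metadata) for i in range(10)}
light_graph = {("read", i): ("read_partition", f"part.{i}.parquet") for i in range(10)}

heavy_size = len(pickle.dumps(heavy_graph))
light_size = len(pickle.dumps(light_graph))

# The lightweight graph stays well under the kind of bound suggested above.
assert light_size < 10_000 < heavy_size
```

A test along these lines catches regressions where heavyweight objects sneak back into the graph, without depending on exact serialization sizes.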
Update: I revised the fastparquet engine to avoid passing around row-group and …
@rjzamora I'm testing out this PR. The failing test makes me nervous. Can you contextualize?
Well, turns out I can only offer quick feedback as I can't actually use this branch due to s3fs problems, but I think this solves my large-partitions-with-metadata problem. Without this branch, on 2.4.0 I'm seeing "large item in task graph" and an unending wait for the job to start. With it, work starts in a timely manner. Small piece of feedback: reading is still slower, though. In this particular case I was reading 33 parquet files with hundreds to thousands of partitions, repartitioning to a smaller number, and writing out. The read of 33 separate days plus the concat step takes ~40s on dask 2.1.0 and ~1min 30s on dask 2.4.0 (sped up from ~1min 40s to ~1min 25s moving from dask 2.4.0 to this branch, but that might be a coincidence and not this branch). Not the end of the world, and with potential other performance savings it's liveable with; just an observation.
@birdsarah - Are you testing this PR with the fastparquet or pyarrow engine? Since the original issue was for pyarrow, and the fastparquet changes are causing CI failures, I was planning to revert the fastparquet changes and revisit them later (if needed). If the fastparquet changes are useful to you, I will do my best to address the CI problem (and perhaps the performance problem) here.
I tested on fastparquet, and that's what I'm using for all my data. Unfortunately, pyarrow did not support reading in the complex datatypes at the start of my pipeline, so I'm fastparquet all the way. Do not rush on my account. The fsspec/s3fs problems mean that anything after dask 2.1.0 is completely unusable for me. Last night I was hoping that fsspec/s3fs#237 would have solved the s3fs problems, but it hasn't. But without this PR, dask after 2.1.0 is also unusable for me when a data source has metadata but also a large number of partitions. Ironically, if there isn't metadata, which was my problem before, 2.4.0 is now speedy - thanks to …
I take it back @rjzamora: do rush on my account!! I will shortly be done writing large volumes of data to s3, which means I should be able to use the latest dask for analysis and, therefore, upgrade. Let me know what / when you'd like me to test.
@birdsarah - I believe the CI problem has been addressed. Unfortunately, I am not sure how to test the performance regression from 2.1.0 to this branch. I am personally finding a degradation in performance when I move to 2.1.0 -- can you provide a simple test case? Note that I just tried:

```python
ddf = dask.datasets.timeseries(
    start="2000-01-01",
    end="2000-12-31",
    freq="1S",
    partition_freq="1D",
    seed=42,
    id_lam=30,
)
```

Reading this dataset changed from ~1:06 to ~1:50 when I swapped out this branch for 2.1.0.
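For anyone reproducing that comparison, a minimal stdlib-only timing harness might look like the following (the `timed` helper and the placeholder workload are illustrative; the real benchmark would wrap a `dd.read_parquet(...).compute()` call):

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results):
    # Record wall-clock seconds for the enclosed block under `label`.
    start = time.perf_counter()
    yield
    results[label] = time.perf_counter() - start


results = {}
with timed("read benchmark", results):
    total = sum(range(1_000_000))  # stand-in for ddf = dd.read_parquet(...); ddf.compute()
print(f"read benchmark: {results['read benchmark']:.2f}s")
```

Running the same harness once per environment (old dask vs. this branch) gives directly comparable wall-clock numbers.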
No, sorry - I can only report what I see on my cluster when switching between environments. There are so many moving pieces between s3fs=0.2.0 with dask=2.1.0 (env a) and the latest s3fs and dask (env b) that providing a rigorous isolated test case is something I simply don't have time for, especially given the limited value. I appreciate you trying to find one. The fact that we are seeing performance differences in opposite directions doesn't entirely surprise me, and these differences don't appear to be deal breakers (as compared to the fact that dask 2.4.0 can't read large datasets with _metadata right now).
Woot :D
@martindurant - Any chance you can take a look at this?
TomAugspurger left a comment:
This looks generally nice. Will give @martindurant a chance to look.
@martindurant last chance :) Merging this afternoon if there are no further comments.
On a quick glance, it looks very reasonable. I left two comments.
Thanks for reviewing @martindurant! Hopefully both of your suggestions have been addressed now.
I didn't look for long, but I think things are good.
Merging this in. Thanks @rjzamora for fixing this, and @birdsarah and @martindurant for review.
This is a possible fix for the large-dask-graph problem raised in #5357. We avoid passing around a `pq.ParquetDataset` "piece", and instead pass around the `path` and `partition_keys` members of each piece. This dramatically reduces the amount of metadata stored in the task graph. Since this solution does require the metadata to be parsed by each task in `read_partition`, I am including the results of a simple benchmark (performed on my local machine):
- OLD ~365 partitions: (output collapsed)
- NEW ~365 partitions: (output collapsed)
- OLD ~36k partitions: (output collapsed)
- NEW ~36k partitions: (output collapsed)
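The core idea — ship only the cheap identifying members of each piece rather than the whole object — can be sketched with plain Python. The `Piece` class, sizes, and paths below are illustrative stand-ins, not pyarrow's actual objects:

```python
import pickle


class Piece:
    """Stand-in for a dataset piece that drags parsed metadata along with it."""

    def __init__(self, path, partition_keys):
        self.path = path
        self.partition_keys = partition_keys
        self.metadata = b"m" * 50_000  # stand-in for per-piece row-group metadata


pieces = [Piece(f"part.{i}.parquet", [("year", 2000)]) for i in range(8)]

# Before: the whole piece object (metadata included) went into every task.
before = pickle.dumps([(p,) for p in pieces])

# After: only the cheap identifying members travel; the worker re-opens the
# dataset and reconstructs the piece inside read_partition.
after = pickle.dumps([(p.path, p.partition_keys) for p in pieces])

assert len(after) < len(before)
```

The trade-off is exactly the one noted above: the graph shrinks dramatically, but each `read_partition` task must re-parse the metadata it needs.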
- Passes `black dask` / `flake8 dask`