ARROW-15410: [C++][Datasets] Improve memory usage of datasets API when scanning parquet #12228
Conversation
lidavidm left a comment
These generally look like good changes. I wonder how we can test it, perhaps looking at the memory pool?
bef26ed to 9bda522 (force-pushed)
@lidavidm Unfortunately, when adding tests, I ran into a few issues and so this PR has evolved quite a bit from the last time you reviewed it. It probably needs a fresh look at this point.
I've created ARROW-16263 for the documentation tasks.
lidavidm left a comment
Thanks for working this out. I left some small comments but overall this looks pretty straightforward and doesn't involve complex generators which is a plus.
Should we keep the EVENT calls at least?
I'm not really a fan of these if we're not actually doing anything (or just forwarding the signal) but I can put it back in. In general I think EVENT is more meaningful when we've made some kind of decision based on data (e.g. when we decide we need to apply backpressure) or an external signal.
Ah, that's fair. We can leave it out.
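For illustration only, here is a minimal sketch of the kind of event that would still be meaningful under this reasoning: recording a trace event only when a backpressure decision is actually made, rather than when a signal is merely forwarded. It uses the OpenTelemetry C++ API directly; the span, threshold, and pause hook are placeholders, not code from this PR.

```cpp
#include <cstdint>

#include "opentelemetry/nostd/shared_ptr.h"
#include "opentelemetry/trace/span.h"

// Sketch only: record a trace event when a backpressure decision is made
// based on data, rather than every time a signal is forwarded.
void MaybeApplyBackpressure(
    opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span,
    int64_t bytes_in_flight, int64_t high_water_mark) {
  if (bytes_in_flight > high_water_mark) {
    // A decision was made here, so the event carries real information.
    span->AddEvent("backpressure applied",
                   {{"backpressure.bytes_in_flight", bytes_in_flight}});
    // ... signal the source node to pause producing ...
  }
}
```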
cpp/src/arrow/compute/exec/options.h (outdated)
Seems weird to have ptr-to-shared-ptr, vs. just a shared_ptr that might hold nullptr.
Ah, this is because it's an out pointer?
It's an out pointer for which we want to share ownership. I was a little torn on this, as we could use a raw pointer, but then it wouldn't be valid to call any of the monitor methods after the plan is destroyed. I think that is probably ok in general, but the generator will actually keep the queue alive beyond the plan (a std::function is essentially a shared pointer to its target), and so I wanted to match the lifetime.
Actually, since the generator is now capturing this, I rescind my previous statement. We could change the generator to also capture both the queue AND the backpressure reservoir (so it doesn't have to capture this) but I think that starts to complicate things. I think we want to say:
"All inputs and outputs to a plan become invalid when the plan is destroyed."
This is a slight change in behavior, so I added a guard to the AsyncGenerator so it should now be obvious to callers if the generator outlives the plan.
With this change I went ahead and changed backpressure_monitor to a BackpressureMonitor** out parameter.
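To make the out-pointer pattern being discussed concrete, here is a simplified sketch: the caller hands the plan the address of a raw pointer, the plan fills it in when it starts, and the pointer is only valid while the plan is alive. The type and field names are illustrative, not the exact Arrow declarations.

```cpp
#include <cstdint>

// Simplified sketch of the out-pointer pattern discussed above; names are
// illustrative rather than the exact Arrow declarations.
struct BackpressureMonitor {
  int64_t bytes_in_use = 0;
  bool is_paused = false;
};

struct SinkNodeOptions {
  // Out parameter: when the plan starts, the sink node points this at a
  // monitor that the plan owns.  Only valid while the plan is alive.
  BackpressureMonitor** backpressure_monitor = nullptr;
};

void Example() {
  BackpressureMonitor* monitor = nullptr;

  SinkNodeOptions options;
  options.backpressure_monitor = &monitor;
  // ... create and start the plan with `options`; the sink fills in `monitor` ...

  // `monitor` must not be used after the plan is destroyed.  The generator
  // returned by the sink now carries a guard, so polling it after the plan
  // is gone fails loudly instead of touching freed state.
}
```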
nit, but maybe namespace the span attribute name?
I changed it to backpressure.counter. Is this what you were thinking? It doesn't seem we are doing much namespacing elsewhere.
That works. I was thinking arrow.... like in the memory PR, but it's not a big deal.
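For illustration, setting the namespaced attribute agreed on here might look like the following with the OpenTelemetry C++ API. The function name and the `span`/`counter` parameters are placeholders, not the PR's actual code.

```cpp
#include <cstdint>

#include "opentelemetry/nostd/shared_ptr.h"
#include "opentelemetry/trace/span.h"

// Sketch only: attach the counter to a span under the namespaced attribute
// key "backpressure.counter", as discussed above.
void RecordBackpressureCounter(
    opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span,
    int64_t counter) {
  span->SetAttribute("backpressure.counter", counter);
}
```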
Looks like Raúl is still working through the Git failures, but we can rebase and merge after that.
…t datasets without using an extraordinary amount of RAM
…gnal based and away from the old async toggle approach
…or after the sink node has been destroyed
46defaa to 777b396 (force-pushed)
Hmm... doing some more testing on this, I think this might be non-ideal in a few situations (S3, low number of files, smallish row groups). This is because we are always reading only 1 row group ahead, so we will issue, at most, 2 reads in parallel for a single file. However, the old behavior was also unmaintainable, as it would have kicked off dozens of parallel reads and run out of memory. The ideal approach would be to keep track of how many rows we have "in flight" and issue reads until we have batch_size * batch_readahead rows in flight, and then pause. I'm going to work on this but, as we are close to release, I'd prefer to move forward with this sometimes-slower-but-usually-safer approach and put the ideal fix in a follow-up.
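As a rough illustration of the "ideal" follow-up described in that comment (not code from this PR), the accounting could look something like this: keep issuing row-group reads until batch_size * batch_readahead rows are in flight, then pause until enough rows have been delivered downstream.

```cpp
#include <cstdint>

// Rough sketch of the follow-up idea described above (not part of this PR):
// keep issuing row-group reads until batch_size * batch_readahead rows are
// "in flight", then pause until rows have been delivered downstream.
struct RowsInFlightAccounting {
  int64_t batch_size = 0;
  int64_t batch_readahead = 0;
  int64_t rows_in_flight = 0;

  // True while we still have budget to start another read.
  bool CanIssueRead() const {
    return rows_in_flight < batch_size * batch_readahead;
  }

  void OnReadIssued(int64_t num_rows) { rows_in_flight += num_rows; }
  void OnRowsDelivered(int64_t num_rows) { rows_in_flight -= num_rows; }
};
```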
Is there a quick config change we can document for S3?
Just so that regressions can be easily fixed.
That said, you should feel free to merge here in that case; we should prefer not crashing/freezing.
There's no config that we can easily change that would go back to the old behavior.
Hmm, okay. Well, a low number of files + smallish row groups should hopefully not be too big a deal then. It might be worth calling this out in the release notes when the time comes, just so that it's clear we're working on this.
Ok. I'm going to merge this so we get some nightly tests, etc. run on it, and then I'm going to see if I can get a PR up for the ideal behavior pretty quickly.
@westonpace This breaks a GLib CI job on macOS: https://github.com/apache/arrow/runs/6136118707?check_suite_focus=true#step:9:128 Could you confirm this? (Is
Sorry Kou. Thanks David!
Benchmark runs are scheduled for baseline = 6c1a160 and contender = 78fb2ed. 78fb2ed is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
['Python', 'R'] benchmarks have a high level of regressions.
This PR changes a few things.
It adds a counter argument to the backpressure calls to help deal with the challenges that arise when we try and sequence backpressure signals. Partly this was to add support for monitoring backpressure (for tests). Partly it is because I have since become more aware of the reasons for these signals: they are needed to allow for backpressure from the aggregate & join nodes. Putting this all together, the scanner should now buffer in memory:
MAX(fragment_readahead * row_group_size_bytes * 2, fragment_readahead * batch_readahead * batch_size_bytes)
The exec plan sink node should buffer ~ kDefaultBackpressureHighBytes bytes.
The exec plan itself can have some number of tasks in flight but, assuming there are no pipeline breakers, this will be limited to the number of threads in the CPU thread pool and so it should be parallelism * batch_size_bytes.
Adding those together should give the total RAM usage of a plan being read via a sink node that doesn't have any pipeline breakers.
When the sink is a write node then there is a separate backpressure consideration based on # of rows (we can someday change this to be # of bytes but it would be a bit tricky at the moment because we need to balance this with the other write parameters like min_rows_per_group).
So, given the parquet dataset mentioned in the JIRA (21 files, 10 million rows each, 10 row groups each), and knowing that 1 row group is ~140MB when decompressed into Arrow format, we should get the following default memory usage:
Scanner readahead = MAX(4 * 140MB * 2, 4 * 16 * 17.5MB) = MAX(1120MB, 1120MB) = 1120MB
Sink readahead ~ 1GiB
Total RAM usage should then be ~2GiB.
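The same arithmetic, expressed as a small helper (a sketch of the scanner-buffering formula above, not an Arrow API):

```cpp
#include <algorithm>
#include <cstdint>

// Sketch of the scanner buffering formula described above; not an Arrow API.
int64_t ExpectedScannerBuffering(int64_t fragment_readahead,
                                 int64_t row_group_size_bytes,
                                 int64_t batch_readahead,
                                 int64_t batch_size_bytes) {
  return std::max(fragment_readahead * row_group_size_bytes * 2,
                  fragment_readahead * batch_readahead * batch_size_bytes);
}

// For the JIRA dataset: ExpectedScannerBuffering(4, ~140MB, 16, ~17.5MB)
// gives MAX(1120MB, 1120MB) = 1120MB, plus ~1GiB for the sink => ~2GiB.
```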
Update docs to mention that S3 users may want to increase the fragment readahead, but that this will come at the cost of more RAM usage (a configuration sketch follows below).
Update docs to give some of this "expected memory usage" information.
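For example, a user scanning from S3 might raise the fragment readahead roughly like this. This is a sketch only: it assumes ScannerBuilder exposes a FragmentReadahead setter (or the equivalent ScanOptions field), and the dataset/filesystem discovery code is elided.

```cpp
#include <memory>
#include <utility>

#include "arrow/dataset/scanner.h"
#include "arrow/result.h"

// Sketch only: raise fragment readahead for a high-latency filesystem like
// S3, at the cost of more RAM.  Assumes a FragmentReadahead setter exists on
// ScannerBuilder; `dataset` comes from whatever discovery code you already use.
arrow::Result<std::shared_ptr<arrow::dataset::Scanner>> MakeS3Scanner(
    std::shared_ptr<arrow::dataset::Dataset> dataset) {
  arrow::dataset::ScannerBuilder builder(std::move(dataset));
  // Default fragment readahead is 4 in the math above; 8 reads further ahead.
  ARROW_RETURN_NOT_OK(builder.FragmentReadahead(8));
  return builder.Finish();
}
```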