Conversation

@westonpace westonpace (Member) commented Jan 22, 2022

This PR changes a few things.

  • The default file readahead is changed to 4. This doesn't seem to affect performance on HDD/SSD, and users should already be doing special tuning for S3. Besides, in many cases users are reading IPC/Parquet files that have many row groups, so we already have sufficient I/O parallelism. This is important for bringing down the overall memory usage, as can be seen in the formula below.
  • The default batch readahead is changed to 16. Previously, when we were doing filtering and projection within the scanner, it made sense to read many batches ahead (generally we want at least 2 * # of CPUs in that case). Now that the exec plan is doing the computation, the exec plan's buffering is instead governed by kDefaultBackpressureLowBytes and kDefaultBackpressureHighBytes.
  • Moves the Parquet readahead around a bit. The previous version would read ahead N row groups. Now we always read ahead exactly 1 row group but we read ahead N batches (this may mean we read ahead more than 1 row group if the batch size is much larger than the row group size).
  • Backpressure now utilizes the pause/resume producing signals in the execution plan. I've added a counter argument to the calls to help deal with the challenges that arise when we try to sequence backpressure signals. Partly this was to add support for monitoring backpressure (for tests). Partly it is because I have since become more aware of the reasons for these signals: they are needed to allow for backpressure from the aggregate & join nodes.
  • Sink backpressure can now be monitored. This makes it easier to test and could potentially be useful to a user who wants to know when they are consuming the plan too slowly.
  • Changes the default scanner batch size to 128Ki rows. Now that we have more or less decoupled the scanning batch size from the row group size, we can pass smaller batches through the scanner. This makes it easier to get parallelism on small datasets (see the sketch after this list).
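For orientation, here is a minimal sketch of where these defaults surface in the C++ API; the field names follow arrow/dataset/scanner.h around this change, and the exact names/values should be treated as assumptions rather than a definitive reference.

#include <arrow/dataset/scanner.h>

// Sketch only: the new defaults as they would appear on ScanOptions.
void ShowNewDefaults() {
  arrow::dataset::ScanOptions options;
  options.batch_size = 128 * 1024;  // 128Ki rows per batch (1 << 17)
  options.batch_readahead = 16;     // batches read ahead within a fragment
  options.fragment_readahead = 4;   // files (fragments) read ahead
}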

Putting this all together, the scanner should now buffer in memory:

MAX(fragment_readahead * row_group_size_bytes * 2, fragment_readahead * batch_readahead * batch_size_bytes)

The exec plan sink node should buffer ~ kDefaultBackpressureHighBytes bytes.
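A hedged sketch of monitoring that sink buffering, assuming SinkNodeOptions takes a BackpressureMonitor** out-parameter (as discussed in the review below) and that the monitor exposes bytes_in_use() and is_paused(); the exact signatures are assumptions:

#include <arrow/compute/exec/exec_plan.h>
#include <arrow/compute/exec/options.h>
#include <arrow/util/async_generator.h>

namespace cp = arrow::compute;

// Sketch only: ask the sink node to report backpressure through an
// out-pointer, then poll the monitor while consuming the plan.
void AddMonitoredSink(
    cp::ExecPlan* plan, cp::ExecNode* input,
    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen) {
  cp::BackpressureMonitor* monitor = nullptr;
  cp::SinkNodeOptions sink_options{out_gen, /*backpressure=*/{}, &monitor};
  auto sink = cp::MakeExecNode("sink", plan, {input}, sink_options);
  // Once the plan starts, monitor->bytes_in_use() reports the bytes queued
  // in the sink and monitor->is_paused() reports whether backpressure is
  // currently applied.
}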

The exec plan itself can have some number of tasks in flight but, assuming there are no pipeline breakers, this will be limited to the number of threads in the CPU thread pool, so it should be parallelism * batch_size_bytes.

Adding those together should give the total RAM usage of a plan being read via a sink node that doesn't have any pipeline breakers.

When the sink is a write node there is a separate backpressure mechanism based on # of rows (we can someday change this to # of bytes, but that would be a bit tricky at the moment because we need to balance it against the other write parameters like min_rows_per_group).

So, given the Parquet dataset mentioned in the JIRA (21 files, 10 million rows each, 10 row groups each), and knowing that 1 row group is ~140MB when decompressed into Arrow format, we should get the following default memory usage:

Scanner readahead = MAX(4 * 140MB * 2, 4 * 16 * 17.5MB) = MAX(1120MB, 1120MB) = 1120MB
Sink readahead ~ 1GiB
Total RAM usage should then be ~2GiB.
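As a quick sanity check on that arithmetic (a sketch; the 17.5MB batch figure assumes a 128Ki-row batch carved from a ~140MB, 1M-row row group):

// Sizes in MB, mirroring the example above.
constexpr double kRowGroupMB = 140.0;   // ~1M rows decompressed
constexpr double kBatchMB = 17.5;       // 128Ki-row batch, roughly 140MB / 8
constexpr double kFromRowGroups = 4 * kRowGroupMB * 2;  // 1120
constexpr double kFromBatches = 4.0 * 16 * kBatchMB;    // 1120
static_assert(kFromRowGroups == 1120.0 && kFromBatches == 1120.0,
              "both terms of the MAX match the 1120MB figure above");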

  • Add tests to verify memory usage
  • Update docs to mention that S3 users may want to increase the fragment readahead, though this will come at the cost of more RAM usage.
  • Update docs to give some of this "expected memory usage" information

@westonpace westonpace changed the title from "ARROW-14510: [C++][Datasets] Improve memory usage of datasets API when scanning parquet" to "ARROW-15410: [C++][Datasets] Improve memory usage of datasets API when scanning parquet" on Mar 2, 2022

@lidavidm lidavidm (Member) left a comment

These generally look like good changes. I wonder how we can test it, perhaps looking at the memory pool?

@westonpace westonpace force-pushed the feature/ARROW-15410--improve-dataset-parquet-memory-usage branch 2 times, most recently from bef26ed to 9bda522 on April 20, 2022 04:15
@westonpace westonpace marked this pull request as ready for review April 21, 2022 09:35
@westonpace westonpace requested a review from lidavidm April 21, 2022 09:35
@westonpace (Member Author)

@lidavidm Unfortunately, when adding tests, I ran into a few issues and so this PR has evolved quite a bit from the last time you reviewed it. It probably needs a fresh look at this point.

@westonpace (Member Author)

I've created ARROW-16263 for the documentation tasks.

@lidavidm lidavidm (Member) left a comment

Thanks for working this out. I left some small comments, but overall this looks pretty straightforward and doesn't involve complex generators, which is a plus.

Member

Should we keep the EVENT calls at least?

@westonpace westonpace (Member Author) commented Apr 22, 2022

I'm not really a fan of these if we're not actually doing anything (or just forwarding the signal) but I can put it back in. In general I think EVENT is more meaningful when we've made some kind of decision based on data (e.g. when we decide we need to apply backpressure) or an external signal.

Member

Ah, that's fair. We can leave it out.

Member

Seems weird to have ptr-to-shared-ptr, vs. just a shared_ptr that might hold nullptr.

Member

Ah, this is because it's an out pointer?

Member Author

It's an out pointer through which we want to share ownership. I was a little torn on this, as we could use a raw pointer, but then it wouldn't be valid to call any of the monitor methods after the plan is destroyed. I think that is probably ok in general, but the generator will actually keep the queue alive beyond the plan (std::function is essentially a shared pointer to its target), so I wanted to match the lifetime.
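(An illustrative aside, not Arrow API: the "std::function is essentially a shared pointer to its target" point looks like this.)

#include <functional>
#include <memory>

// A callable that captures a shared_ptr keeps its target alive for as long
// as the std::function itself lives -- which is how a generator could keep
// the queue alive beyond the plan.
std::function<int()> MakeGenerator() {
  auto queue = std::make_shared<int>(42);  // stand-in for the batch queue
  return [queue] { return *queue; };       // the closure co-owns the queue
}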

@westonpace westonpace (Member Author) commented Apr 22, 2022

Actually, since the generator is now capturing this, I rescind my previous statement. We could change the generator to instead capture both the queue AND the backpressure reservoir (so it doesn't have to capture this) but I think that starts to complicate things. I think we want to say:

All inputs and outputs to a plan become invalid when the plan is destroyed

This is a slight change in behavior so I added a guard to the AsyncGenerator so it should now be obvious to callers if the generator outlives the plan.

With this change I went ahead and changed backpressure_monitor to BackpressureMonitor**

Member

nit, but maybe namespace the span attribute name?

Member Author

I changed it to backpressure.counter. Is this what you were thinking? It doesn't seem like we are doing much namespacing elsewhere.

Member

That works. I was thinking arrow.... like in the memory PR, but it's not a big deal.

@lidavidm (Member)

Looks like Raúl is still working through the Git failures, but we can rebase and merge after that.

@westonpace westonpace force-pushed the feature/ARROW-15410--improve-dataset-parquet-memory-usage branch from 46defaa to 777b396 on April 22, 2022 19:36
@westonpace (Member Author)

@lidavidm

Hmm... doing some more testing on this, I think this might be non-ideal in a few situations (S3, low number of files, smallish row groups). This is because we are always reading only 1 row group ahead, so we will issue, at most, 2 parallel reads for a single file.

However, the old behavior was also unsustainable, as it would have kicked off dozens of parallel reads and run out of memory.

The ideal approach would be to keep track of how many rows we have "in flight" and issue reads until we have batch_size * batch_readahead rows in flight, then pause (see the sketch below). I'm going to work on this but, as we are close to release, I'd prefer to move forward with this sometimes-slower-but-usually-safer approach and put the ideal fix in a follow-up.
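A rough sketch of that follow-up idea, with hypothetical names throughout (none of this is the shipped API):

#include <cstdint>

// Hypothetical throttle: issue row-group reads while fewer than
// batch_size * batch_readahead rows are in flight, then pause.
struct ReadaheadThrottle {
  int64_t batch_size = 128 * 1024;
  int32_t batch_readahead = 16;
  int64_t rows_in_flight = 0;

  bool CanIssueRead() const {
    return rows_in_flight < batch_size * batch_readahead;
  }
  void OnReadIssued(int64_t row_group_rows) { rows_in_flight += row_group_rows; }
  void OnBatchDelivered(int64_t batch_rows) { rows_in_flight -= batch_rows; }
};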

@lidavidm (Member)

Is there a quick config change we can document for S3?

@lidavidm (Member)

Just so that regressions can be easily fixed

@lidavidm (Member)

That said, you should feel free to merge here in that case; we should prefer not crashing/freezing.

@westonpace (Member Author)

There's no config that we can easily change that would go back to the old behavior.

@lidavidm (Member)

Hmm, okay.

Well, low number of files + smallish row groups should hopefully not be too big a deal then. It might be worth calling this out in the release notes when the time comes, just so it's clear we're working on this.

@westonpace (Member Author)

Ok. I'm going to merge this so we get some nightly tests, etc. run on it and then I'm going to see if I can get a PR up for the ideal behavior pretty quickly.

@kou kou (Member) commented Apr 23, 2022

@westonpace This breaks a GLib CI on macOS:

https://github.com/apache/arrow/runs/6136118707?check_suite_focus=true#step:9:128

FAILED: arrow-glib/libarrow-glib.800.dylib.p/expression.cpp.o 
ccache c++ -Iarrow-glib/libarrow-glib.800.dylib.p -Iarrow-glib -I../../c_glib/arrow-glib -I. -I../../c_glib -I/usr/local/include -I/usr/local/Cellar/thrift/0.16.0/include -I/usr/local/Cellar/brotli/1.0.9/include -I/usr/local/Cellar/protobuf/3.19.4/include -I/usr/local/Cellar/lz4/1.9.3/include -I/usr/local/Cellar/zstd/1.5.2/include -I/usr/local/Cellar/utf8proc/2.7.0/include -I/usr/local/Cellar/libffi/3.4.2/include -I/usr/local/Cellar/glib/2.72.1/include -I/usr/local/Cellar/glib/2.72.1/include/glib-2.0 -I/usr/local/Cellar/glib/2.72.1/lib/glib-2.0/include -I/usr/local/opt/gettext/include -I/usr/local/Cellar/pcre/8.45/include -fcolor-diagnostics -Wall -Winvalid-pch -Wnon-virtual-dtor -Werror -std=c++11 -O0 -g -Wmissing-declarations -DARROW_NO_DEPRECATED_API -DUTF8PROC_EXPORTS -MD -MQ arrow-glib/libarrow-glib.800.dylib.p/expression.cpp.o -MF arrow-glib/libarrow-glib.800.dylib.p/expression.cpp.o.d -o arrow-glib/libarrow-glib.800.dylib.p/expression.cpp.o -c ../../c_glib/arrow-glib/expression.cpp
In file included from ../../c_glib/arrow-glib/expression.cpp:20:
In file included from ../../c_glib/arrow-glib/compute.hpp:22:
In file included from /usr/local/include/arrow/compute/api.h:48:
/usr/local/include/arrow/compute/exec/options.h:202:20: error: 'arrow::compute::BackpressureControl' has virtual functions but non-virtual destructor [-Werror,-Wnon-virtual-dtor]
class ARROW_EXPORT BackpressureControl {
                   ^
1 error generated.

Could you confirm this? (Is virtual ~BackpressureControl() = default; missing?)
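For reference, the fix kou suggests would look roughly like this in arrow/compute/exec/options.h (the Pause/Resume members are assumptions about the interface; the virtual destructor is the actual fix, see #12968 below):

class ARROW_EXPORT BackpressureControl {
 public:
  virtual ~BackpressureControl() = default;  // silences -Wnon-virtual-dtor
  virtual void Pause() = 0;
  virtual void Resume() = 0;
};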

@lidavidm (Member)

Sorry, we should've looked at CI more carefully. Thanks for catching this, @kou, and please see #12968.

@westonpace (Member Author)

Sorry Kou. Thanks David!

@ursabot ursabot commented Apr 27, 2022

Benchmark runs are scheduled for baseline = 6c1a160 and contender = 78fb2ed. 78fb2ed is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Failed] test-mac-arm
[Finished ⬇️3.57% ⬆️3.21%] ursa-i9-9960x
[Finished ⬇️0.5% ⬆️0.0%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] 78fb2edd ec2-t3-xlarge-us-east-2: https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/603
[Failed] 78fb2edd test-mac-arm: https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/591
[Finished] 78fb2edd ursa-i9-9960x: https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/590
[Finished] 78fb2edd ursa-thinkcentre-m75q: https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/601
[Finished] 6c1a160c ec2-t3-xlarge-us-east-2: https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/602
[Failed] 6c1a160c test-mac-arm: https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/590
[Finished] 6c1a160c ursa-i9-9960x: https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/589
[Finished] 6c1a160c ursa-thinkcentre-m75q: https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/600
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@ursabot ursabot commented Apr 27, 2022

['Python', 'R'] benchmarks have a high level of regressions on ursa-i9-9960x.
