-
Notifications
You must be signed in to change notification settings - Fork 4k
ARROW-13238: [C++][Compute][Dataset] Use an ExecPlan for dataset scans #10664
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
cpp/src/arrow/dataset/scanner.cc
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a little unfortunate that we're making several heap allocations for every batch to carry effectively three integers (though I guess the overhead is probably not noticeable in the grand scheme of things and batches should be relatively large).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess if it is ever noticeable overhead, we could make the booleans singletons and pool the integer allocations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
I'll test this on EC2 when I get a chance and compare to 4.0/current master. |
|
@ursabot please benchmark lang=Python |
|
Benchmark runs are scheduled for baseline = 780e95c and contender = c1808f9156bd4df03cf8338217ffd93f519abb96. Results will be available as each benchmark for each run completes. |
|
(Looks like we need to rebase before the Conbench benchmarks can run.) |
9e1fbc8 to
c45af38
Compare
|
@ursabot please benchmark lang=Python |
|
Benchmark runs are scheduled for baseline = 780e95c and contender = dfb10c0. Results will be available as each benchmark for each run completes. |
|
From Conbench seems like performance isn't really affected. I'll manually run those benchmarks with more iterations as well. |
| auto batch_gen_gen, | ||
| FragmentsToBatches(std::move(fragment_gen), options, filter_and_project)); | ||
| auto batch_gen_gen_readahead = | ||
| MakeSerialReadaheadGenerator(std::move(batch_gen_gen), options->fragment_readahead); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ben and I spoke about this. The serial readahead wasn't actually adding anything (batch_gen_gen is really just listing fragments) and it was invalid because a serial readahead generator is not async reentrant and should not be consumed by MakeMergedGenerator.
|
This got stuck while I was testing it (I'll try to get a backtrace shortly). But performance indeed seems to be on par with before. Details4.0.1: 5.0.0 master Count rows, no filter, 10 iterations: This PR Count rows, no filter, 10 iterations: (It got stuck when a filter was applied) |
|
Ok, actually, I'm not sure if it was stuck or just me being inpatient… |
|
Now that I'm running it again it just seems that count_rows with a filter got very slow. Test scriptimport statistics
import time
import pyarrow
import pyarrow.dataset
import pyarrow.fs
fs = pyarrow.fs.S3FileSystem(region="us-east-2")
print("PyArrow:", pyarrow.__version__)
times = []
for _ in range(10):
start = time.monotonic()
ds = pyarrow.dataset.dataset([
"ursa-labs-taxi-data/2009/01/data.parquet",
"ursa-labs-taxi-data/2009/02/data.parquet",
"ursa-labs-taxi-data/2009/03/data.parquet",
"ursa-labs-taxi-data/2009/04/data.parquet",
],
format="parquet",
partitioning=["year", "month"],
filesystem=fs,
)
print("Rows:", ds.scanner(use_async=True, use_threads=True).count_rows())
times.append(time.monotonic() - start)
print("No filter")
print("Min:", min(times), "s")
print("Median:", statistics.median(times), "s")
print("Mean:", statistics.mean(times), "s")
print("Max:", max(times), "s")
print("With filter")
times = []
expr = pyarrow.dataset.field("passenger_count") > 1
for _ in range(10):
start = time.monotonic()
ds = pyarrow.dataset.dataset([
"ursa-labs-taxi-data/2009/01/data.parquet",
"ursa-labs-taxi-data/2009/02/data.parquet",
"ursa-labs-taxi-data/2009/03/data.parquet",
"ursa-labs-taxi-data/2009/04/data.parquet",
],
format="parquet",
partitioning=["year", "month"],
filesystem=fs,
)
print("Rows:", ds.scanner(use_async=True, use_threads=True, filter=expr).count_rows())
times.append(time.monotonic() - start)
print("Min:", min(times), "s")
print("Median:", statistics.median(times), "s")
print("Mean:", statistics.mean(times), "s")
print("Max:", max(times), "s") |
|
Well that's disappointing. I was actually expecting it to get faster |
|
If we correct the CountRows issue above, CountRows indeed gets much faster! |
westonpace
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This review is overdue, sorry. I think my main question is whether we want to use a stop token instead of a stop producing method.
| std::unique_lock<std::mutex> lock(mutex_); | ||
| if (!finished_) { | ||
| finished_ = true; | ||
| finished_ = Loop([this, options] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than use Loop directly it seems you could use MakeTransferred and VisitAsyncGenerator. You might need to add ErrorVisitor and stop_callback support to VisitAsyncGenerator but then it would be more univerally supported.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason not to use Loop, though?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No reason other than readability. The visitor would just be using Loop under the hood.
| bool finished_{false}; | ||
| int next_batch_index_{0}; | ||
| Future<> finished_fut_ = Future<>::MakeFinished(); | ||
| bool stop_requested_{false}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of exposing a StopProducing you might need to take in a stop token. If you don't then how will you handle the following...
A pyarrow user runs a to_table on some dataset. First it has to do inspection, then it has to actually do the scan. At any point the user might press Ctrl-C to cancel the thing.
We can support it with stop token by setting the Ctrl-C stop token handler and then passing that stop token down to the inspection call and the scan call.
As a minor benefit you can get rid of all the locks here because they are inside the stop token itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's an issue (ARROW-12938) for letting a StopToken run a callback since it's needed to support other things like gRPC (which also exposes a cancel method instead taking a cancellable context). But that'd be more work and requires dealing with signal safety in the callback.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replacing with a stop token sounds useful but I'd prefer to handle that in a follow up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A follow up sounds fine to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Filed ARROW-13297.
lidavidm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. I left a couple tiny things but feel free to merge.
|
|
||
| // Returns a matcher that matches the value of a successful Result<T> or Future<T>. | ||
| // (Future<T> will be waited upon to acquire its result for matching.) | ||
| // Returns a matcher that waits on a Future (by default for 16 seconds) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: kDefaultAssertFinishesWaitSeconds seconds?
|
|
||
| ~ExecPlanImpl() override { | ||
| if (started_ && !stopped_) { | ||
| if (started_ && !finished_.is_finished()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is slightly race-prone/this is a TOCTOU, right? Though the consequences aren't big here.
| Status StartProducing() override { | ||
| if (finished_) { | ||
| return Status::Invalid("Restarted SourceNode '", label(), "'"); | ||
| DCHECK(!stop_requested_) << "Restarted SourceNode"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we not still return a Status? It might save us from odd bug reports if we error instead of letting internal state get trampled in case of a bug.
| bool finished_{false}; | ||
| int next_batch_index_{0}; | ||
| Future<> finished_fut_ = Future<>::MakeFinished(); | ||
| bool stop_requested_{false}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Filed ARROW-13297.
Replaces the body of AsyncScanner::ScanBatchesAsync with usage of an ExecPlan