
Conversation

@bkietz bkietz commented Jul 6, 2021

Replaces the body of AsyncScanner::ScanBatchesAsync with usage of an ExecPlan

@bkietz bkietz requested review from lidavidm and pitrou and removed request for lidavidm July 6, 2021 15:28

@bkietz bkietz requested a review from lidavidm July 6, 2021 15:30
Member

It is a little unfortunate that we're making several heap allocations for every batch to carry effectively three integers (though I guess the overhead is probably not noticeable in the grand scheme of things and batches should be relatively large).

Member

I guess if it is ever noticeable overhead, we could make the booleans singletons and pool the integer allocations.
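To illustrate the mitigation floated here, below is a rough Python sketch of the idea only (not Arrow code; `IntBox` and `IntPool` are hypothetical names): reuse shared singletons for the boolean payloads and recycle integer holders through a free list, so steady-state scanning does no per-batch allocation.

```python
# Hypothetical sketch of pooling small per-batch control values.
class IntBox:
    __slots__ = ("value",)

    def __init__(self, value=0):
        self.value = value


class IntPool:
    """Recycles IntBox instances through a free list to avoid reallocating."""

    def __init__(self):
        self._free = []

    def acquire(self, value):
        # Reuse a released box if one is available, else allocate a new one.
        box = self._free.pop() if self._free else IntBox()
        box.value = value
        return box

    def release(self, box):
        self._free.append(box)


pool = IntPool()
a = pool.acquire(7)
pool.release(a)
b = pool.acquire(8)  # reuses the released box instead of allocating
print(a is b)  # -> True
```

In this sketch the two boolean states would simply be two module-level singleton objects, so only the integer holders need pooling.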


lidavidm commented Jul 6, 2021

I'll test this on EC2 when I get a chance and compare to 4.0/current master.


lidavidm commented Jul 6, 2021

@ursabot please benchmark lang=Python


ursabot commented Jul 6, 2021

Benchmark runs are scheduled for baseline = 780e95c and contender = c1808f9156bd4df03cf8338217ffd93f519abb96. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Failed] ec2-t3-xlarge-us-east-2 (mimalloc)
[Failed] ursa-i9-9960x (mimalloc)
[Skipped ⚠️ Only ['C++', 'Java'] langs are supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q (mimalloc)
Supported benchmarks:
ursa-i9-9960x: langs = Python, R
ursa-thinkcentre-m75q: langs = C++, Java
ec2-t3-xlarge-us-east-2: cloud = True


lidavidm commented Jul 6, 2021

(Looks like we need to rebase before the Conbench benchmarks can run.)

@bkietz bkietz force-pushed the 13238-Substitute-ExecPlan-impl- branch from 9e1fbc8 to c45af38 on July 6, 2021 17:35

lidavidm commented Jul 6, 2021

@ursabot please benchmark lang=Python


ursabot commented Jul 6, 2021

Benchmark runs are scheduled for baseline = 780e95c and contender = dfb10c0. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2 (mimalloc)
[Finished ⬇️0.0% ⬆️0.0%] ursa-i9-9960x (mimalloc)
[Skipped ⚠️ Only ['C++', 'Java'] langs are supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q (mimalloc)
Supported benchmarks:
ursa-i9-9960x: langs = Python, R
ursa-thinkcentre-m75q: langs = C++, Java
ec2-t3-xlarge-us-east-2: cloud = True


lidavidm commented Jul 6, 2021

From Conbench it seems like performance isn't really affected. I'll manually run those benchmarks with more iterations as well.

auto batch_gen_gen,
FragmentsToBatches(std::move(fragment_gen), options, filter_and_project));
auto batch_gen_gen_readahead =
MakeSerialReadaheadGenerator(std::move(batch_gen_gen), options->fragment_readahead);
Member

Ben and I spoke about this. The serial readahead wasn't actually adding anything (batch_gen_gen is really just listing fragments) and it was invalid because a serial readahead generator is not async reentrant and should not be consumed by MakeMergedGenerator.


lidavidm commented Jul 6, 2021

This got stuck while I was testing it (I'll try to get a backtrace shortly). But performance indeed seems to be on par with before.

Details

4.0.1:

        "iterations": 10,
        "max": "10.501172",
        "mean": "9.783475",
        "median": "9.783779",
        "min": "9.203580",
        "q1": "9.451842",
        "q3": "10.074056",

5.0.0 master

        "iterations": 10,
        "max": "12.266335",
        "mean": "10.373638",
        "median": "10.223064",
        "min": "9.785621",
        "q1": "10.084575",
        "q3": "10.322508",

Count rows, no filter, 10 iterations:

Min: 0.39461345200015785 s
Median: 0.420534689500073 s
Mean: 0.4404416943999422 s
Max: 0.5733599779996439 s

This PR

        "iterations": 10,
        "max": "11.053488",
        "mean": "10.406635",
        "median": "10.273359",
        "min": "9.980684",
        "q1": "10.145428",
        "q3": "10.701346",

Count rows, no filter, 10 iterations:

Min: 0.3555871419994219 s
Median: 0.38891541300017707 s
Mean: 0.4031996126002014 s
Max: 0.5227700120003647 s

(It got stuck when a filter was applied)


lidavidm commented Jul 6, 2021

Ok, actually, I'm not sure if it was stuck or just me being impatient…


lidavidm commented Jul 6, 2021

Now that I'm running it again it just seems that count_rows with a filter got very slow.

Before:
Min: 24.285114823000185 s
Median: 25.309564675000274 s
Mean: 25.45627587750014 s
Max: 26.564001424999333 s

After:
Min: 113.42360810700029 s
Median: 117.11463808250028 s
Mean: 118.39458996600005 s
Max: 123.9104565839998 s
Test script
import statistics
import time
import pyarrow 
import pyarrow.dataset
import pyarrow.fs

fs = pyarrow.fs.S3FileSystem(region="us-east-2")

print("PyArrow:", pyarrow.__version__)

times = []
for _ in range(10):
    start = time.monotonic()
    ds = pyarrow.dataset.dataset([
        "ursa-labs-taxi-data/2009/01/data.parquet",
        "ursa-labs-taxi-data/2009/02/data.parquet",
        "ursa-labs-taxi-data/2009/03/data.parquet",
        "ursa-labs-taxi-data/2009/04/data.parquet",
        ],
        format="parquet",
        partitioning=["year", "month"],
        filesystem=fs,
    )
    print("Rows:", ds.scanner(use_async=True, use_threads=True).count_rows())
    times.append(time.monotonic() - start)

print("No filter")
print("Min:", min(times), "s")
print("Median:", statistics.median(times), "s")
print("Mean:", statistics.mean(times), "s")
print("Max:", max(times), "s")

print("With filter")
times = []
expr = pyarrow.dataset.field("passenger_count") > 1
for _ in range(10):
    start = time.monotonic()
    ds = pyarrow.dataset.dataset([
        "ursa-labs-taxi-data/2009/01/data.parquet",
        "ursa-labs-taxi-data/2009/02/data.parquet",
        "ursa-labs-taxi-data/2009/03/data.parquet",
        "ursa-labs-taxi-data/2009/04/data.parquet",
        ],
        format="parquet",
        partitioning=["year", "month"],
        filesystem=fs,
    )
    print("Rows:", ds.scanner(use_async=True, use_threads=True, filter=expr).count_rows())
    times.append(time.monotonic() - start)
print("Min:", min(times), "s")
print("Median:", statistics.median(times), "s")
print("Mean:", statistics.mean(times), "s")
print("Max:", max(times), "s")


bkietz commented Jul 7, 2021

Well that's disappointing. I was actually expecting it to get faster


lidavidm commented Jul 7, 2021

If we correct the CountRows issue above, CountRows indeed gets much faster!

Before:

Min: 24.285114823000185 s
Median: 25.309564675000274 s
Mean: 25.45627587750014 s
Max: 26.564001424999333 s

After:

Min: 6.617772240000249 s
Median: 6.906719556000098 s
Mean: 6.926710154500052 s
Max: 7.311853507000251 s


@westonpace westonpace left a comment


This review is overdue, sorry. I think my main question is whether we want to use a stop token instead of a stop producing method.

std::unique_lock<std::mutex> lock(mutex_);
if (!finished_) {
finished_ = true;
finished_ = Loop([this, options] {
Member

Rather than using Loop directly, it seems you could use MakeTransferred and VisitAsyncGenerator. You might need to add ErrorVisitor and stop_callback support to VisitAsyncGenerator, but then it would be more universally supported.

Member Author

Is there a reason not to use Loop, though?

Member

No reason other than readability. The visitor would just be using Loop under the hood.

bool finished_{false};
int next_batch_index_{0};
Future<> finished_fut_ = Future<>::MakeFinished();
bool stop_requested_{false};
Member

Instead of exposing a StopProducing you might need to take in a stop token. If you don't then how will you handle the following...

A pyarrow user runs a to_table on some dataset. First it has to do inspection, then it has to actually do the scan. At any point the user might press Ctrl-C to cancel the thing.

We can support it with stop token by setting the Ctrl-C stop token handler and then passing that stop token down to the inspection call and the scan call.

As a minor benefit you can get rid of all the locks here because they are inside the stop token itself.
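As a minimal sketch of the stop-token idea (plain Python, not Arrow's actual StopToken API): the cancellation source sets the token once, and consumers poll it between units of work, with no explicit locks in the scan loop itself.

```python
import threading

class StopToken:
    """Hypothetical cooperative-cancellation token (sketch, not Arrow's API)."""

    def __init__(self):
        self._event = threading.Event()  # Event handles synchronization for us

    def request_stop(self):
        self._event.set()

    def stop_requested(self):
        return self._event.is_set()


def scan(batches, stop_token):
    # A scan loop checks the token between batches, instead of the node
    # exposing a StopProducing() method that callers must find and invoke.
    consumed = []
    for batch in batches:
        if stop_token.stop_requested():
            break
        consumed.append(batch)
    return consumed


token = StopToken()
print(scan([1, 2, 3], token))  # -> [1, 2, 3]
token.request_stop()           # e.g. from a Ctrl-C handler
print(scan([1, 2, 3], token))  # -> []
```

The same token can be threaded through both the inspection call and the scan call, which is what makes whole-operation Ctrl-C handling possible.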

Member

There's an issue (ARROW-12938) for letting a StopToken run a callback, since that's needed to support other things like gRPC (which also exposes a cancel method instead of taking a cancellable context). But that'd be more work and requires dealing with signal safety in the callback.

Member Author

Replacing this with a stop token sounds useful, but I'd prefer to handle that in a follow-up.

Member

A follow-up sounds fine to me.

Member

Filed ARROW-13297.

@lidavidm lidavidm left a comment


LGTM. I left a couple tiny things but feel free to merge.


// Returns a matcher that matches the value of a successful Result<T> or Future<T>.
// (Future<T> will be waited upon to acquire its result for matching.)
// Returns a matcher that waits on a Future (by default for 16 seconds)
Member

nit: kDefaultAssertFinishesWaitSeconds seconds?


~ExecPlanImpl() override {
if (started_ && !stopped_) {
if (started_ && !finished_.is_finished()) {
Member

This is slightly race-prone (a TOCTOU), right? Though the consequences aren't big here.
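The check-then-act shape being flagged can be illustrated with a hypothetical Python sketch (not the PR's C++): the unsafe version reads the flag and acts on it as two separate steps, so another thread can slip in between; the safe version holds a lock across both.

```python
import threading

class Plan:
    """Sketch of a finish-once guard; names are illustrative only."""

    def __init__(self):
        self._lock = threading.Lock()
        self._finished = False

    def finish_unsafe(self):
        if not self._finished:       # check...
            # ...another thread may finish the plan right here (TOCTOU)
            self._finished = True    # ...then act
            return True
        return False

    def finish_safe(self):
        with self._lock:             # check and act under one lock
            if not self._finished:
                self._finished = True
                return True
            return False


plan = Plan()
print(plan.finish_safe())  # -> True (first caller wins)
print(plan.finish_safe())  # -> False (already finished)
```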

Status StartProducing() override {
if (finished_) {
return Status::Invalid("Restarted SourceNode '", label(), "'");
DCHECK(!stop_requested_) << "Restarted SourceNode";
Member

Should we not still return a Status? It might save us from odd bug reports if we error instead of letting internal state get trampled in case of a bug.

@bkietz bkietz closed this in b172284 Jul 9, 2021
@bkietz bkietz deleted the 13238-Substitute-ExecPlan-impl- branch July 9, 2021 19:39