
Conversation

@nealrichardson nealrichardson commented Jan 18, 2023

Rationale for this change

Follow-up to https://github.com/apache/arrow/pull/19706/files#r1073391100 with the goal of deleting and simplifying some code. As it turned out, it was more about moving code from the R bindings to the C++ library.

Are there any user-facing changes?

Not for R users, but this fixes a bug in the dataset C++ library where nested field refs could not be handled by the scanner.

@github-actions

⚠️ GitHub issue #33760 has been automatically assigned in GitHub to PR creator.

@nealrichardson nealrichardson changed the title GH-33760: [R] Push projection expressions into ScanNode GH-33760: [R][C++] Handle nested field refs in scanner Jan 19, 2023
@nealrichardson nealrichardson marked this pull request as ready for review January 19, 2023 17:30
const std::shared_ptr<Schema>& dataset_schema) {
// process resultant dataset_schema after projection
FieldVector project_fields;
std::set<std::string> field_names;
@nealrichardson (Member Author)

I used a set here because my R test was failing: the projection expression included the nested field in two different places, which produced duplicated fields in the schema. Maybe ->arguments does deduplication, so this wasn't a problem with non-nested refs. But I don't know if this is the right choice, whether someone cares about the order that gets lost, or whether there's a better way. What do you think @westonpace? (I didn't run the C++ tests yet, so maybe there are order-dependent tests that fail.)

Also, this function seems like a natural place to use FieldsInExpression (from expression.cc)--is there a reason it wasn't used here? It wouldn't solve the duplication issue because you could still have two nested field refs pointing to different fields within the same top-level struct, but it would let you assume that everything you're iterating over is a FieldRef.
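As a rough illustration of what `FieldsInExpression` does conceptually (the `Expr` type and function names below are invented stand-ins, not Arrow's actual classes): it recursively walks an expression and returns every field reference it finds, which is also why it alone wouldn't solve the duplication problem.

```cpp
#include <string>
#include <vector>

// Toy stand-in for arrow::compute::Expression: a node is either a field
// reference (leaf) or a call with argument subexpressions.
struct Expr {
  std::string field_ref;        // non-empty only for a field-reference leaf
  std::vector<Expr> arguments;  // child expressions for a call
};

// Recursively collect every field reference. Duplicates are kept, so a
// caller building a schema from the result still has to deduplicate.
void CollectFieldRefs(const Expr& expr, std::vector<std::string>* out) {
  if (!expr.field_ref.empty()) out->push_back(expr.field_ref);
  for (const auto& arg : expr.arguments) CollectFieldRefs(arg, out);
}
```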

Member

if someone cares about order that gets lost, or if there's a better way.

We very much care about order here :). Fortunately, std::set is an ordered set so the order should not be lost.
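For reference, `std::set` iterates in comparator order (lexicographic for `std::string`), which is deterministic but not the same as the order in which names were inserted; a minimal sketch:

```cpp
#include <set>
#include <string>
#include <vector>

// std::set deduplicates and iterates in sorted (comparator) order, not
// insertion order: inserting "b" then "a" still yields {"a", "b"}.
std::vector<std::string> DedupSorted(const std::vector<std::string>& names) {
  std::set<std::string> unique(names.begin(), names.end());
  return std::vector<std::string>(unique.begin(), unique.end());
}
```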

@paleolimbot (Member) left a comment

I can't speak to the C++ change; however, the R and R/C++ changes look good to me. Thank you!

)

# Now with Dataset: make sure column pushdown in ScanNode works
skip_if_not_available("dataset")
Member

Fixed in #33778 already!

@nealrichardson (Member Author)

Great, thanks, I'll rebase and remove this.

@westonpace (Member) left a comment

Do you want some unit tests?

This seems like it will load the entire column into memory (since you're using the top-level name). For formats that support nested load (e.g. parquet) I thought we had a better implementation that already worked. For example:

Result<std::vector<int>> InferColumnProjection(const parquet::arrow::FileReader& reader,
seems to be expecting nested refs in the projection. But I could be misunderstanding. The expression-based projection of the old scanner always confused me a little.


@nealrichardson (Member Author)

Do you want some unit tests?

Of course, this needs some. The tests that were added for this function when it was introduced (8972ebd#diff-273815dd1d6770a7ef790980d9039adddf0ef8efa88c0745234906ab16ac09dfR2784-R2838) are more e2e than unit tests of the function, but I could try to tack on something there with a struct column and a nested ref. Happy to take a different approach if you have suggestions though. I haven't run C++ unit tests in forever, so figured I'd get some feedback before diving in there.

This seems like it will load the entire column into memory (since you're using the top-level name).

True, and this is how it currently works from R anyway. The effect of this change is to not fail when other languages/libraries try to scan with nested refs.

For formats that support nested load (e.g. parquet) I thought we had a better implementation that already worked. For example:

Result<std::vector<int>> InferColumnProjection(const parquet::arrow::FileReader& reader,

seems to be expecting nested refs in the projection. But I could be misunderstanding. The expression-based projection of the old scanner always confused me a little.

@jorisvandenbossche mentioned this in my previous PR, and that's why I wanted to send nested refs instead of top-level columns. So why aren't I hitting that code? I'm creating a ScanNode for an ExecPlan. Or am I only hitting this code because I'm testing with an InMemoryDataset? Clearly I'm not hitting file_parquet.cc with that, but does a parquet FileSystemDataset avoid this scanner code entirely? I'm skeptical about that (though it should be easily verified) since this function was added to fix an issue with unbound schemas that IIRC people were experiencing with actual parquet datasets.

@westonpace (Member)

I haven't run C++ unit tests in forever, so figured I'd get some feedback before diving in there.

Sorry, I was thinking of R e2e tests. I would hope the C++ change is covered by existing tests. Although I think we've found in the past that it is easy to accidentally load too much from the disk and still pass the tests.

@jorisvandenbossche mentioned this in my previous PR, and that's why I wanted to send nested refs instead of top-level columns. So why aren't I hitting that code?

I don't know sadly. I will try and investigate later today. I could tell you how it works in the new scan node 😆 but I don't think that will be too useful to you yet.

@nealrichardson (Member Author)

I haven't run C++ unit tests in forever, so figured I'd get some feedback before diving in there.

Sorry, I was thinking of R e2e tests. I would hope the C++ change is covered by existing tests. Although I think we've found in the past that it is easy to accidentally load too much from the disk and still pass the tests.

R tests cover this already, from #19706. I thought this was just going to be deleting code from the R package: instead of finding the top-level field names in the projection and sending them in the ScanNode, I'd send the projection and the scanner would pull out the fields. Turns out though that the scanner couldn't handle nested field refs, so I took the R binding changes from #19706 and adapted them to go in scanner.cc.

But the R test uses InMemoryDataset so doesn't touch parquet, and of course it can't make any assertions about whether the right amount of data is loaded.

@westonpace (Member)

I thought this was just going to be deleting code from the R package: instead of finding the top-level field names in the projection and sending them in the ScanNode, I'd send the projection and the scanner would pull out the fields.

Got it, so this is not a behavior change, but moving the logic from R into C++ where it would appropriately belong?

@nealrichardson (Member Author)

I thought this was just going to be deleting code from the R package: instead of finding the top-level field names in the projection and sending them in the ScanNode, I'd send the projection and the scanner would pull out the fields.

Got it, so this is not a behavior change, but moving the logic from R into C++ where it would appropriately belong?

Right; nested field refs are currently not handled by the C++ scanner.

@westonpace (Member)

@nealrichardson Ok, I did some investigation.

First, the reason this is not being encountered from pyarrow:

The scanner options currently takes both a projected schema and a projection expression. R sets the projection expression (and so the C++ needs to figure out the projected schema) and python sets the projected schema (and C++ needs to figure out the projection expression). So pyarrow never encounters the code you are modifying (to the best of my knowledge).

Second, the concern about loading the entire top-level field:

It turns out that partial column loading was never fully implemented anyways. So even though we go through all the trouble of figuring out exactly which child to load, we still just load the entire top-level field.

That being said, if R is working as you expect, then I approve this approach.
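The loading behavior described above can be sketched as follows (an illustration only, not the Arrow implementation): each nested ref collapses to its first path segment, so `s.x` and `s.y` both cause the entire top-level column `s` to be loaded.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// A nested field ref modeled as a path of names, e.g. {"s", "x"} for s.x.
using RefPath = std::vector<std::string>;

// Collapse refs to the distinct top-level columns that would actually be
// loaded, preserving first-seen order.
std::vector<std::string> TopLevelColumns(const std::vector<RefPath>& refs) {
  std::vector<std::string> out;
  for (const auto& ref : refs) {
    if (ref.empty()) continue;
    if (std::find(out.begin(), out.end(), ref.front()) == out.end()) {
      out.push_back(ref.front());
    }
  }
  return out;
}
```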

@nealrichardson (Member Author)

@nealrichardson Ok, I did some investigation.

First, the reason this is not being encountered from pyarrow:

The scanner options currently takes both a projected schema and a projection expression. R sets the projection expression (and so the C++ needs to figure out the projected schema) and python sets the projected schema (and C++ needs to figure out the projection expression). So pyarrow never encounters the code you are modifying (to the best of my knowledge).

What's odd is that the projection provided to the ScanNode isn't what the ScanNode returns. The function I changed here returns a schema, but it is not the schema that would result from the projection--it's the schema of the fields referenced in the projection expression. (The scanner also adds the augmented fields, so the schema of the data that comes out of the ScanNode is also different from that.) So I don't know why you need the projection expression at all, unless it is aspirational/future-looking for some time when the projection can be pushed down and handled by the file readers or whatever.

Second, the concern about loading the entire top-level field:

It turns out that partial column loading was never fully implemented anyways. So even though we go through all the trouble of figuring out exactly which child to load, we still just load the entire top-level field.

That being said, if R is working as you expect, then I approve this approach.

We can ship this but I wonder if it wouldn't be better just to remove this projection interface and only accept a schema, which may filter out top-level fields only, no other projection.

@westonpace (Member)

So I don't know why you need the projection expression at all, unless it is aspirational/future-looking for some time when the projection can be pushed down and handled by the file readers or whatever.

We can ship this but I wonder if it wouldn't be better just to remove this projection interface and only accept a schema, which may filter out top-level fields only, no other projection.

It's less aspirational and more historical. This comes from the pre-ExecPlan days when the scanner was "scan->filter->project". The new scan node API simply accepts a vector of field refs. However, it should support those field refs being nested refs.
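For context, a nested field ref is essentially a path of names, and Arrow's `FieldRef` can also express one as a dot path (e.g. `.s.x`). A minimal sketch of rendering such a path, with the `RefPath` type invented here for illustration:

```cpp
#include <string>
#include <vector>

// A field ref as a path of names: {"s", "x"} addresses child x of struct s.
using RefPath = std::vector<std::string>;

// Render a ref path in dot-path notation, e.g. {"s", "x"} -> ".s.x".
std::string ToDotPath(const RefPath& ref) {
  std::string out;
  for (const auto& name : ref) out += "." + name;
  return out;
}
```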

@nealrichardson (Member Author)

The new scan node API simply accepts a vector of field refs.

Where exactly? MaterializedFields() appears to be a method of ScanOptions, not something I can set myself: https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/scanner.h#L134

Or should I be using ScanV2Options? https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/scanner.h#L154

@westonpace (Member)

Or should I be using ScanV2Options?

Yes, but it's not ready yet. Sorry if that was misleading. By "new" I meant "the API we are working towards".

@nealrichardson (Member Author)

Alright then, so should we merge this as is? Or close it without merging and bother with it later whenever V2 is ready? I'm fine either way.

@westonpace (Member)

I think we should merge as-is since we don't want to let pending work block current capabilities.

@nealrichardson nealrichardson merged commit d0a7fb9 into apache:master Jan 24, 2023
@nealrichardson nealrichardson deleted the project-expr branch January 24, 2023 16:56

ursabot commented Jan 24, 2023

Benchmark runs are scheduled for baseline = 22e3bed and contender = d0a7fb9. d0a7fb9 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Failed ⬇️0.95% ⬆️0.03%] test-mac-arm
[Failed ⬇️0.93% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️0.09% ⬆️0.0%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] d0a7fb94 ec2-t3-xlarge-us-east-2
[Failed] d0a7fb94 test-mac-arm
[Failed] d0a7fb94 ursa-i9-9960x
[Finished] d0a7fb94 ursa-thinkcentre-m75q
[Finished] 22e3bed4 ec2-t3-xlarge-us-east-2
[Failed] 22e3bed4 test-mac-arm
[Failed] 22e3bed4 ursa-i9-9960x
[Finished] 22e3bed4 ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@jorisvandenbossche (Member)

It turns out that partial column loading was never fully implemented anyways. So even though we go through all the trouble of figuring out exactly which child to load, we still just load the entire top-level field.

Yes, the open issue about this is #33167 (the Parquet reader itself supports this, so because we switched pyarrow.parquet.read_table from the direct Parquet reader to the dataset-based reader, that's actually a perf regression for people who were using this)

westonpace pushed a commit that referenced this pull request Mar 21, 2023
…field (#34576)

### Rationale for this change

Fixes #34519. #33770 introduced the bug; I had [asked](https://github.com/apache/arrow/pull/33770/files#r1081612013) in the review why the C++ function wasn't using `FieldsInExpression`. I swapped that in, and the test I added to reproduce the bug now passes.

### What changes are included in this PR?

Fix for the C++ function, test in R. 

### Are these changes tested?

Yes

### Are there any user-facing changes?

The behavior observed in the report no longer happens.
* Closes: #34519

Authored-by: Neal Richardson <neal.p.richardson@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>