Conversation

@bkietz (Member) commented Feb 19, 2021

  • ScanOptions is more explicit about scan time schemas: dataset (aka reader) and projected schemas are independent
  • RecordBatchProjector is replaced with a projection expression
  • Field metadata and nullability are preserved by projection exprs if the expression is a plain field reference; it's clear that field_ref("alpha") should inherit the attributes of "alpha" in the original dataset, whereas equal(field_ref("alpha"), field_ref("beta")) doesn't necessarily inherit attributes from either field
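The inheritance rule above can be sketched as a toy model (all names here are illustrative, not the Arrow C++ API): a plain field reference copies the dataset field's nullability and metadata into the output field, while a computed expression gets default attributes.

```cpp
#include <cassert>
#include <map>
#include <string>

// Toy model of a schema field: name plus the attributes a projection may
// or may not inherit.
struct Field {
  std::string name;
  bool nullable = true;
  std::map<std::string, std::string> metadata;
};

// Hypothetical helper: if the projection expression is a plain field
// reference (field_ref non-empty), inherit that dataset field's attributes;
// a computed expression gets defaults.
Field ProjectedField(const std::map<std::string, Field>& dataset_schema,
                     const std::string& field_ref,  // empty => computed expr
                     const std::string& output_name) {
  if (!field_ref.empty()) {
    Field out = dataset_schema.at(field_ref);  // plain reference: copy attributes
    out.name = output_name;
    return out;
  }
  return Field{output_name};  // computed expression: default attributes
}
```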

No significant changes have been made to the Python/R bindings as the C++ API remains backward compatible with subset-of-columns projection. Adding these features will be handled in follow-ups.

Deferred to https://issues.apache.org/jira/browse/ARROW-11749 :

  • Ensure that stacked projection exprs are simplifiable to a single project expr
  • Refactor UnionDataset to support reconciling its children with projections

@bkietz bkietz requested a review from fsaintjacques February 22, 2021 21:47
@bkietz bkietz force-pushed the 11174-Make-Expressions-availabl branch from d900bb0 to 33c4f3c Compare February 22, 2021 22:35
@bkietz bkietz marked this pull request as ready for review February 23, 2021 20:13
@bkietz bkietz changed the title ARROW-11174: [C++][Dataset] Make expressions available to projection [WIP] ARROW-11174: [C++][Dataset] Make expressions available to projection Feb 23, 2021
@bkietz bkietz requested a review from pitrou February 23, 2021 20:18
@bkietz bkietz force-pushed the 11174-Make-Expressions-availabl branch from d8b02ac to 1b95d70 Compare February 23, 2021 20:20
Member

So now that the projection schema is separate from the reader schema, if one wants the output to be dictionary encoded they need to specify so in the projection schema, regardless of the underlying source encoding?

Member Author (@bkietz, Feb 24, 2021)

Dictionary encoding columns in scan is not currently supported; an output column can only be dictionary encoded if it was dictionary encoded in the fragment.

Member

Maybe it's tested elsewhere but what happens if none of the projected fields have any corresponding value? Will it return a single row of all null? Or zero rows?

Member Author (@bkietz, Feb 24, 2021)

This test illustrates that case: f64 is not present in the input batch, and the result of such projection is a StructScalar whose fields are null

Member

Consider a dataset with tables...

A,B
1,"x"
2,"x"

A
3
4

Then the query is SELECT B,'1' as C FROM dataset

This would output...

B,C
"x",1
"x",1
NULL,1

...right?

Member Author

Ah, I see your concern. Individual calls to project do not broadcast scalars in case subsequent steps in the pipeline want to do something more efficient. FilterAndProjectScanTask broadcasts scalars to the correct length before yielding the batch: https://github.com/apache/arrow/pull/9532/files?file-filters%5B%5D=.cc&file-filters%5B%5D=.h&file-filters%5B%5D=.java&file-filters%5B%5D=.pxd&file-filters%5B%5D=.py#diff-25b1bd283e8242f8384b24a0f1e8b61fbca0c2784ab679f9a2a00b03450487aaR72-R76
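The broadcasting step can be sketched roughly as follows; this is a simplified stand-in for what FilterAndProjectScanTask does, not the actual Arrow code, and the Column/BroadcastScalars names are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Simplified model: a projected column is either a scalar (one constant for
// the whole batch) or an array of per-row values.
struct Column {
  bool is_scalar;
  long scalar_value;         // meaningful when is_scalar is true
  std::vector<long> values;  // meaningful otherwise
};

// Broadcast any scalar columns to the batch length before yielding the batch,
// so every output column has one value per row.
void BroadcastScalars(std::vector<Column>* columns, std::size_t batch_length) {
  for (Column& col : *columns) {
    if (col.is_scalar) {
      col.values.assign(batch_length, col.scalar_value);
      col.is_scalar = false;
    }
  }
}
```

Deferring the broadcast lets earlier pipeline steps keep the scalar representation when that is more efficient.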

Member

What does NonMaterialized mean in this context? Maybe ProjectedFieldDifferingTypeFromInferred?

Member

AFAICT, it means the str column from the CSV file is part of the physical schema, but not the dataset schema (indeed the CSV scanner should not "materialize" it as an Arrow column).

@bkietz Should you add a check for the physical schema here?

Member Author

This should have read "NonProjected" rather than "NonMaterialized", sorry for the confusion and I'll amend. f64 will be materialized (since we need to filter on it) but is not projected (so it won't appear in scanned batches).

The physical schema of the batch will have the inferred type for each column, I'll add an assertion of this.

The key point of this test case is that we request f64 be read as a string, rather than a double. This is uncommon and only possible due to the textual nature of CSV, but it is not illegal. If the csv reader fails to respect the stated dataset schema this can break filter expressions in surprising ways.

For example, it might erroneously consider only the projected schema and revert to inferring the type of a non-projected field, which would result in a comparison between a double and a string column.

Member

The old comment made more sense to me. Doesn't the projection still populate missing fields with null?

Member

Rename to ProjectedFields?

Member Author

For project({add(field_ref("a"), field_ref("b"))}, {"a_plus_b"}) (SQL: SELECT a + b as a_plus_b) the materialized fields would be {a,b} while the projected fields would be {a_plus_b}. I'll add a long-form docstring clarifying this

Member

So is the projected_schema in that case schema({"a_plus_b"}) or schema({"a", "b"})?

Member Author

assert materialized_schema.names == ['a', 'b']
assert projected_schema.names == ['a_plus_b']

Member

Ok, now that you are no longer using projected_schema I think my comment no longer holds. I assume FieldsInExpression(projection) is only going to yield materialized fields? You couldn't have something like (not sure if this is valid SQL or not)...

SELECT a+b as c, c+d as f

Member Author (@bkietz, Feb 26, 2021)

Correct, FieldsInExpression(projection) will yield only materialized fields. That SQL's not supported; only fields in the dataset schema may be referenced
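A toy model of that behavior (illustrative only; this Expr is not Arrow's Expression class): walking the projection expression yields exactly the field references that must be materialized, while projected output names like "a_plus_b" are supplied separately.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Toy expression tree: either a field reference (field_name non-empty) or a
// call like add(a, b) over sub-expressions.
struct Expr {
  std::string field_name;
  std::vector<Expr> arguments;
};

// Collect every referenced field, i.e. the fields that must be materialized
// from the dataset before the projection can be evaluated.
void FieldsInExpression(const Expr& expr, std::vector<std::string>* out) {
  if (!expr.field_name.empty()) out->push_back(expr.field_name);
  for (const Expr& arg : expr.arguments) FieldsInExpression(arg, out);
}
```

For project({add(field_ref("a"), field_ref("b"))}, {"a_plus_b"}), this traversal yields {a, b}; "a_plus_b" never appears because only dataset fields can be referenced.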

Member

Doesn't the filter run against the projected_schema?

Member Author

No: both the filter and the projection will run against the dataset schema

Member

Nowhere outside of tests do you specify nullability/metadata. If a customer wanted a field to have custom metadata, couldn't they just add it themselves after the fact? I suppose you could extend my argument to the field names as well, but even SQL allows as XYZ.

Member Author

Currently I only specify nullability and metadata for passthru projected fields https://github.com/bkietz/arrow/blob/1b95d70e4096704c702d1464e93bd218c30498d4/cpp/src/arrow/dataset/scanner_internal.h#L169-L173

For other fields (for example the field resulting from an arithmetic operation) users will need to attach any desired metadata after the scan. I require specifying field names for all projections since the default field name for a column like SELECT (total - amt) / total isn't clear to me.

Member

Would it make sense to store a single vector, e.g.:

struct ProjectField {
  std::string name;
  bool nullable;
  std::shared_ptr<const KeyValueMetadata> metadata;

  ProjectField(std::string name, bool nullable=false,
      std::shared_ptr<const KeyValueMetadata> metadata = NULLPTR)
    : name(std::move(name)), nullable(nullable), metadata(std::move(metadata)) {}
};

std::vector<ProjectField> fields_;

Member Author

That's fine with me; I was just following a perceived convention https://github.com/bkietz/arrow/blob/1b95d70e4096704c702d1464e93bd218c30498d4/cpp/src/arrow/array/array_nested.h#L331

I could also just use a Field with a null type


Member

So this is the physical schema?

Member Author

It is not. The physical schemas of fragments are allowed to diverge from the dataset schema (missing columns, eventually an integer column might promote from int32 in older fragments to int64 in newer fragments, etc.). The dataset schema is a schema to which those physical schemas can be reconciled.
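That reconciliation can be sketched with a simplified model (Reconcile is a hypothetical helper, not Arrow's API, and type promotion is omitted): columns present in the fragment pass through, and columns missing from the fragment are filled with nulls at the dataset schema's positions.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <optional>
#include <string>
#include <vector>

// Simplified column: one optional value per row; nullopt models a null.
using Column = std::vector<std::optional<long>>;

// Reconcile a fragment's physical columns (keyed by name) against the
// dataset schema: present columns pass through in dataset-schema order,
// absent columns become all-null columns of the batch's length.
std::vector<Column> Reconcile(const std::vector<std::string>& dataset_schema,
                              const std::map<std::string, Column>& physical,
                              std::size_t batch_length) {
  std::vector<Column> out;
  for (const std::string& name : dataset_schema) {
    auto it = physical.find(name);
    if (it != physical.end()) {
      out.push_back(it->second);                     // present in fragment
    } else {
      out.emplace_back(batch_length, std::nullopt);  // missing: fill with nulls
    }
  }
  return out;
}
```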

Member

"physical schema"?

Member

Is there a reason for making everything inline in this file (rather than define the functions in a .cc file)?

Member Author

Currently I'm using this function and the others below in ScannerBuilder and in several tests to directly set properties of ScanOptions. The duplication is ultimately not necessary and I'll probably either remove ScannerBuilder or reuse it to produce ScanOptions in tests, at which point more of this functionality can migrate to .cc files

@bkietz bkietz force-pushed the 11174-Make-Expressions-availabl branch from 1b95d70 to 9e2ec5f Compare February 24, 2021 18:06
@nealrichardson (Member)

I've started testing in R with a branch (added commit here) on top of this and my mutate() branch, and I've got a simple test working, but I'm running into a problem trying to apply this to our canonical taxi dataset query from the docs. If I add a column that is derived from other columns, it only yields valid data if I also include those columns in my result. So this works:

>         ds %>%
+           filter(total_amount > 100, year == 2015) %>%
+           mutate(
+             tip_pct = 100 * tip_amount / total_amount
+           ) %>%
+           select(tip_pct, tip_amount, total_amount, passenger_count) %>%
+           group_by(passenger_count) %>%
+           collect() %>% head()
# A tibble: 6 x 4
# Groups:   passenger_count [2]
  tip_pct tip_amount total_amount passenger_count
    <dbl>      <dbl>        <dbl>           <int>
1    0           0           149.               1
2   16.6        22           132.               1
3    0           0           205.               1
4    7.27       10           138.               1
5   23.0        24.5         107.               1
6   11.2        12           107.               2

But what I want to do is this, so that I don't have to keep around the columns that I've already used to create my derived one:

>         ds %>%
+           filter(total_amount > 100, year == 2015) %>%
+           mutate(
+             tip_pct = 100 * tip_amount / total_amount
+           ) %>%
+           select(tip_pct, passenger_count) %>%
+           group_by(passenger_count) %>%
+           collect() %>% head()
# A tibble: 6 x 2
# Groups:   passenger_count [2]
  tip_pct passenger_count
    <dbl>           <int>
1      NA               1
2      NA               1
3      NA               1
4      NA               1
5      NA               1
6      NA               2

To be clear, this latter example sends to ScannerBuilder::Project a vector of 2 Expressions, one for tip_pct that is derived from other fields in the data (but not included in the projection) and one for passenger_count that is just a FieldRef.

@bkietz (Member Author) commented Feb 25, 2021

@nealrichardson thanks for testing! I'll add an analogous C++ test and get it passing.

@bkietz bkietz force-pushed the 11174-Make-Expressions-availabl branch from 7328bdf to 3db5fde Compare February 25, 2021 14:33
Member

OK, I will create a new pull-request to remove the bindings of these internal classes.

@bkietz bkietz force-pushed the 11174-Make-Expressions-availabl branch from dece474 to ac9f074 Compare February 26, 2021 17:12
@nealrichardson nealrichardson (Member) left a comment

LGTM, we can address any followups in ARROW-11704

@bkietz bkietz deleted the 11174-Make-Expressions-availabl branch March 15, 2021 15:39
pribor pushed a commit to GlobalWebIndex/arrow that referenced this pull request Oct 24, 2025
- ScanOptions is more explicit about scan time schemas: dataset (aka reader) and projected schemas are independent
- RecordBatchProjector is replaced with a projection expression
- Field metadata and nullability is preserved by projection exprs *if* the expression is a plain field reference; it's clear that `field_ref("alpha")` should inherit the attributes of "alpha" in the original dataset whereas `equal(field_ref("alpha"), field_ref("beta"))` doesn't necessarily inherit attributes from either field

No significant changes have been made to the Python/R bindings as the C++ API remains backward compatible with subset-of-columns projection. Adding these features will be handled in follow-ups.
- R: https://issues.apache.org/jira/browse/ARROW-11704
- Python: https://issues.apache.org/jira/browse/ARROW-11750

Deferred to https://issues.apache.org/jira/browse/ARROW-11749 :
- Ensure that stacked projection exprs are simplifiable to a single project expr
- Refactor UnionDataset to support reconciling its children with projections

Closes apache#9532 from bkietz/11174-Make-Expressions-availabl

Authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Signed-off-by: Neal Richardson <neal.p.richardson@gmail.com>