GH-33976: [Python] Initial bindings for acero Declaration and ExecNodeOptions classes #34102

Merged
jorisvandenbossche merged 13 commits into apache:main from jorisvandenbossche:gh-33976-declaration-options
Mar 3, 2023

@jorisvandenbossche
Member

@jorisvandenbossche jorisvandenbossche commented Feb 9, 2023

First step for GH-33976, adding basic bindings for the different ExecNodeOptions classes and the Declaration class to combine those in a query.

Some notes on what is and what is not included in this PR:

  • For source nodes, didn't expose the generic SourceNodeOptions et al, only the concrete TableSourceNodeOptions (should probably also add RecordBatchReaderSourceNodeOptions)
  • Didn't yet expose any sink nodes. The table sink is implicitly used by Declaration.to_table(), and given that there is currently no explicit API to manually convert to ExecPlan and execute it, explicit table sink node bindings didn't seem necessary.
  • Also didn't yet expose the order_by sink node, because this requires a custom sink when collecting as a Table, and it's not directly clear how this is possible with the Declaration interface. This requires "[C++] Add an order_by node which can reassign an ordering mid-plan" (#34248) to be fixed first.
  • Leaving dataset-based scan and write nodes for a follow-up PR
  • Basic class for Declaration with a to_table method to execute the plan and consume it into a Table, and a to_reader() to get a RecordBatchReader (could also further add a to_batches() method)

--

@github-actions

github-actions bot commented Feb 9, 2023

⚠️ GitHub issue #33976 has been automatically assigned in GitHub to PR creator.

@jorisvandenbossche jorisvandenbossche marked this pull request as ready for review February 15, 2023 16:01

cdef:
    shared_ptr[CTable] c_table

c_table = GetResultValue(DeclarationToTable(self.decl, use_threads))
Member Author

Should I use DeclarationToTable or DeclarationToTableAsync here?

Member

Use DeclarationToTable. We have not historically exposed async methods to python and I don't see much benefit in doing so.

# try:
#     c_join_type = self._join_type_map[join_type]
# except KeyError:
#     raise ValueError("Unsupported join type")
Member Author

I am getting compile errors with this more compact option, so wrote it out with if/elses below (but will still look into the errors)

@danepitkin
Member

Awesome stuff! I'm not too familiar with Acero, but looks like a good starting foundation for pyarrow.

Member

@westonpace westonpace left a comment

Initial thoughts


cdef const CExecNodeOptions* get_options(self) except NULL
cdef void init(self, const shared_ptr[CExecNodeOptions]& sp)
cdef inline shared_ptr[CExecNodeOptions] unwrap(self)
Member

nogil? Or do you only add that when needed? I don't really know and am just looking at the difference between the two unwrap methods and hopefully learning something new :)

Member Author

Ah, that's a good reminder that I still need to check where to add nogil in general. I suppose for the options it is less critical, but for executing with e.g. DeclarationToTable I should certainly put that in a nogil block.

Comment on lines +40 to +41
cdef const CExecNodeOptions* get_options(self) except NULL:
    return self.wrapped.get()
Member

What is this for? Why use this over unwrap?

Member Author

This is the pattern we had for the compute options classes, so started with that. But in the end it's not actually being used here, so can remove it then.

# def __repr__(self):
# type_name = self.__class__.__name__
# # Remove {} so we can use our own braces
# string_repr = frombytes(self.get_options().ToString())[1:-1]
Member

Sadly, I don't think options are printable. Declarations can be printed with DeclarationToString. We could probably add ToString to the options objects if needed but I don't think that is typical (e.g. CSV options, etc.)

Member Author

Sorry, I should have added some comments to the WIP things. This was a reminder to see if we want a repr. But I agree that for the option classes here, that's not very important. Typically you only create them just to directly pass to a Declaration constructor (inspecting options created by someone else seems not a typical use case here)

Comment on lines +55 to +56
# def __eq__(self, FunctionOptions other):
#     return self.get_options().Equals(deref(other.get_options()))
Member

Value equality of options objects (i.e. *self.unwrap() == *other.unwrap()) should generally be good enough I think.

Member Author

@westonpace the ExecNodeOptions class in C++ does not define an operator== overload, so I don't think it allows such an equality check? (Or were you thinking about pure pointer equality?)

Anyway, that's also not very important here. If we would like to add support for pickling those objects (and declarations / exec plans in general), that might become more important, but that is definitely out of scope for this PR.

Make a node which excludes some rows from batches passed through it.

The "filter" operation provides an option to define data filtering
criteria. It selects rows matching a given expression. Filters can
Member

"matching a given expression" is probably ok but it is a bit vague. Maybe "It selects rows where the given expression evaluates to true"? Also, we may want to clarify that the expression must have a return type of boolean.

The column name can be a string, an empty list or a list of
column names, for unary, nullary and n-ary aggregation functions
respectively.
keys : list, optional
Member

A list of what?

Member Author

A list of "field references", where each field reference can either be a string name, integer reference, or fieldref expression.

Updated the documentation and added some more test coverage for this.

Using integers (i.e. a FieldPath) does work out of the box, but given that this position is "relative" to the input of the node (i.e. output of the previous node) and not for the full plan, that's probably not something we want to encourage.

Comment on lines +313 to +315
join_type : str
Type of join. One of "left semi", "right semi", "left anti",
"right anti", "inner", "left outer", "right outer", "full outer".
Member

Is there a default? I wonder if it makes sense to default to inner?

Member Author

I don't have a strong opinion about a default. I think it's fine for a low-level interface to require specifying it. On the C++ side there is one simplified constructor signature that defaults to inner; the others are explicit.


class HashJoinNodeOptions(_HashJoinNodeOptions):
"""
Make a node which implements join operation using hash join strategy.
Member

This seems a bit sparse. Maybe...

Make a node which joins two inputs together based on common keys. Each row from the left input will be matched with rows from the right input where the keys are equal. Null keys do not match other null keys. If any key is null then that row will not match. Which columns are output, and the behavior of rows that do not match, will depend on the join type.

inner: for each row in the left input there will be one output row per matching row in the right input. If there are no matching rows in the right input then there will be no output row for that input row. The output columns will be the left_keys, followed by the right_keys (these will be identical to the left_keys), followed by the left_output, followed by the right_output.

left outer: behaves the same as an inner join. However, if there are no matching rows in the right input then there will be one output row that contains null for each column in right_keys and right_output. A row in the right input, that has no matching row in the left input, will not be output. The output columns will be identical to an inner join.

right outer: for each row in the right input there will be one output row per matching row in the left input. If there are no matching rows in the left input then there will be one output row containing null in left_keys and left_output. A row in the left input, that has no matching row in the right input, will not be output. The output columns will be identical to an inner join.

full outer: behaves the same as a left outer join. However, if there is a row in the right input, that has no matching row in the left input, then there will be one output row containing null for each column in left_keys and left_output. The output columns will be identical to an inner join.

left semi: for each row in the left input there will be exactly one output row, if and only if, there is a matching row in the right input. The output columns will be the left_keys followed by the left_output.

right semi: for each row in the right input there will be exactly one output row, if and only if, there is a matching row in the left input. The output columns will be the right_keys followed by the right_output.

left anti: for each row in the left input there will be exactly one output row, if and only if, there is not a matching row in the right input. The output will be the same as left semi.

right anti: for each row in the right input there will be exactly one output row, if and only if, there is not a matching row in the left input. The output will be the same as right semi.

Member Author

Given that this suggestion is a larger addition, going to leave that for a separate PR.

Comment on lines +358 to +360
factory_name : str
The ExecNode factory name, such as "table_source", "filter",
"project" etc.
Member

I think users may be confused by this since it might not be clear to them why a "factory name" is a necessary concept at all.

Member Author

And the factory name is needed for the registry? (so that you can register your own custom nodes)

Do you suggest using a different name here (e.g. just "name" instead of "factory name")? Or explaining better what this "factory name" is?

Member

Yes, the factory name is currently needed for the registry. Either you need to explain what it is and explain, in each of the options, what factory name they should use or you need to make the name a part of the options class and remove it from here.

Otherwise, a user might not (for example) know to use "aggregate" when creating AggregateNodeOptions

Member Author

We could indeed make the factory name unnecessary in the API, if we attach a factory name to the Python option classes. But I think I might opt to stay closer to the C++ API here to require a factory name. But indeed will mention in each option class docstring the name of its factory.

Member Author

I added some basic explanation. We can always still improve that, or reconsider if the factory name is needed here in python in follow-up PRs.

"""
Run the declaration and collect the results into a table.

This method will implicitly add a sink node to the declaration
Member

Do users care about the existence of a sink node? I suppose they might if they are creating custom nodes and aware of such things.

Member Author

Given that we will have multiple possible sinks at some point, it might be useful to be aware of the general pattern (and also for people that might read the C++ user guide alongside this, I think it is OK to be explicit here)

@jorisvandenbossche
Member Author

jorisvandenbossche commented Feb 17, 2023

@westonpace thanks for the review! A question about the (useful) comments with ideas to clarify the docstrings: I mostly copied things from existing doc comments in the C++ code or explanations from the C++ user guide. While updating here based on your feedback, would it be useful to directly also update it in the C++ source? (but maybe you prefer to keep the doc comments in options.h/exec_plan.h shorter?)

@westonpace
Member

While updating here based on your feedback, would it be useful to directly also update it in the C++ source?

Yes, if you don't mind. The lengthy join explanation was mostly just because I was recently doing some Substrait work and so I had the detailed explanation still loaded up in short term memory. I think it would be quite helpful to add to the C++ documentation as well.

(but maybe you prefer to keep the doc comments in options.h/exec_plan.h shorter?)

I don't mind long doc comments when the situation warrants. 😆

@jorisvandenbossche
Member Author

@westonpace I did some small updates to the doc comments / user guide based on your feedback, would appreciate if you could take a look at those changes.

Member

@westonpace westonpace left a comment

This looks quite nice. A few more small thoughts.

from pyarrow._compute cimport Expression, FunctionOptions, _ensure_field_ref, _true
from pyarrow.compute import field

# Initialize() # Initialise support for Datasets in ExecPlan
Member

Uncomment? (I don't remember if I asked about this last time, sorry)

Member Author

Will just remove for now, will do the Datasets scan/write node option classes in a follow-up PR.

c_names.push_back(<c_string>tobytes(name))

self.wrapped.reset(
new CProjectNodeOptions(c_expressions, c_names)
Member

In retrospect, I think immutable is better. In python you can do MyOptions(x=7, y=3). So you don't really need the ability to do:

MyOptions options;
options.x = 7;
options.y = 3;
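
The Python side of that comparison can be sketched with a frozen dataclass. This is only an illustration of the "construct once, never mutate" style using the hypothetical MyOptions from the comment; it is not how the pyarrow option classes (Cython extension types) are implemented.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class MyOptions:
    """Hypothetical immutable options object, for illustration only."""
    x: int = 0
    y: int = 0


# All fields are supplied at construction time via keyword arguments.
options = MyOptions(x=7, y=3)

# Assigning to a field afterwards is rejected.
try:
    options.x = 8
    mutated = True
except FrozenInstanceError:
    mutated = False
```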

Comment on lines +319 to +323
left_output : list, optional
Output fields passed from left input. If left and right output
fields are not specified, all valid fields from both left and
right input will be output
right_output : list, optional
Member

Do these need to be lists of field references? Or can they be lists of strings?

Member Author

Yes, they can be any form of field "reference", so either a field_ref or a string (or int). Will clarify in the docstring.


@github-actions github-actions bot added the awaiting merge label Mar 2, 2023
Co-authored-by: Weston Pace <weston.pace@gmail.com>
@github-actions github-actions bot added the awaiting changes and awaiting change review labels and removed the awaiting merge, awaiting changes and awaiting change review labels Mar 3, 2023
@github-actions github-actions bot added the awaiting change review and awaiting changes labels and removed the awaiting changes and awaiting change review labels Mar 3, 2023
@jorisvandenbossche
Member Author

The integration test failure seems unrelated and seems to happen on other PRs as well, so going to merge this.

@westonpace thanks for the review!

@jorisvandenbossche jorisvandenbossche added this to the 12.0.0 milestone Mar 3, 2023
@jorisvandenbossche jorisvandenbossche merged commit a3cd962 into apache:main Mar 3, 2023
@jorisvandenbossche jorisvandenbossche deleted the gh-33976-declaration-options branch March 3, 2023 12:49
@ursabot

ursabot commented Mar 3, 2023

Benchmark runs are scheduled for baseline = 50fb5b0 and contender = a3cd962. a3cd962 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Finished ⬇️0.28% ⬆️0.06%] test-mac-arm
[Finished ⬇️0.0% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️1.33% ⬆️0.0%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] a3cd962b ec2-t3-xlarge-us-east-2
[Finished] a3cd962b test-mac-arm
[Finished] a3cd962b ursa-i9-9960x
[Finished] a3cd962b ursa-thinkcentre-m75q
[Finished] 50fb5b0d ec2-t3-xlarge-us-east-2
[Finished] 50fb5b0d test-mac-arm
[Finished] 50fb5b0d ursa-i9-9960x
[Finished] 50fb5b0d ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

jorisvandenbossche added a commit that referenced this pull request Mar 14, 2023
Continuing GH-34102, this adds the exec node options classes defined in the dataset module (scan, not yet write).

* Issue: #33976

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
jorisvandenbossche added a commit that referenced this pull request Mar 28, 2023
… Acero Declaration/ExecNodeOptions bindings (#34401)

This PR refactors our current custom cython implementation of the Table/Dataset.filter/join/group_by/sort_by methods to use the new bindings for Declaration/ExecNodeOptions (#34102).

* Issue: #33976

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
ArgusLi pushed a commit to Bit-Quill/arrow that referenced this pull request May 15, 2023
…se new Acero Declaration/ExecNodeOptions bindings (apache#34401)

This PR refactors our current custom cython implementation of the Table/Dataset.filter/join/group_by/sort_by methods to use the new bindings for Declaration/ExecNodeOptions (apache#34102).

* Issue: apache#33976

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>