GH-33976: [Python] Initial bindings for Acero Declaration and ExecNodeOptions classes (#34102)
Conversation
python/pyarrow/_acero.pyx
Outdated
cdef:
    shared_ptr[CTable] c_table

c_table = GetResultValue(DeclarationToTable(self.decl, use_threads))
Should I use DeclarationToTable or DeclarationToTableAsync here?
Use DeclarationToTable. We have not historically exposed async methods to python and I don't see much benefit in doing so.
python/pyarrow/_acero.pyx
Outdated
# try:
#     c_join_type = self._join_type_map[join_type]
# except KeyError:
#     raise ValueError("Unsupported join type")
I am getting compile errors with this more compact option, so I wrote it out with if/else branches below (but will still look into the errors).
Awesome stuff! I'm not too familiar with Acero, but this looks like a good starting foundation for pyarrow.
python/pyarrow/_acero.pxd
Outdated
cdef const CExecNodeOptions* get_options(self) except NULL
cdef void init(self, const shared_ptr[CExecNodeOptions]& sp)
cdef inline shared_ptr[CExecNodeOptions] unwrap(self)
nogil? Or do you only add that when needed? I don't really know and am just looking at the difference between the two unwrap methods and hopefully learning something new :)
Ah, that's a good reminder that I still need to check where to add nogil in general. I suppose for the options it is less critical, but for executing with e.g. DeclarationToTable I should certainly put that in a nogil block.
python/pyarrow/_acero.pyx
Outdated
cdef const CExecNodeOptions* get_options(self) except NULL:
    return self.wrapped.get()
What is this for? Why use this over unwrap?
This is the pattern we had for the compute options classes, so I started with that. But in the end it's not actually being used here, so I can remove it.
python/pyarrow/_acero.pyx
Outdated
# def __repr__(self):
#     type_name = self.__class__.__name__
#     # Remove {} so we can use our own braces
#     string_repr = frombytes(self.get_options().ToString())[1:-1]
Sadly, I don't think options are printable. Declarations can be printed with DeclarationToString. We could probably add ToString to the options objects if needed but I don't think that is typical (e.g. CSV options, etc.)
Sorry, I should have added some comments to the WIP things. This was a reminder to check whether we want a repr. But I agree that for the option classes here, that's not very important. Typically you only create them to pass directly to a Declaration constructor (inspecting options created by someone else does not seem a typical use case here).
python/pyarrow/_acero.pyx
Outdated
# def __eq__(self, FunctionOptions other):
#     return self.get_options().Equals(deref(other.get_options()))
Value equality of options objects (i.e. *self.unwrap() == *other.unwrap()) should generally be good enough I think.
@westonpace the ExecNodeOptions class in C++ does not define an operator== overload, so I don't think it allows such an equality check? (or were you thinking about pure pointer equality?)
Anyway, that's also not very important here. If we would like to add support for pickling those objects (and declarations / exec plans in general), that might become more important, but that is definitely out of scope for this PR.
python/pyarrow/_acero.pyx
Outdated
Make a node which excludes some rows from batches passed through it.

The "filter" operation provides an option to define data filtering
criteria. It selects rows matching a given expression. Filters can
"matching a given expression" is probably ok but it is a bit vague. Maybe "It selects rows where the given expression evaluates to true"? Also, we may want to clarify that the expression must have a return type of boolean.
python/pyarrow/_acero.pyx
Outdated
The column name can be a string, an empty list or a list of
column names, for unary, nullary and n-ary aggregation functions
respectively.
keys : list, optional
A list of "field references", where each field reference can either be a string name, integer reference, or fieldref expression.
Updated the documentation and added some more test coverage for this.
Using integers (i.e. a FieldPath) does work out of the box, but given that this position is "relative" to the input of the node (i.e. the output of the previous node) and not to the full plan, that's probably not something we want to encourage.
join_type : str
    Type of join. One of "left semi", "right semi", "left anti",
    "right anti", "inner", "left outer", "right outer", "full outer".
Is there a default? I wonder if it makes sense to default to inner?
I don't have a strong opinion about a default. I think it's fine for a low-level interface to require specifying it. On the C++ side there is one simplified constructor signature that defaults to inner; the others are explicit.
class HashJoinNodeOptions(_HashJoinNodeOptions):
    """
    Make a node which implements join operation using hash join strategy.
This seems a bit sparse. Maybe...
Make a node which joins two inputs together based on common keys. Each row from the left input will be matched with rows from the right input where the keys are equal. Null keys do not match other null keys. If any key is null then that row will not match. Which columns are output, and the behavior of rows that do not match, will depend on the join type.
inner: for each row in the left input there will be one output row per matching row in the right input. If there are no matching rows in the right input then there will be no output row for that input row. The output columns will be the left_keys, followed by the right_keys (these will be identical to the left_keys), followed by the left_output, followed by the right_output.
left outer: behaves the same as an inner join. However, if there are no matching rows in the right input then there will be one output row that contains null for each column in right_keys and right_output. A row in the right input, that has no matching row in the left input, will not be output. The output columns will be identical to an inner join.
right outer: for each row in the right input there will be one output row per matching row in the left input. If there are no matching rows in the left input then there will be one output row containing null in left_keys and left_output. A row in the left input, that has no matching row in the right input, will not be output. The output columns will be identical to an inner join.
full outer: behaves the same as a left outer join. However, if there is a row in the right input, that has no matching row in the left input, then there will be one output row containing null for each column in left_keys and left_output. The output columns will be identical to an inner join.
left semi: for each row in the left input there will be exactly one output row, if and only if there is a matching row in the right input. The output columns will be the left_keys followed by the left_output.
right semi: for each row in the right input there will be exactly one output row, if and only if there is a matching row in the left input. The output columns will be the right_keys followed by the right_output.
left anti: for each row in the left input there will be exactly one output row, if and only if there is not a matching row in the right input. The output will be the same as left semi.
right anti: for each row in the right input there will be exactly one output row, if and only if there is not a matching row in the left input. The output will be the same as right semi.
Given that this suggestion is a larger addition, going to leave that for a separate PR.
python/pyarrow/_acero.pyx
Outdated
factory_name : str
    The ExecNode factory name, such as "table_source", "filter",
    "project" etc.
I think users may be confused by this since it might not be clear to them why a "factory name" is a necessary concept at all.
And the factory name is needed for the registry? (so that you can register your own custom nodes)
Do you suggest using a different name here? (e.g. just "name" instead of "factory name") Or to explain better what this "factory name" is?
Yes, the factory name is currently needed for the registry. Either you need to explain what it is and explain, in each of the options, what factory name they should use or you need to make the name a part of the options class and remove it from here.
Otherwise, a user might not (for example) know to use "aggregate" when creating AggregateNodeOptions
We could indeed make the factory name unnecessary in the API, if we attach a factory name to the Python option classes. But I think I might opt to stay closer to the C++ API here and require a factory name. I will indeed mention in each option class docstring the name of its factory.
I added some basic explanation. We can always still improve that, or reconsider if the factory name is needed here in python in follow-up PRs.
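As a quick reference for the pairing discussed above, the factory names matching each option class in this PR can be summarized in a small lookup table. The strings come from the C++ exec-node registry; treat the mapping as illustrative, not exhaustive:

```python
# Factory-name lookup for the option classes bound in this PR: each
# string is the name under which the corresponding ExecNode factory is
# registered, i.e. what you pass as Declaration's factory_name.
FACTORY_NAMES = {
    "TableSourceNodeOptions": "table_source",
    "FilterNodeOptions": "filter",
    "ProjectNodeOptions": "project",
    "AggregateNodeOptions": "aggregate",
    "OrderByNodeOptions": "order_by",
    "HashJoinNodeOptions": "hashjoin",
}
```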
"""
Run the declaration and collect the results into a table.

This method will implicitly add a sink node to the declaration
Do users care about the existence of a sink node? I suppose they might if they are creating custom nodes and aware of such things.
Given that we will have multiple possible sinks at some point, it might be useful to be aware of the general pattern. Also for people who might read the C++ user guide alongside this, I think it is OK to be explicit here.
@westonpace thanks for the review! Question about the (useful) comments with ideas to clarify the docstrings: I mostly copied things from existing doc comments in the C++ code or explanations from the C++ user guide. While updating here based on your feedback, would it be useful to also directly update the C++ source? (or maybe you prefer to keep the doc comments in options.h/exec_plan.h shorter?)
Yes, if you don't mind. The lengthy join explanation was mostly just because I was recently doing some Substrait work and so I had the detailed explanation still loaded up in short term memory. I think it would be quite helpful to add to the C++ documentation as well.
I don't mind long doc comments when the situation warrants. 😆
@westonpace I did some small updates to the doc comments / user guide based on your feedback; I'd appreciate it if you could take a look at those changes.
westonpace left a comment:
This looks quite nice. A few more small thoughts.
python/pyarrow/_acero.pyx
Outdated
from pyarrow._compute cimport Expression, FunctionOptions, _ensure_field_ref, _true
from pyarrow.compute import field

# Initialize() # Initialise support for Datasets in ExecPlan
Uncomment? (I don't remember if I asked about this last time, sorry)
Will just remove for now, will do the Datasets scan/write node option classes in a follow-up PR.
c_names.push_back(<c_string>tobytes(name))

self.wrapped.reset(
    new CProjectNodeOptions(c_expressions, c_names)
In retrospect, I think immutable is better. In python you can do MyOptions(x=7, y=3). So you don't really need the ability to do:
MyOptions options;
options.x = 7;
options.y = 3;
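In Python terms, the immutable style described here could look like a frozen dataclass. `MyOptions` and its fields are hypothetical, just mirroring the `MyOptions(x=7, y=3)` example above:

```python
# Minimal sketch of immutable options: all values are supplied at
# construction time and cannot be reassigned afterwards.
from dataclasses import dataclass

@dataclass(frozen=True)
class MyOptions:
    x: int
    y: int

opts = MyOptions(x=7, y=3)
# opts.x = 8 would raise dataclasses.FrozenInstanceError
```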
left_output : list, optional
    Output fields passed from left input. If left and right output
    fields are not specified, all valid fields from both left and
    right input will be output.
right_output : list, optional
Do these need to be lists of field references? Or can they be lists of strings?
Yes, they can be any form of field "reference", so either a field_ref or a string (or int). Will clarify in the docstring.
Co-authored-by: Weston Pace <weston.pace@gmail.com>
The integration test failure seems unrelated and seems to happen on other PRs as well, so I'm going to merge this. @westonpace thanks for the review!
Benchmark runs are scheduled for baseline = 50fb5b0 and contender = a3cd962. a3cd962 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
… Acero Declaration/ExecNodeOptions bindings (#34401)
This PR refactors our current custom Cython implementation of the Table/Dataset filter/join/group_by/sort_by methods to use the new bindings for Declaration/ExecNodeOptions (#34102).
* Issue: #33976
Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
First step for GH-33976, adding basic bindings for the different ExecNodeOptions classes and the Declaration class to combine those in a query.
Some notes on what is and what is not included in this PR:
- No bindings for the abstract `SourceNodeOptions` et al, only the concrete `TableSourceNodeOptions` (should probably also add `RecordBatchReaderSourceNodeOptions`)
- Results are consumed through `Declaration.to_table()`, and given that there is currently no explicit API to manually convert to an ExecPlan and execute it, explicit table sink node bindings didn't seem necessary.
- `Declaration` with a `to_table` method to execute the plan and consume it into a Table, and a `to_reader()` to get a RecordBatchReader (could also further add a `to_batches()` method)