Skip to content

Conversation

@guojidan
Copy link
Contributor

@guojidan guojidan commented Jan 17, 2024

Which issue does this PR close?

Closes #8793 .

What changes are included in this PR?

Support GroupsAccumulator accumulator for udaf

@github-actions github-actions bot added logical-expr Logical plan and expressions physical-expr Changes to the physical-expr crates labels Jan 17, 2024
@guojidan guojidan marked this pull request as draft January 17, 2024 09:16
@guojidan guojidan marked this pull request as ready for review January 17, 2024 15:52
@guojidan
Copy link
Contributor Author

cc @alamb

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you so much @guojidan -- this is a very nice PR as it follows the pattern of the existing APIs and is really nicely written/ Bravo 👏

Prior to merge, I think this PR needs:

  1. Hook up the new trait to AggregateExpr (I left comments about what I think is needed inline) as I don't think it quite works yet
  2. A test that shows the GroupsAccumulator being called (see below)

I had a few comments about how we could potentially improve the example with comments and simplification, but we could do that as a follow on PR

Testing

For testing, I suggest a test that all the plumbing is hooked up correctly (which I don't think it is yet in this PR)

For example,

  1. Add a new AggregateUDF here: https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/user_defined/user_defined_aggregates.rs
  2. The new AggregateUDF returns a basic GroupsAccumulator (can simply return a constant or something -- it doesn't have to have any logic)
  3. The new AggregateUDF should error / panic if the normal Accumulator path is invoked
  4. Run a query that shows the Accumulator is invoked not the Accumulator

/// If the aggregate expression has a specialized
/// [`GroupsAccumulator`] implementation. If this returns true,
/// `[Self::create_groups_accumulator`] will be called.
fn groups_accumulator_supported(&self) -> bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may have missed it, but I don't see this function being called anywhere

I think we need to implement AggregateExpr::groups_accumulator_supported and groups_accumulator_supported for the implementation of AggregateExpr (different trait) for AggregateUDF, here :

https://github.com/apache/arrow-datafusion/blob/7f111257443d79259eacbe3cb2ace1bdd276e5fc/datafusion/physical-plan/src/udaf.rs#L75-L74

fn state_type(&self, _return_type: &DataType) -> Result<Vec<DataType>> {
Ok(vec![DataType::Float64, DataType::UInt32])
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend we add a note to accumulator() above about when this is used. Now that I write this maybe we should also put some of this information on the docstrings for AggregateUDF::groups_accumulator

-    /// This is the accumulator factory; DataFusion uses it to create new accumulators.
+   /// This is the accumulator factory for row wise accumulation; Even when `GroupsAccumulator`
+   /// is supported, DataFusion will use this row oriented
+   /// accumulator when the aggregate function is used as a window function
+   /// or when there are only aggregates (no GROUP BY columns) in the plan.

Ok(vec![DataType::Float64, DataType::UInt32])
}

fn groups_accumulator_supported(&self) -> bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good to add some context annotating this function for the example:

Suggested change
fn groups_accumulator_supported(&self) -> bool {
/// Tell DataFusion that this aggregate supports the more performant `GroupsAccumulator`
/// which is used for cases when there are grouping columns in the query
fn groups_accumulator_supported(&self) -> bool {

Ok(ctx)
}

struct GeometricMeanGroupsAccumulator<F>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
struct GeometricMeanGroupsAccumulator<F>
/// Define a `GroupsAccumulator` for GeometricMean
/// which handles accumulator state for multiple groups at once.
/// This API is significantly more complicated than `Accumulator`, which manages
/// the state for a single group, but for queries with a large number of groups
/// can be significantly faster. See the `GroupsAccumulator` documentation for
/// more information.
struct GeometricMeanGroupsAccumulator<F>

/// Count per group (use u64 to make UInt64Array)
counts: Vec<u64>,

/// product per group, stored as the native type
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// product per group, stored as the native type
/// product per group, stored as the native type (not `ScalarValue`)

) -> Result<<Float64Type as ArrowPrimitiveType>::Native>
+ Send,
{
fn update_batch(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn update_batch(
/// Updates the accumulator state given input. DataFusion provides `group_indices`, the groups that each
/// row in `values` belongs to as well as an optional filter of which rows passed.
fn update_batch(

self.counts.resize(total_num_groups, 0);
self.prods
.resize(total_num_groups, Float64Type::default_value());
self.null_state.accumulate(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.null_state.accumulate(
/// Use the `NullState` structure to generate specialized code for null / non null input elements
self.null_state.accumulate(

Ok(())
}

fn merge_batch(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn merge_batch(
/// Merge the results from previous invocations of `evaluate` into this accumulator's state
fn merge_batch(

Ok(())
}

fn evaluate(&mut self, emit_to: datafusion_expr::EmitTo) -> Result<ArrayRef> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn evaluate(&mut self, emit_to: datafusion_expr::EmitTo) -> Result<ArrayRef> {
/// Generate output, as specififed by `emit_to` and update the intermediate state
fn evaluate(&mut self, emit_to: datafusion_expr::EmitTo) -> Result<ArrayRef> {

null_state: NullState,

/// Function that computes the final geometric mean (value / count)
geo_mean_fn: F,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the example would be simpler if you removed the generics and simply inlined the definition of geo_mean_fn into the callsite in evaluate. The generics are needed for GroupsAccumulators that are specialized on type (e.g. a special one for Float32, Float64, etc).

@alamb
Copy link
Contributor

alamb commented Jan 21, 2024

@guojidan -- thanks again for your work on this PR. I am quite interested in getting this PR merged, so if you don't think you'll have a chance to work on it this week I would be happy to try and help as well

@github-actions github-actions bot added the core Core DataFusion crate label Jan 22, 2024
@guojidan
Copy link
Contributor Author

sorry, I have been very busy the other day, but I am come back 😄

@alamb
Copy link
Contributor

alamb commented Jan 22, 2024

sorry, I have been very busy the other day, but I am come back 😄

No worries -- thank you for working on this

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much @guojidan -- this looks great


fn accumulator(&self, _arg: &DataType) -> Result<Box<dyn Accumulator>> {
// should use groups accumulator
panic!("accumulator shouldn't invoke");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate logical-expr Logical plan and expressions physical-expr Changes to the physical-expr crates

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support GroupsAccumulator accumulator for User Defined Functions

2 participants