ARROW-4124: [C++] Draft Aggregate and Sum kernels #3407
Conversation
cpp/src/arrow/compute/kernel.h
Couldn't you just static_cast<Type::type>(this->value.which())?
We'd have to make sure the mapping is exact, which is almost equivalent to writing out this list explicitly.
These should be undef'd below
No, since they're "exported" macros used in other files.
emkornfield left a comment
Not an expert in this space so take my comments with a grain of salt.
Generally, you would want to pass this through as a unique_ptr to make the ownership transfer explicit
Agreed
This seems like a slightly different pattern than was taken for UnaryArrayKernel (i.e. that pattern wasn't in the kernel, but instead in util-internal.h). I haven't thought through whether a similar pattern should be established here; have you?
I didn't see util-internal.h; I'll try to merge both. When I used this piece, I would have preferred not to be concerned with this, and had hoped that Datum provided a ChunkedArray in both cases (wrapping in a single-element ChunkedArray when the datum is an Array).
I'll add this feature.
I think this is only true for Monoids?
Do you have an example of an aggregate function which is not associative?
Short answer (might be contrived): "exponentially smoothed moving average" and "hash elements".
Longer answer (probably obvious and too pedantic): I was assuming a naive parallelization where Consume could be called in any order, which would require commutativity and associativity. With non-commutative associative operations (e.g. string concatenation, FIRST, LAST, etc.) there are more book-keeping details to ensure parallelization still returns the correct results. At the moment I'm having a hard time coming up with an example that benefits from parallelization of Consume and is not commutative.
From a practical standpoint for this review:
- It might be good to add some additional pseudo-code for the parallelism AggregateState is meant to handle (a rough sketch follows below),
- Possibly having some mechanism to signal commutative and non-commutative state.
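A rough sketch of the kind of parallelism being discussed, with each thread consuming disjoint chunks into its own state and the partial states merged at the end (illustrative names only, not the PR's API):

```cpp
#include <cstddef>
#include <cstdint>
#include <thread>
#include <vector>

// Toy aggregate state: associative and commutative, so Consume/Merge order
// does not affect the result.
struct SumState {
  int64_t total = 0;
  void Consume(const std::vector<int64_t>& chunk) {
    for (int64_t v : chunk) total += v;
  }
  void MergeFrom(const SumState& other) { total += other.total; }
};

int64_t ParallelSum(const std::vector<std::vector<int64_t>>& chunks) {
  std::vector<SumState> states(chunks.size());
  std::vector<std::thread> workers;
  for (std::size_t i = 0; i < chunks.size(); ++i) {
    workers.emplace_back([&states, &chunks, i] { states[i].Consume(chunks[i]); });
  }
  for (auto& t : workers) t.join();
  // For a non-commutative (but associative) operation, merging in chunk order
  // is what keeps the result correct; for commutative ones, any order works.
  SumState result;
  for (const auto& s : states) result.MergeFrom(s);
  return result.total;
}
```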
To answer my own question about associative-but-not-commutative multithreading: the parsing of CSV files is a good example where multithreading makes sense.
Are compilers generally good enough to optimize out any overhead from calling the constructor (I would assume so), or would it make sense to have a slightly less clean Monoid abstraction that takes the raw ValueType?
The compiler is clever enough (at least with -O3). But we can also add a second operator+= that takes the scalar value. When I compile locally with -march=native, this is the hot loop as reported by perf:
10.96 │ vpaddq (%rax,%rdi,8),%zmm0,%zmm0
6.73 │ vpaddq 0x40(%rax,%rdi,8),%zmm1,%zmm1
5.26 │ vpaddq 0x80(%rax,%rdi,8),%zmm2,%zmm2
5.40 │ vpaddq 0xc0(%rax,%rdi,8),%zmm3,%zmm3
9.53 │ vpaddq 0x100(%rax,%rdi,8),%zmm0,%zmm0
5.78 │ vpaddq 0x140(%rax,%rdi,8),%zmm1,%zmm1
5.06 │ vpaddq 0x180(%rax,%rdi,8),%zmm2,%zmm2
5.06 │ vpaddq 0x1c0(%rax,%rdi,8),%zmm3,%zmm3
8.38 │ vpaddq 0x200(%rax,%rdi,8),%zmm0,%zmm0
5.53 │ vpaddq 0x240(%rax,%rdi,8),%zmm1,%zmm1
4.97 │ vpaddq 0x280(%rax,%rdi,8),%zmm2,%zmm2
4.84 │ vpaddq 0x2c0(%rax,%rdi,8),%zmm3,%zmm3
7.46 │ vpaddq 0x300(%rax,%rdi,8),%zmm0,%zmm0
5.39 │ vpaddq 0x340(%rax,%rdi,8),%zmm1,%zmm1
5.07 │ vpaddq 0x380(%rax,%rdi,8),%zmm2,%zmm2
4.34 │ vpaddq 0x3c0(%rax,%rdi,8),%zmm3,%zmm3
0.24 │ sub $0xffffffffffffff80,%rdi
Also note that some Monoids will be composed and aren't made of a single scalar value, e.g. the Mean monoid is made of std::tuple<size_t, T>.
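To make the shapes concrete, here is an illustrative sketch (not the PR's exact classes) of a scalar Sum monoid with both operator+= overloads mentioned above, and a composed Mean monoid whose state is effectively a std::tuple<size_t, T>:

```cpp
#include <cstddef>
#include <tuple>

template <typename T>
struct SumMonoid {
  T value = T(0);  // identity element of the monoid
  SumMonoid& operator+=(const SumMonoid& other) {  // combine two states
    value += other.value;
    return *this;
  }
  SumMonoid& operator+=(T scalar) {  // raw-value overload discussed above
    value += scalar;
    return *this;
  }
};

template <typename T>
struct MeanMonoid {
  std::tuple<std::size_t, T> state{0, T(0)};  // (count, sum)
  MeanMonoid& operator+=(const MeanMonoid& other) {
    std::get<0>(state) += std::get<0>(other.state);
    std::get<1>(state) += std::get<1>(other.state);
    return *this;
  }
  MeanMonoid& operator+=(T scalar) {
    ++std::get<0>(state);
    std::get<1>(state) += scalar;
    return *this;
  }
};
```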
👍 cool, it's nice to see the compiler can auto-vectorize this.
I'm not sure if the discussion happened already, but it would be good to understand the general plan around the threading model, how this relates to the physical execution layer (i.e. something like the Volcano model), and what exactly should live at which level (i.e. what belongs in the kernel/state).
I'm sure you considered this, but a different design might create a kernel per thread, then merge each scalar at the end if the work can in fact be run in parallel. Probably not much difference performance-wise, but it might still eliminate an unnecessary memory barrier/system call.
Created ARROW-4333 to track this.
I also am not sure it's a good idea to lock inside Consume
Do monoids always have to be a scalar? Would it make sense to have monoid_ populate the datum instead of assuming this?
In a subsequent change the MonoidState will also be parameterized with a finalizer, which is a functor that materializes the state into a Datum. Mean and HyperLogLog are examples that need this.
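Something like the following, as a rough sketch of the finalizer idea (hypothetical names; the real version would produce a Datum and deal with the empty/all-null case):

```cpp
#include <cstddef>

// Accumulated state for Mean: (count, sum).
struct MeanState {
  std::size_t count = 0;
  double sum = 0.0;
};

// Finalizer functor: materializes the accumulated state into the output value.
// For Sum the finalizer would simply forward the accumulated total.
struct MeanFinalizer {
  double operator()(const MeanState& state) const {
    // A real implementation would emit a null Datum when count == 0.
    return state.count == 0 ? 0.0 : state.sum / static_cast<double>(state.count);
  }
};
```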
nit: CheckSum is a slightly confusing name, since "checksum" is itself a potential operation. Something like VerifySum might read a little better.
I will take a closer look, but one comment: thread control / multithreading should be handled (IMHO) at a higher level than the aggregation kernels.
might be nice to have a benchmark that includes some nulls
See some benchmarking I did about this http://wesmckinney.com/blog/bitmaps-vs-sentinel-values/
@emkornfield I will add both; my first benchmark was with nulls, but I removed them to benchmark with perf.
Can you post a performance comparison with the hand-optimized loop?
Will do.
one interesting case is all nulls.
Yes, and it's already an issue with the current implementation: the min/max monoid would return std::numeric_limits<T>::max/min respectively, when it should return NULL.
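One possible shape for that fix (a sketch only, not the PR code): have the state remember whether any valid value was consumed, so the finalize step can emit NULL instead of the numeric limit.

```cpp
#include <algorithm>
#include <limits>

template <typename T>
struct MinState {
  T min = std::numeric_limits<T>::max();
  bool has_value = false;  // false => the aggregate should finalize to NULL

  void Consume(T value) {
    min = std::min(min, value);
    has_value = true;
  }
};
```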
cpp/src/arrow/compute/kernel.h
Would it make sense to include string and unicode types in here?
Filed ARROW-4331 to track adding additional types.
Will review soon
As @emkornfield proposed, let's move the threading/execution-architecture discussion to https://issues.apache.org/jira/browse/ARROW-4333.
wesm left a comment
Unfortunately, this implementation has branching in the inner loop, so the approach here might have to be rethought. The performance difference from removing branching and partially unrolling the inner loop is huge. See http://wesmckinney.com/blog/bitmaps-vs-sentinel-values/
cpp/src/arrow/compute/kernel.h
Hm. I'm not sure about this. See https://issues.apache.org/jira/browse/ARROW-47. It's OK to leave this as is but marked experimental
The changes in this file feel like a kludge to me. Let's not do anything else involving scalar outputs until resolving ARROW-47. I'll put up a patch for that tomorrow with any luck
Agreed
I'm skeptical about whether this is going to generate good code when we don't compile with -march=native (because we can't do this in many of the binaries we ship...). Compare with some analysis I did of optimizing "sum" in http://wesmckinney.com/blog/bitmaps-vs-sentinel-values/ -- if we aren't coming within 5% of hand-optimized code with these abstractions, we need to go back to the drawing board
Here's the hand-tuned implementation I'm referring to: https://github.com/wesm/bitmaps-vs-sentinels/blob/master/benchmark.cc#L227
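The gist of that hand-tuned loop, as I read it (a sketch under assumptions, not a copy of the linked benchmark): walk the validity bitmap one byte at a time, take a branch-free fast path when all eight values covered by the byte are valid, and only fall back to per-bit checks for partially-null bytes.

```cpp
#include <cstdint>

int64_t SumWithBitmap(const int64_t* values, const uint8_t* valid_bitmap,
                      int64_t length) {
  int64_t total = 0;
  const int64_t rounded = (length / 8) * 8;
  for (int64_t i = 0; i < rounded; i += 8) {
    const uint8_t valid_byte = valid_bitmap[i / 8];
    if (valid_byte == 0xFF) {
      // All eight values valid: no per-element branch, easy to unroll/vectorize.
      for (int64_t j = 0; j < 8; ++j) total += values[i + j];
    } else {
      for (int64_t j = 0; j < 8; ++j) {
        if (valid_byte & (1 << j)) total += values[i + j];
      }
    }
  }
  // Tail elements not covered by a full bitmap byte (LSB-first bit order assumed).
  for (int64_t i = rounded; i < length; ++i) {
    if (valid_bitmap[i / 8] & (1 << (i % 8))) total += values[i];
  }
  return total;
}
```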
Another question that needs to be resolved is how to return a NULL value. What we do in pandas is have a "min_values" option, so:
@fsaintjacques the PR description is out of date, can you update? I'm going to review and leave comments so we can merge this and move on to the next steps.
wesm left a comment
Some minor comments for follow-up patches. I'm going to tweak a few things and then merge this once the build is passing.
# Aggregates
ADD_ARROW_TEST(sum-test PREFIX "arrow-compute")
ADD_ARROW_BENCHMARK(sum-benchmark PREFIX "arrow-compute")
Let's plan to test and benchmark all the aggregates in one executable each (one for unit test, one for benchmark). So these names will have to change. Let's change them now so we don't have to move the file later
done
👍, such that we amortize the slow linking issue on Windows?
among other things, yeah. I don't think we should have one test executable for each kernel, but rather organize tests according to logical relationships (e.g. "mean" is close to "sum")
static std::shared_ptr<ManagedAggregateState> Make(
    std::shared_ptr<AggregateFunction>& desc, MemoryPool* pool) {
  std::shared_ptr<Buffer> buf;
  if (!AllocateBuffer(pool, desc->Size(), &buf).ok()) return nullptr;
This is a bad code smell
class AggregateFunction {
 public:
  /// \brief Consume an array into a state.
  virtual Status Consume(const Array& input, void* state) const = 0;
Any reason not to accept ArrayData here and avoid unnecessary boxing/unboxing?
The intent was to have strongly typed parameters and leave the error handling to the caller.
auto state = ManagedAggregateState::Make(aggregate_function_, ctx->memory_pool());
if (!state) return Status::OutOfMemory("AggregateState allocation failed");

auto array = input.make_array();
See below comments re: this boxing step which could be avoided
ValidateSum<TypeParam>(&this->ctx_, "[]", Datum());

ValidateSum<TypeParam>(&this->ctx_, "[0, 1, 2, 3, 4, 5]",
                       Datum(Scalar(static_cast<SumType>(5 * 6 / 2))));
Should have a test for the "all nulls" case. The output can be preliminary (i.e. returning 0)
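Mirroring the ValidateSum calls above, an all-nulls case could look something like this (the expected Datum is a placeholder until the NULL-result behavior is settled):

```cpp
// Hypothetical addition; preliminary expectation of 0 for an all-null input.
ValidateSum<TypeParam>(&this->ctx_, "[null, null, null]",
                       Datum(Scalar(static_cast<SumType>(0))));
```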
if (input.null_count() > 0)
  *state = ConsumeSparse(array);
else
  *state = ConsumeDense(array);
braces
done
StateType local;

// TODO(fsaintjacques): This fails on slice not byte-aligned.
DCHECK_EQ(array.offset() % 8, 0);
Create follow up JIRA?
const auto values = array.raw_values();
const auto bitmap = array.null_bitmap_data() + BitUtil::RoundDown(array.offset(), 8);
const auto length = array.length();
const auto length_rounded = BitUtil::RoundDown(length, 8);
Should write down explicit types instead of const auto
// Returns 'value' rounded down to the nearest multiple of 'factor'
constexpr int64_t RoundDown(int64_t value, int64_t factor) {
  return (value / factor) * factor;
}
Add unit tests for this
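For reference, a minimal gtest sketch based only on the definition above (assuming RoundDown lives in arrow::BitUtil and the existing bit-util header, as used elsewhere in this patch):

```cpp
#include <gtest/gtest.h>

#include "arrow/util/bit-util.h"

TEST(BitUtil, RoundDown) {
  using arrow::BitUtil::RoundDown;
  EXPECT_EQ(RoundDown(0, 8), 0);
  EXPECT_EQ(RoundDown(7, 8), 0);
  EXPECT_EQ(RoundDown(8, 8), 8);
  EXPECT_EQ(RoundDown(9, 8), 8);
  EXPECT_EQ(RoundDown(63, 64), 0);
  EXPECT_EQ(RoundDown(64, 64), 64);
}
```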
@fsaintjacques I tweaked the PR description. I'll merge this once the build passes in the interest of project velocity. There are a lot of follow-up issues to create, so I'll leave that to you.
Codecov Report
@@ Coverage Diff @@
## master #3407 +/- ##
==========================================
+ Coverage 87.75% 88.62% +0.87%
==========================================
Files 667 542 -125
Lines 82541 73793 -8748
Branches 1069 0 -1069
==========================================
- Hits 72434 65400 -7034
+ Misses 9996 8393 -1603
+ Partials 111 0 -111
Continue to review full report at Codecov.
The benchmark results on my machine for posterity. The L1-cache-sized benchmarks seem like they can be a bit noisy.
const auto length = array.length();
for (int64_t i = 0; i < length; i++) {
  // NaN is not equal to itself
  local.total += values[i] * Traits<T>::NotNull(values[i]);
The previous version was faster; clang 6 and onward auto-vectorizes it perfectly (even with just -O3): https://godbolt.org/z/v5nFNM. The result would be even better with AVX-512 enabled (see https://godbolt.org/z/LFcQJv and https://llvm.org/devmtg/2015-04/slides/MaskedIntrinsics.pdf).
…ue frequencies

Picked up from: #1970. I mostly reused the unit tests as is and modified the rest based on feedback on that PR and adapting to the new code. Some other changes I made:
1. Remove the HashKernel and getter from the public API. I think we can add these back once we have a better idea of how we are doing stateful kernels (i.e. #3407).
2. Add operator[] to NumericBuilder to modify previously added values (this might not be the right place, and I had a little bit of trouble figuring out how to integrate this into the existing TypedTest, so the testing is a little bit weak). This seemed better than using a vector to collect values.

If this approach looks OK for now, I'm also going to open up a JIRA to refactor the unit tests for the Hash kernels (and maybe the headers), since I think there might be a clearer, more granular way of laying these out.

Other things to discuss:
1. Handling null values in Count/Unique (it looks like they are dropped today; should this be configurable / turned on?).
2. Hashing edge cases for floating point numbers (just opened a JIRA on this).

Author: Micah Kornfield <emkornfield@gmail.com>
Author: Alessandro Andrioni <alessandroandrioni@gmail.com>

Closes #3579 from emkornfield/count_kernel and squashes the following commits:
9c55f7b <Micah Kornfield> make 64 bit
dd0d8a1 <Micah Kornfield> fix link and warning
72095eb <Micah Kornfield> Templatize whether to use return status
54afb2b <Micah Kornfield> change from std::copy to memcopy
2973fcc <Micah Kornfield> Address code review comments
e7e624b <Micah Kornfield> Add constants, per code review
4770d99 <Micah Kornfield> fix warning
c6f6ad7 <Micah Kornfield> address feedback
d99e52f <Micah Kornfield> add guard to CopyValue in cases where vector is empty
8c26b01 <Micah Kornfield> fix format
b7d5492 <Micah Kornfield> add null test
f964bd6 <Micah Kornfield> Rebase
e8e58a5 <Micah Kornfield> Address output type code review feedback
defb4f1 <Micah Kornfield> remove export from .cc
0152f2f <Micah Kornfield> plumb through status on hash visitors
afeb1ad <Micah Kornfield> add real jira
96858bd <Micah Kornfield> Use macro inversion to reduce boiler plate
0dd0077 <Micah Kornfield> minimal test
57349f7 <Micah Kornfield> unit tests passing
34834f7 <Alessandro Andrioni> First try at implementing a CountValues kernel
This is a draft of the aggregate kernel interface. The goal of publishing this PR is to gather feedback on the design and architecture.
- The AggregateUnaryKernel/AggregateState decomposition separates the kernel implementation from parallel execution. The implementor of an AggregateState does not need to be concerned with the details of parallel execution (minus the thread-safety of the Consume method). It also allows users to implement custom aggregates.
- Monoid: a class representing a mathematical monoid.
- MonoidAggregateState: a specific implementation of AggregateState that is generic enough to support various monoids. Note that it is limited to monoids known at compile time, usually monoids on primitives.

Currently only the "Sum" kernel is implemented. Further kernels will be implemented in follow-up patches.
The current draft does not take into account filtering and groups. I do have an idea of how this would be done, e.g. add AggregateState::Consume(..., const Mask& filter). I delayed incorporating this interface until we implement the filtering kernel; this is going to be done in a different ticket.
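For context, a rough sketch of how a caller would invoke the Sum kernel added here; the header paths and the FunctionContext-based signature are assumptions based on the existing arrow::compute style, not a definitive API.

```cpp
// Hedged usage sketch (header names and the Sum signature are assumed).
#include "arrow/array.h"
#include "arrow/compute/context.h"
#include "arrow/compute/kernels/sum.h"

arrow::Status SumExample(const arrow::Array& values, arrow::MemoryPool* pool,
                         arrow::compute::Datum* out) {
  arrow::compute::FunctionContext ctx(pool);
  // Runs the aggregate kernel over the array and materializes a scalar Datum.
  return arrow::compute::Sum(&ctx, values, out);
}
```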