colexec: add optimized versions of aggregate window functions#68212
colexec: add optimized versions of aggregate window functions#68212craig[bot] merged 7 commits intocockroachdb:masterfrom
Conversation
6c4fda5 to
941be9a
Compare
c792aa1 to
022a4db
Compare
|
TODO: squash the commits into one before merging. |
022a4db to
5ad6a51
Compare
yuzefovich
left a comment
There was a problem hiding this comment.
Very nice work! Mostly minor and nit comments from me.
Reviewed 3 of 3 files at r1, 8 of 8 files at r2, 10 of 10 files at r3, 4 of 4 files at r4, 5 of 5 files at r5, 3 of 3 files at r6, 12 of 17 files at r7, 1 of 28 files at r8, 4 of 4 files at r9, 5 of 5 files at r10, 1 of 3 files at r11, 17 of 17 files at r12, all commit messages.
Reviewable status:complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball)
pkg/sql/colexec/colexecagg/avg_agg_tmpl.go, line 104 at r12 (raw file):
// curCount keeps track of the number of elements that we've seen // belonging to the current group. curCount int64
Hm, I think for avg in particular curCount is the same as numNonNull, so we can actually remove the latter altogether.
pkg/sql/colexec/colexecagg/count_agg_tmpl.go, line 180 at r12 (raw file):
vecs []coldata.Vec, inputIdxs []uint32, startIdx, endIdx int, ) { execgen.SETVARIABLESIZE(oldCurAggSize, a.curAgg)
nit: I missed this originally when I added this accounting, but in case of count and count_rows the curAgg is always int64, so there is no point in using SETVARIABLESIZE, and we should just remove it.
pkg/sql/colexec/colexecagg/count_agg_tmpl.go, line 261 at r12 (raw file):
// {{/* // _REMOVE_ROW removes the value of the ith row from the output for the // current aggregation. If this is the first row of the current aggregation, and
ditto
pkg/sql/colexec/colexecagg/min_max_agg_tmpl.go, line 205 at r12 (raw file):
a.col.Set(outputIdx, a.curAgg) } // {{if and (not (eq "_AGGKIND" "Window")) (or (.IsBytesLike) (eq .VecMethod "Datum"))}}
Why did this change?
pkg/sql/colexec/colexecagg/sum_agg_tmpl.go, line 98 at r3 (raw file):
// numNonNull tracks the number of non-null values we have seen for the group // that is currently being aggregated. numNonNull uint64
Hm, I'm wondering whether using uint64 here all the time is a bit unfortunate for the case of the hash aggregation. There, we might have the number of buckets on the order of the number of input rows, and all of these buckets will be kept in memory. By switching from bool to uint64 we might bump the struct into a larger struct size allocation-wise.
I'm thinking maybe we should use byte for ordered and hash aggregates and uint64 for the window - seems like we would only need to change the struct definition here. WDYT?
pkg/sql/colexec/colexecagg/sum_agg_tmpl.go, line 348 at r3 (raw file):
// {{/* // _REMOVE_ROW removes the value of the ith row from the output for the // current aggregation. If this is the first row of the current aggregation, and
nit: the second sentence seems incorrect/misleading.
pkg/sql/colexec/colexecwindow/count_rows_aggregator.go, line 74 at r12 (raw file):
// Init implements the bufferedWindower interface. func (a *countRowsWindowAggregator) Init(ctx context.Context) { if !a.InitHelper.Init(ctx) {
super nit: we can drop if.
pkg/sql/colexec/colexecwindow/min_max_queue.go, line 150 at r12 (raw file):
return true } if q.len() != cap(q.buffer) {
super nit: I think this condition is more likely to be true than the one above, so I would switch them.
pkg/sql/colexec/colexecwindow/min_max_removable_agg_tmpl.go, line 80 at r12 (raw file):
framer windowFramer // A partial dequeue of indices into the current partition ordered by the
nit: I think folks usually uses deque to dequeue for double-ended queue.
pkg/sql/colexec/colexecwindow/min_max_removable_agg_tmpl.go, line 101 at r12 (raw file):
// Init implements the bufferedWindower interface. func (b *minMaxRemovableAggBase) Init(ctx context.Context) { if !b.InitHelper.Init(ctx) {
ditto
pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go, line 63 at r2 (raw file):
// buffer half of the memory budget even though it will generally store less // columns than the queue. bufferMemLimit := int64(float64(args.MemoryLimit) * 0.5)
I can't seem to find where the second half of the memory limit is used.
pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go, line 106 at r12 (raw file):
windower = newMaxRemovableAggregator(args, framer, buffer, outputType) } if err := closers.Close(); err != nil {
This seems suspicious.
pkg/sql/colexec/colexecwindow/window_framer_tmpl.go, line 525 at r1 (raw file):
// rows that are a part of the current window frame, but not the previous one, // and rows that were a part of the previous window frame, but not the current // one. Note that getSlidingWindowIntervals appends to the provided 'toAdd' and
nit: it might be a bit cleaner to remove this from the contract and simply do toAdd, toRemove = toAdd[:0], toRemove[:0] in the very beginning. I think it should still work if those are nil.
pkg/sql/colexec/colexecwindow/window_framer_tmpl.go, line 528 at r1 (raw file):
// 'toRemove' slices, so they should be passed in with zero length. func getSlidingWindowIntervals( currIntervals, prevIntervals, toAdd, toRemove []windowInterval,
nit: I would also explicitly mention the ordering guarantees imposed on the input slices and provided for the output slices.
pkg/sql/colexec/colexecwindow/window_framer_tmpl.go, line 533 at r1 (raw file):
var prev, curr windowInterval setPrev, setCurr := true, true for {
nit: a general comment on what (and a bit about how) this loop accomplishes would be helpful.
5ad6a51 to
a088567
Compare
DrewKimball
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @yuzefovich)
pkg/sql/colexec/colexecagg/avg_agg_tmpl.go, line 104 at r12 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Hm, I think for
avgin particularcurCountis the same asnumNonNull, so we can actually remove the latter altogether.
Huh, that's clever. Done.
pkg/sql/colexec/colexecagg/count_agg_tmpl.go, line 180 at r12 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: I missed this originally when I added this accounting, but in case of
countandcount_rowsthecurAggis alwaysint64, so there is no point in usingSETVARIABLESIZE, and we should just remove it.
Done.
pkg/sql/colexec/colexecagg/count_agg_tmpl.go, line 261 at r12 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
ditto
Done.
pkg/sql/colexec/colexecagg/min_max_agg_tmpl.go, line 205 at r12 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Why did this change?
The aggregation gets reused between frames for the window strategy, so we can't nil out the aggregation. I'll add a comment saying so.
pkg/sql/colexec/colexecagg/sum_agg_tmpl.go, line 98 at r3 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Hm, I'm wondering whether using
uint64here all the time is a bit unfortunate for the case of the hash aggregation. There, we might have the number of buckets on the order of the number of input rows, and all of these buckets will be kept in memory. By switching frombooltouint64we might bump the struct into a larger struct size allocation-wise.I'm thinking maybe we should use
bytefor ordered and hash aggregates anduint64for the window - seems like we would only need to change the struct definition here. WDYT?
I'm not sure it's actually a problem, since structs get padded out to be a multiple of 8 bytes, so the struct has 1 byte for the bool and then 7 for padding: https://play.golang.org/p/xC47ejwQLym
But it's hard to be sure that this will always be the case, and if any fields get added in the future it could change things. So, I'll add it if you still think it's a good idea
pkg/sql/colexec/colexecagg/sum_agg_tmpl.go, line 348 at r3 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: the second sentence seems incorrect/misleading.
Done.
pkg/sql/colexec/colexecwindow/count_rows_aggregator.go, line 74 at r12 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
super nit: we can drop
if.
Done.
pkg/sql/colexec/colexecwindow/min_max_queue.go, line 150 at r12 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
super nit: I think this condition is more likely to be
truethan the one above, so I would switch them.
Done.
pkg/sql/colexec/colexecwindow/min_max_removable_agg_tmpl.go, line 80 at r12 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: I think folks usually uses
dequetodequeuefor double-ended queue.
Done.
pkg/sql/colexec/colexecwindow/min_max_removable_agg_tmpl.go, line 101 at r12 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
ditto
Done.
pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go, line 63 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
I can't seem to find where the second half of the memory limit is used.
Hm, the bufferedWindower gets it from the args field, but that's actually a problem because it isn't adjusted for the reserved buffer memory, so we'd end up going over the limit. I guess I introduced this bug during some refactoring in the last PR. Fixed it (and also made the limit-passing more explicit).
pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go, line 106 at r12 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
This seems suspicious.
That it does. Removed the unnecessary AggregateFunc construction for the case when min and max are specialized.
pkg/sql/colexec/colexecwindow/window_framer_tmpl.go, line 525 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: it might be a bit cleaner to remove this from the contract and simply do
toAdd, toRemove = toAdd[:0], toRemove[:0]in the very beginning. I think it should still work if those arenil.
Done.
pkg/sql/colexec/colexecwindow/window_framer_tmpl.go, line 528 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: I would also explicitly mention the ordering guarantees imposed on the input slices and provided for the output slices.
Done.
pkg/sql/colexec/colexecwindow/window_framer_tmpl.go, line 533 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: a general comment on what (and a bit about how) this loop accomplishes would be helpful.
Done.
yuzefovich
left a comment
There was a problem hiding this comment.
Reviewed 2 of 40 files at r13, 12 of 20 files at r14, 9 of 13 files at r15, 6 of 6 files at r16, 5 of 5 files at r17, 1 of 4 files at r18, 18 of 18 files at r19, all commit messages.
Reviewable status:complete! 1 of 0 LGTMs obtained (waiting on @DrewKimball)
pkg/sql/colexec/colexecagg/sum_agg_tmpl.go, line 98 at r3 (raw file):
Previously, DrewKimball (Drew Kimball) wrote…
I'm not sure it's actually a problem, since structs get padded out to be a multiple of 8 bytes, so the struct has 1 byte for the bool and then 7 for padding: https://play.golang.org/p/xC47ejwQLym
But it's hard to be sure that this will always be the case, and if any fields get added in the future it could change things. So, I'll add it if you still think it's a good idea
Yeah, good point. This would matter if we had multiple single-byte fields. Never mind.
pkg/sql/colexec/colexecwindow/window_framer_tmpl.go, line 540 at r19 (raw file):
// this, take advantage of the fact that both sets of intervals are in // ascending order, similar to merging sorted lists. Maintain indices into // each list, and iterate whichever index has the 'smaller' interval (e.g.)
super nit: the closing parenthesis probably should be after "ends first".
This commit adds a method `slidingWindowIntervals` to `windowFramer` operators that returns a set of `toAdd` intervals and a set of `toRemove` intervals, which indicate the rows that should be added to the current aggregation and those that should be removed, respectively. This will be used to implement the sliding window optimization for aggregate window functions such as `sum`. Release note: None
This commit supplies a new operator, `slidingWindowAggregator`, which is used for any window aggregate functions that implement the `slidingWindowAggregateFunc` interface. Rather than aggregating over the entire window frame for each row, the `slidingWindowAggregator` operator aggregates over the rows that are in the current window frame but were not in the previous, and removes from the aggregation the rows that were in the previous window frame but not the current. This allows window aggregate functions to be evaluated in linear rather than quadratic time. Release note: None
This commit modifies the `sum` aggregate window function to implement the `slidingWindowAggregateFunc` interface, which allows it to be used in a sliding window context. This yields linear rather than quadratic scaling in the worst case, and allows the vectorized engine to meet or exceed parity with the row engine for `sum` window functions. Release note: None
This commit modifies the count aggregate operator to implement the `slidingWindowAggregateFunc` interface so that it can be used with the sliding window optimization. Release note: None
…ction This commit modifies the `average` aggregate operator to implement the `slidingWindowAggregateFunc` interface so that it can be used with the sliding window optimization. Release note: None
This commit implements an optimized version of `count_rows` that calculates the size of the window frame as soon as the window frame is calculated. This means that most of the overhead for `count_rows` now comes from calculating the window frame, which is worst-case linear time (previously, the step to retrieve the size of the frame was quadratic, though with a small constant). Release note: None
This commit modifies the 'min' and 'max' aggregate window functions to implement the `slidingWindowAggregateFunc` interface, which allows them to be used in a sliding window context. However, this is only usable when the window frame never shrinks - e.g. it always contains all rows from the previous frame. This commit also provides implementations of `min` and `max` for use when the window frame can shrink. The indices of the 'next best' minimum or maximum values are stored in a priority queue that is updated for each row. Using the priority queue allows the `min` and `max` operators to avoid fully aggregating over the window frame even when the previous best value goes out of scope. Note that this implementation currently does not handle the case of non-default exclusion clause, in which case we must fall back to the quadratic approach. Release note: None
a088567 to
cd63a65
Compare
DrewKimball
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 1 of 0 LGTMs obtained (waiting on @DrewKimball and @yuzefovich)
pkg/sql/colexec/colexecwindow/window_framer_tmpl.go, line 540 at r19 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
super nit: the closing parenthesis probably should be after "ends first".
Done.
|
TFTR! |
|
The bazel failures are a timeout and a multiregion test with no window functions, so I think they're unrelated. |
|
bors r+ |
66889: jobs: retry jobs with exponential backoff r=ajwerner a=sajjadrizvi This commit adds a mechanism to retry jobs with exponentially increasing delays. This is achieved through two new columns in system.jobs table, last_run and num_runs. In addition, this commit adds cluster settings to control exponential-backoff parameters, initial delay and max delay, with corresponding settings `jobs.registry.retry.initial_delay` and `jobs.registry.retry.max_delay`. Finally, this commit adds a new partial-index in the jobs table that improves the performance of periodic queries run by registry in each node. Release note (general change): The behavior for retrying jobs, which fail due to a retriable error or due to job coordinator failure, is now delayed using exponential backoff. Before this change, jobs which failed in a retryable manner, would be resumed immediately on a different coordinator. This change reduces the impact of recurrently failing jobs on the cluster. This change adds two new cluster settings that control this behavior: "jobs.registry.retry.initial_delay" and "jobs.registry.retry.max_delay", which respectively control initial delay and maximum delay between resumptions. Fixes #44594 Fixes #65080 68212: colexec: add optimized versions of aggregate window functions r=DrewKimball a=DrewKimball **colexecwindow: add sliding window functionality to window framer** This commit adds a method `slidingWindowIntervals` to `windowFramer` operators that returns a set of `toAdd` intervals and a set of `toRemove` intervals, which indicate the rows that should be added to the current aggregation and those that should be removed, respectively. This will be used to implement the sliding window optimization for aggregate window functions such as `sum`. **colexecwindow: implement sliding window aggregator** This commit supplies a new operator, `slidingWindowAggregator`, which is used for any window aggregate functions that implement the `slidingWindowAggregateFunc` interface. Rather than aggregating over the entire window frame for each row, the `slidingWindowAggregator` operator aggregates over the rows that are in the current window frame but were not in the previous, and removes from the aggregation the rows that were in the previous window frame but not the current. This allows window aggregate functions to be evaluated in linear rather than quadratic time. **colexec: implement sliding window optimization for sum window function** This commit modifies the `sum` aggregate window function to implement the `slidingWindowAggregateFunc`, which allows it to be used in a sliding window context. This yields linear rather than quadratic scaling in the worst case, and allows the vectorized engine to meet or exceed parity with the row engine for `sum` window functions. **colexec: implement sliding window optimization for count window function** This commit modifies the count aggregate operator to implement the `slidingWindowAggregateFunc` interface so that it can be used with the sliding window optimization. **colexec: implement sliding window optimization for average window function** This commit modifies the `average` aggregate operator to implement the `slidingWindowAggregateFunc` interface so that it can be used with the sliding window optimization. **colexec: optimize count_rows window function** This commit implements an optimized version of `count_rows` that calculates the size of the window frame as soon as the window frame is calculated. This means that most of the overhead for `count_rows` now comes from calculating the window frame, which is worst-case linear time (previously, the step to retrieve the size of the frame was quadratic, though with a small constant). **colexec: optimize min and max window functions with default exclusion** This commit modifies the 'min' and 'max' aggregate window functions to implement the `slidingWindowAggregateFunc` interface, which allows them to be used in a sliding window context. However, this is only usable when the window frame never shrinks - e.g. it always contains all rows from the previous frame. This commit also provides implementations of `min` and `max` for use when the window frame can shrink. The indices of the 'next best' minimum or maximum values are stored in a priority queue that is updated for each row. Using the priority queue allows the `min` and `max` operators to avoid fully aggregating over the window frame even when the previous best value goes out of scope. Note that this implementation currently does not handle the case of non-default exclusion clause, in which case we must fall back to the quadratic approach. Fixes: #37039 Release note (performance improvement): The vectorized engine can now use the sliding-window approach to execute common aggregate functions as window functions. This allows aggregate window functions to be evaluated in linear rather than quadratic time. Currently, sum, count, average, min, and max are executed using this approach. 68433: sql: implemented placement restricted syntax for domiciling r=pawalt a=pawalt This PR combines the existing restricted placement zone config logic with the stubbed syntax to create an end-to-end PLACEMENT RESTRICTED implementation. Release note: None Note that the cluster setting for domiciling and telemetry will be added in a later PR. 68818: changefeedccl: mark avro format as no longer experimental r=[miretskiy,spiffyeng] a=HonoreDB The avro format for changefeeds now supports all column types and has been in production use for several releases. We'll now allow format=avro rather than format=experimental_avro The old string will remain supported because job payloads can persist across upgrades and downgrades. Release note (enterprise change): changefeed avro format no longer marked experimental Co-authored-by: Sajjad Rizvi <sajjad@cockroachlabs.com> Co-authored-by: Drew Kimball <drewk@cockroachlabs.com> Co-authored-by: Peyton Walters <peyton.walters@cockroachlabs.com> Co-authored-by: Aaron Zinger <zinger@cockroachlabs.com>
|
Build failed (retrying...): |
|
Build succeeded: |
colexecwindow: add sliding window functionality to window framer
This commit adds a method
slidingWindowIntervalstowindowFrameroperators that returns a set of
toAddintervals and a set oftoRemoveintervals, which indicate the rows that should be addedto the current aggregation and those that should be removed, respectively.
This will be used to implement the sliding window optimization for
aggregate window functions such as
sum.colexecwindow: implement sliding window aggregator
This commit supplies a new operator,
slidingWindowAggregator, whichis used for any window aggregate functions that implement the
slidingWindowAggregateFuncinterface. Rather than aggregating overthe entire window frame for each row, the
slidingWindowAggregatoroperator aggregates over the rows that are in the current window
frame but were not in the previous, and removes from the aggregation
the rows that were in the previous window frame but not the current.
This allows window aggregate functions to be evaluated in linear rather
than quadratic time.
colexec: implement sliding window optimization for sum window function
This commit modifies the
sumaggregate window function to implementthe
slidingWindowAggregateFunc, which allows it to be used in asliding window context. This yields linear rather than quadratic scaling
in the worst case, and allows the vectorized engine to meet or exceed
parity with the row engine for
sumwindow functions.colexec: implement sliding window optimization for count window function
This commit modifies the count aggregate operator to implement the
slidingWindowAggregateFuncinterface so that it can be used withthe sliding window optimization.
colexec: implement sliding window optimization for average window function
This commit modifies the
averageaggregate operator to implement theslidingWindowAggregateFuncinterface so that it can be used with thesliding window optimization.
colexec: optimize count_rows window function
This commit implements an optimized version of
count_rowsthatcalculates the size of the window frame as soon as the window frame
is calculated. This means that most of the overhead for
count_rowsnow comes from calculating the window frame, which is worst-case
linear time (previously, the step to retrieve the size of the frame
was quadratic, though with a small constant).
colexec: optimize min and max window functions with default exclusion
This commit modifies the 'min' and 'max' aggregate window functions
to implement the
slidingWindowAggregateFuncinterface, which allowsthem to be used in a sliding window context. However, this is only
usable when the window frame never shrinks - e.g. it always contains
all rows from the previous frame.
This commit also provides implementations of
minandmaxfor usewhen the window frame can shrink. The indices of the 'next best'
minimum or maximum values are stored in a priority queue that is
updated for each row. Using the priority queue allows the
minandmaxoperators to avoid fully aggregating over the window frameeven when the previous best value goes out of scope. Note that this
implementation currently does not handle the case of non-default
exclusion clause, in which case we must fall back to the quadratic
approach.
Fixes: #37039
Release note (performance improvement): The vectorized engine can now
use the sliding-window approach to execute common aggregate functions
as window functions. This allows aggregate window functions to be evaluated
in linear rather than quadratic time. Currently, sum, count, average, min, and
max are executed using this approach.