Skip to content

Commit 8e2c944

Browse files
authored
Top-N: Improve performance with large heaps, and correctly call Reduce (#14900)
Follow-up from #14424 Fixes #14896 #### Large Heaps When using a large `offset`, the heap we generate in the Top-N operator is large. The previous implementation took some shortcuts that made it work poorly with large heaps, resulting in performance degradations compared to the previous implementation: * `TopNHeap::Combine` would scan and re-insert the values in the heaps, instead of directly merging heaps * `TopNHeap::Sink` would fully scan the heap at every iteration This PR fixes these issues. ### `AddSmallHeap`/`AddLargeHeap` The previous heap insertion happened in two stages: * Insert the sort keys one-by-one into the heap * Scan the heap to figure out which **final sort keys** still remain inside the heap * Append the payload for the corresponding final sort keys The main reason for this two-stage approach is to deal with many conflicts *within the same data chunk*. For example, consider the query: ```sql SELECT * FROM lineitem ORDER BY l_orderkey DESC LIMIT 5; ``` Since `lineitem` is sorted by `l_orderkey`, every chunk we stream into the operator will be full of the new highest values. Without this two-stage approach, we will append all rows to the payload data, only to then discard most rows again. Using the two stage approach we append at most 5 rows (the heap size) to the payload at each iteration, leading to lower memory usage and fewer calls to Reduce. ### AddLargeHeap This PR adds a new, simpler, single-stage insertion where we insert the sort keys and immediately insert payload data. We switch to this approach for heaps `>= 100 rows`. By immediately inserting the payload data we don't need to scan the heap during the sink phase, which has great speed-ups for when we are dealing with larger heaps. 
### Benchmarks Below is an example of a Top-N with a large heap that is sped up significantly by this approach: ```sql select l_orderkey from lineitem where l_linestatus = 'O' order by l_quantity limit 100 offset 1000000; ``` | v1.1 | main | new | |-------|------|-------| | 0.86s | 6.9s | 0.77s |
2 parents fbbfc4a + ae85909 commit 8e2c944

3 files changed

Lines changed: 223 additions & 63 deletions

File tree

src/execution/operator/order/physical_top_n.cpp

Lines changed: 143 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,53 @@ class TopNHeap {
9090
void InitializeScan(TopNScanState &state, bool exclude_offset);
9191
void Scan(TopNScanState &state, DataChunk &chunk);
9292

93+
void AddSmallHeap(DataChunk &input, Vector &sort_keys_vec, const string &boundary_val,
94+
const string_t &global_boundary_val);
95+
void AddLargeHeap(DataChunk &input, Vector &sort_keys_vec, const string &boundary_val,
96+
const string_t &global_boundary_val);
97+
9398
public:
9499
idx_t ReduceThreshold() const {
95100
return MaxValue<idx_t>(STANDARD_VECTOR_SIZE * 5ULL, 2ULL * heap_size);
96101
}
97102

98-
idx_t HeapAllocSize() const {
103+
idx_t InitialHeapAllocSize() const {
99104
return MinValue<idx_t>(STANDARD_VECTOR_SIZE * 100ULL, ReduceThreshold()) + STANDARD_VECTOR_SIZE;
100105
}
106+
107+
private:
108+
inline bool EntryShouldBeAdded(const string_t &sort_key) {
109+
if (heap.size() < heap_size) {
110+
// heap is not yet full - the entry always fits
111+
return true;
112+
}
113+
if (sort_key < heap.front().sort_key) {
114+
// sort key is smaller than current max value
115+
return true;
116+
}
117+
// heap is full and there is no room for the entry
118+
return false;
119+
}
120+
121+
inline bool EntryShouldBeAdded(const string_t &sort_key, const string &boundary_val,
122+
const string_t &global_boundary_val) {
123+
// first compare against the global boundary value (if there is any)
124+
if (!boundary_val.empty() && sort_key > global_boundary_val) {
125+
// this entry is out-of-range for the global boundary val
126+
// it will never be in the final result even if it fits in this heap
127+
return false;
128+
}
129+
return EntryShouldBeAdded(sort_key);
130+
}
131+
132+
inline void AddEntryToHeap(const TopNEntry &entry) {
133+
if (heap.size() >= heap_size) {
134+
std::pop_heap(heap.begin(), heap.end());
135+
heap.pop_back();
136+
}
137+
heap.push_back(entry);
138+
std::push_heap(heap.begin(), heap.end());
139+
}
101140
};
102141

103142
//===--------------------------------------------------------------------===//
@@ -115,9 +154,10 @@ TopNHeap::TopNHeap(ClientContext &context, Allocator &allocator, const vector<Lo
115154
sort_types.push_back(expr->return_type);
116155
executor.AddExpression(*expr);
117156
}
157+
heap.reserve(InitialHeapAllocSize());
118158
vector<LogicalType> sort_keys_type {LogicalType::BLOB};
119159
sort_keys.Initialize(allocator, sort_keys_type);
120-
heap_data.Initialize(allocator, payload_types, HeapAllocSize());
160+
heap_data.Initialize(allocator, payload_types, InitialHeapAllocSize());
121161
payload_chunk.Initialize(allocator, payload_types);
122162
sort_chunk.Initialize(allocator, sort_types);
123163
}
@@ -132,66 +172,29 @@ TopNHeap::TopNHeap(ExecutionContext &context, const vector<LogicalType> &payload
132172
: TopNHeap(context.client, Allocator::Get(context.client), payload_types, orders, limit, offset) {
133173
}
134174

135-
void TopNHeap::Sink(DataChunk &input, optional_ptr<TopNBoundaryValue> global_boundary) {
136-
// compute the ordering values for the new chunk
137-
sort_chunk.Reset();
138-
executor.Execute(input, sort_chunk);
139-
140-
// construct the sort key from the sort chunk
141-
vector<OrderModifiers> modifiers;
142-
for (auto &order : orders) {
143-
modifiers.emplace_back(order.type, order.null_order);
144-
}
145-
sort_keys.Reset();
146-
auto &sort_keys_vec = sort_keys.data[0];
147-
CreateSortKeyHelpers::CreateSortKey(sort_chunk, modifiers, sort_keys_vec);
148-
149-
// fetch the current global boundary (if any)
150-
string boundary_val;
151-
string_t global_boundary_val;
152-
if (global_boundary) {
153-
boundary_val = global_boundary->GetBoundaryValue();
154-
global_boundary_val = string_t(boundary_val);
155-
}
156-
175+
void TopNHeap::AddSmallHeap(DataChunk &input, Vector &sort_keys_vec, const string &boundary_val,
176+
const string_t &global_boundary_val) {
157177
// insert the sort keys into the priority queue
158178
constexpr idx_t BASE_INDEX = NumericLimits<uint32_t>::Maximum();
159179

160180
bool any_added = false;
161181
auto sort_key_values = FlatVector::GetData<string_t>(sort_keys_vec);
162182
for (idx_t r = 0; r < input.size(); r++) {
163183
auto &sort_key = sort_key_values[r];
164-
if (!boundary_val.empty() && sort_key > global_boundary_val) {
184+
if (!EntryShouldBeAdded(sort_key, boundary_val, global_boundary_val)) {
165185
continue;
166186
}
167-
if (heap.size() >= heap_size) {
168-
// heap is full - check the latest entry
169-
if (sort_key > heap.front().sort_key) {
170-
// current max in the heap is smaller than the new key - skip this entry
171-
continue;
172-
}
173-
}
174187
// replace the previous top entry with the new entry
175188
TopNEntry entry;
176189
entry.sort_key = sort_key;
177190
entry.index = BASE_INDEX + r;
178-
if (heap.size() >= heap_size) {
179-
std::pop_heap(heap.begin(), heap.end());
180-
heap.pop_back();
181-
}
182-
heap.push_back(entry);
183-
std::push_heap(heap.begin(), heap.end());
191+
AddEntryToHeap(entry);
184192
any_added = true;
185193
}
186194
if (!any_added) {
187195
// early-out: no matches
188196
return;
189197
}
190-
// if we modified the heap we might be able to update the global boundary
191-
// note that the global boundary only applies to FULL heaps
192-
if (heap.size() >= heap_size && global_boundary) {
193-
global_boundary->UpdateValue(heap.front().sort_key);
194-
}
195198

196199
// for all matching entries we need to copy over the corresponding payload values
197200
idx_t match_count = 0;
@@ -214,20 +217,98 @@ void TopNHeap::Sink(DataChunk &input, optional_ptr<TopNBoundaryValue> global_bou
214217
heap_data.Append(input, true, &matching_sel, match_count);
215218
}
216219

220+
void TopNHeap::AddLargeHeap(DataChunk &input, Vector &sort_keys_vec, const string &boundary_val,
221+
const string_t &global_boundary_val) {
222+
auto sort_key_values = FlatVector::GetData<string_t>(sort_keys_vec);
223+
idx_t base_index = heap_data.size();
224+
idx_t match_count = 0;
225+
for (idx_t r = 0; r < input.size(); r++) {
226+
auto &sort_key = sort_key_values[r];
227+
if (!EntryShouldBeAdded(sort_key, boundary_val, global_boundary_val)) {
228+
continue;
229+
}
230+
// replace the previous top entry with the new entry
231+
TopNEntry entry;
232+
entry.sort_key = sort_key.IsInlined() ? sort_key : sort_key_heap.AddBlob(sort_key);
233+
entry.index = base_index + match_count;
234+
AddEntryToHeap(entry);
235+
matching_sel.set_index(match_count++, r);
236+
}
237+
if (match_count == 0) {
238+
// early-out: no matches
239+
return;
240+
}
241+
242+
// copy over the input rows to the payload chunk
243+
heap_data.Append(input, true, &matching_sel, match_count);
244+
}
245+
246+
void TopNHeap::Sink(DataChunk &input, optional_ptr<TopNBoundaryValue> global_boundary) {
247+
static constexpr idx_t SMALL_HEAP_THRESHOLD = 100;
248+
249+
// compute the ordering values for the new chunk
250+
sort_chunk.Reset();
251+
executor.Execute(input, sort_chunk);
252+
253+
// construct the sort key from the sort chunk
254+
vector<OrderModifiers> modifiers;
255+
for (auto &order : orders) {
256+
modifiers.emplace_back(order.type, order.null_order);
257+
}
258+
sort_keys.Reset();
259+
auto &sort_keys_vec = sort_keys.data[0];
260+
CreateSortKeyHelpers::CreateSortKey(sort_chunk, modifiers, sort_keys_vec);
261+
262+
// fetch the current global boundary (if any)
263+
string boundary_val;
264+
string_t global_boundary_val;
265+
if (global_boundary) {
266+
boundary_val = global_boundary->GetBoundaryValue();
267+
global_boundary_val = string_t(boundary_val);
268+
}
269+
270+
if (heap_size <= SMALL_HEAP_THRESHOLD) {
271+
AddSmallHeap(input, sort_keys_vec, boundary_val, global_boundary_val);
272+
} else {
273+
AddLargeHeap(input, sort_keys_vec, boundary_val, global_boundary_val);
274+
}
275+
276+
// if we modified the heap we might be able to update the global boundary
277+
// note that the global boundary only applies to FULL heaps
278+
if (heap.size() >= heap_size && global_boundary) {
279+
global_boundary->UpdateValue(heap.front().sort_key);
280+
}
281+
}
282+
217283
void TopNHeap::Combine(TopNHeap &other) {
218284
other.Finalize();
219285

220-
// FIXME: heaps can be merged directly instead of doing it like this
221-
// that only really speeds things up if heaps are very large, however
222-
TopNScanState state;
223-
other.InitializeScan(state, false);
224-
while (true) {
225-
payload_chunk.Reset();
226-
other.Scan(state, payload_chunk);
227-
if (payload_chunk.size() == 0) {
228-
break;
286+
idx_t match_count = 0;
287+
// merge the heap of other into this
288+
for (idx_t i = 0; i < other.heap.size(); i++) {
289+
// check whether this entry from the other heap should be added to this heap
290+
auto &other_entry = other.heap[i];
291+
auto &sort_key = other_entry.sort_key;
292+
if (!EntryShouldBeAdded(sort_key)) {
293+
continue;
294+
}
295+
// add this entry
296+
TopNEntry new_entry;
297+
new_entry.sort_key = sort_key.IsInlined() ? sort_key : sort_key_heap.AddBlob(sort_key);
298+
new_entry.index = heap_data.size() + match_count;
299+
AddEntryToHeap(new_entry);
300+
301+
matching_sel.set_index(match_count++, other_entry.index);
302+
if (match_count >= STANDARD_VECTOR_SIZE) {
303+
// flush
304+
heap_data.Append(other.heap_data, true, &matching_sel, match_count);
305+
match_count = 0;
229306
}
230-
Sink(payload_chunk);
307+
}
308+
if (match_count > 0) {
309+
// flush
310+
heap_data.Append(other.heap_data, true, &matching_sel, match_count);
311+
match_count = 0;
231312
}
232313
Reduce();
233314
}
@@ -236,14 +317,14 @@ void TopNHeap::Finalize() {
236317
}
237318

238319
void TopNHeap::Reduce() {
239-
if (payload_chunk.size() < ReduceThreshold()) {
320+
if (heap_data.size() < ReduceThreshold()) {
240321
// only reduce when we pass the reduce threshold
241322
return;
242323
}
243324
// we have too many values in the heap - reduce them
244325
StringHeap new_sort_heap;
245-
DataChunk new_payload_chunk;
246-
new_payload_chunk.Initialize(allocator, payload_types, HeapAllocSize());
326+
DataChunk new_heap_data;
327+
new_heap_data.Initialize(allocator, payload_types, heap.size());
247328

248329
SelectionVector new_payload_sel(heap.size());
249330
for (idx_t i = 0; i < heap.size(); i++) {
@@ -258,21 +339,22 @@ void TopNHeap::Reduce() {
258339
}
259340

260341
// copy over the data from the current payload chunk to the new payload chunk
261-
payload_chunk.Copy(new_payload_chunk, new_payload_sel, heap.size());
342+
new_heap_data.Slice(heap_data, new_payload_sel, heap.size());
343+
new_heap_data.Flatten();
262344

263-
new_sort_heap.Move(sort_key_heap);
264-
payload_chunk.Reference(new_payload_chunk);
345+
sort_key_heap.Move(new_sort_heap);
346+
heap_data.Reference(new_heap_data);
265347
}
266348

267349
void TopNHeap::InitializeScan(TopNScanState &state, bool exclude_offset) {
268350
auto heap_copy = heap;
269351
// traverse the rest of the heap
352+
state.scan_order.resize(heap_copy.size());
270353
while (!heap_copy.empty()) {
271354
std::pop_heap(heap_copy.begin(), heap_copy.end());
272-
state.scan_order.push_back(UnsafeNumericCast<sel_t>(heap_copy.back().index));
355+
state.scan_order[heap_copy.size() - 1] = UnsafeNumericCast<sel_t>(heap_copy.back().index);
273356
heap_copy.pop_back();
274357
}
275-
std::reverse(state.scan_order.begin(), state.scan_order.end());
276358
state.pos = exclude_offset ? offset : 0;
277359
}
278360

test/optimizer/compressed_materialization.test_coverage renamed to test/optimizer/compressed_materialization.test_slow

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# name: test/optimizer/compressed_materialization.test_coverage
1+
# name: test/optimizer/compressed_materialization.test_slow
22
# description: Compressed materialization test
33
# group: [optimizer]
44

@@ -18,7 +18,7 @@ Binder Error: Compressed materialization functions are for internal use only!
1818
statement ok
1919
create table t0 as select range%400000 a, range%400000 b from range(500000);
2020

21-
query III
21+
query III rowsort
2222
select * from (
2323
select *, row_number() OVER () as row_number from (
2424
SELECT * FROM t0 ORDER BY 1) ta

test/sql/topn/tpch_top_n.test_slow

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# name: test/sql/topn/tpch_top_n.test_slow
2+
# description: Test Top N with large heaps (large LIMIT/OFFSET) on TPC-H
3+
# group: [topn]
4+
5+
require tpch
6+
7+
statement ok
8+
CALL dbgen(sf=1);
9+
10+
query I
11+
select l_quantity
12+
from lineitem
13+
where l_linestatus = 'O'
14+
order by l_quantity limit 10 offset 100;
15+
----
16+
1.00
17+
1.00
18+
1.00
19+
1.00
20+
1.00
21+
1.00
22+
1.00
23+
1.00
24+
1.00
25+
1.00
26+
27+
query I
28+
select l_quantity
29+
from lineitem
30+
where l_linestatus = 'O'
31+
order by l_quantity limit 10 offset 1000000;
32+
----
33+
17.00
34+
17.00
35+
17.00
36+
17.00
37+
17.00
38+
17.00
39+
17.00
40+
17.00
41+
17.00
42+
17.00
43+
44+
query I
45+
select sum(l_quantity)
46+
from lineitem
47+
group by l_orderkey
48+
order by sum(l_quantity) desc
49+
limit 10 offset 100;
50+
----
51+
297.00
52+
296.00
53+
296.00
54+
296.00
55+
296.00
56+
296.00
57+
296.00
58+
296.00
59+
295.00
60+
295.00
61+
62+
query I
63+
select sum(l_quantity)
64+
from lineitem
65+
group by l_orderkey
66+
order by sum(l_quantity) desc
67+
limit 10 offset 100000;
68+
----
69+
195.00
70+
195.00
71+
195.00
72+
195.00
73+
195.00
74+
195.00
75+
195.00
76+
195.00
77+
195.00
78+
195.00

0 commit comments

Comments
 (0)