@@ -90,14 +90,53 @@ class TopNHeap {
9090 void InitializeScan (TopNScanState &state, bool exclude_offset);
9191 void Scan (TopNScanState &state, DataChunk &chunk);
9292
93+ void AddSmallHeap (DataChunk &input, Vector &sort_keys_vec, const string &boundary_val,
94+ const string_t &global_boundary_val);
95+ void AddLargeHeap (DataChunk &input, Vector &sort_keys_vec, const string &boundary_val,
96+ const string_t &global_boundary_val);
97+
9398public:
9499 idx_t ReduceThreshold () const {
95100 return MaxValue<idx_t >(STANDARD_VECTOR_SIZE * 5ULL , 2ULL * heap_size);
96101 }
97102
98- idx_t HeapAllocSize () const {
103+ idx_t InitialHeapAllocSize () const {
99104 return MinValue<idx_t >(STANDARD_VECTOR_SIZE * 100ULL , ReduceThreshold ()) + STANDARD_VECTOR_SIZE;
100105 }
106+
107+ private:
108+ inline bool EntryShouldBeAdded (const string_t &sort_key) {
109+ if (heap.size () < heap_size) {
110+ // heap is full - check the latest entry
111+ return true ;
112+ }
113+ if (sort_key < heap.front ().sort_key ) {
114+ // sort key is smaller than current max value
115+ return true ;
116+ }
117+ // heap is full and there is no room for the entry
118+ return false ;
119+ }
120+
121+ inline bool EntryShouldBeAdded (const string_t &sort_key, const string &boundary_val,
122+ const string_t &global_boundary_val) {
123+ // first compare against the global boundary value (if there is any)
124+ if (!boundary_val.empty () && sort_key > global_boundary_val) {
125+ // this entry is out-of-range for the global boundary val
126+ // it will never be in the final result even if it fits in this heap
127+ return false ;
128+ }
129+ return EntryShouldBeAdded (sort_key);
130+ }
131+
132+ inline void AddEntryToHeap (const TopNEntry &entry) {
133+ if (heap.size () >= heap_size) {
134+ std::pop_heap (heap.begin (), heap.end ());
135+ heap.pop_back ();
136+ }
137+ heap.push_back (entry);
138+ std::push_heap (heap.begin (), heap.end ());
139+ }
101140};
102141
103142// ===--------------------------------------------------------------------===//
@@ -115,9 +154,10 @@ TopNHeap::TopNHeap(ClientContext &context, Allocator &allocator, const vector<Lo
115154 sort_types.push_back (expr->return_type );
116155 executor.AddExpression (*expr);
117156 }
157+ heap.reserve (InitialHeapAllocSize ());
118158 vector<LogicalType> sort_keys_type {LogicalType::BLOB};
119159 sort_keys.Initialize (allocator, sort_keys_type);
120- heap_data.Initialize (allocator, payload_types, HeapAllocSize ());
160+ heap_data.Initialize (allocator, payload_types, InitialHeapAllocSize ());
121161 payload_chunk.Initialize (allocator, payload_types);
122162 sort_chunk.Initialize (allocator, sort_types);
123163}
@@ -132,66 +172,29 @@ TopNHeap::TopNHeap(ExecutionContext &context, const vector<LogicalType> &payload
132172 : TopNHeap(context.client, Allocator::Get(context.client), payload_types, orders, limit, offset) {
133173}
134174
135- void TopNHeap::Sink (DataChunk &input, optional_ptr<TopNBoundaryValue> global_boundary) {
136- // compute the ordering values for the new chunk
137- sort_chunk.Reset ();
138- executor.Execute (input, sort_chunk);
139-
140- // construct the sort key from the sort chunk
141- vector<OrderModifiers> modifiers;
142- for (auto &order : orders) {
143- modifiers.emplace_back (order.type , order.null_order );
144- }
145- sort_keys.Reset ();
146- auto &sort_keys_vec = sort_keys.data [0 ];
147- CreateSortKeyHelpers::CreateSortKey (sort_chunk, modifiers, sort_keys_vec);
148-
149- // fetch the current global boundary (if any)
150- string boundary_val;
151- string_t global_boundary_val;
152- if (global_boundary) {
153- boundary_val = global_boundary->GetBoundaryValue ();
154- global_boundary_val = string_t (boundary_val);
155- }
156-
175+ void TopNHeap::AddSmallHeap (DataChunk &input, Vector &sort_keys_vec, const string &boundary_val,
176+ const string_t &global_boundary_val) {
157177 // insert the sort keys into the priority queue
158178 constexpr idx_t BASE_INDEX = NumericLimits<uint32_t >::Maximum ();
159179
160180 bool any_added = false ;
161181 auto sort_key_values = FlatVector::GetData<string_t >(sort_keys_vec);
162182 for (idx_t r = 0 ; r < input.size (); r++) {
163183 auto &sort_key = sort_key_values[r];
164- if (!boundary_val. empty () && sort_key > global_boundary_val) {
184+ if (!EntryShouldBeAdded ( sort_key, boundary_val, global_boundary_val) ) {
165185 continue ;
166186 }
167- if (heap.size () >= heap_size) {
168- // heap is full - check the latest entry
169- if (sort_key > heap.front ().sort_key ) {
170- // current max in the heap is smaller than the new key - skip this entry
171- continue ;
172- }
173- }
174187 // replace the previous top entry with the new entry
175188 TopNEntry entry;
176189 entry.sort_key = sort_key;
177190 entry.index = BASE_INDEX + r;
178- if (heap.size () >= heap_size) {
179- std::pop_heap (heap.begin (), heap.end ());
180- heap.pop_back ();
181- }
182- heap.push_back (entry);
183- std::push_heap (heap.begin (), heap.end ());
191+ AddEntryToHeap (entry);
184192 any_added = true ;
185193 }
186194 if (!any_added) {
187195 // early-out: no matches
188196 return ;
189197 }
190- // if we modified the heap we might be able to update the global boundary
191- // note that the global boundary only applies to FULL heaps
192- if (heap.size () >= heap_size && global_boundary) {
193- global_boundary->UpdateValue (heap.front ().sort_key );
194- }
195198
196199 // for all matching entries we need to copy over the corresponding payload values
197200 idx_t match_count = 0 ;
@@ -214,20 +217,98 @@ void TopNHeap::Sink(DataChunk &input, optional_ptr<TopNBoundaryValue> global_bou
214217 heap_data.Append (input, true , &matching_sel, match_count);
215218}
216219
220+ void TopNHeap::AddLargeHeap (DataChunk &input, Vector &sort_keys_vec, const string &boundary_val,
221+ const string_t &global_boundary_val) {
222+ auto sort_key_values = FlatVector::GetData<string_t >(sort_keys_vec);
223+ idx_t base_index = heap_data.size ();
224+ idx_t match_count = 0 ;
225+ for (idx_t r = 0 ; r < input.size (); r++) {
226+ auto &sort_key = sort_key_values[r];
227+ if (!EntryShouldBeAdded (sort_key, boundary_val, global_boundary_val)) {
228+ continue ;
229+ }
230+ // replace the previous top entry with the new entry
231+ TopNEntry entry;
232+ entry.sort_key = sort_key.IsInlined () ? sort_key : sort_key_heap.AddBlob (sort_key);
233+ entry.index = base_index + match_count;
234+ AddEntryToHeap (entry);
235+ matching_sel.set_index (match_count++, r);
236+ }
237+ if (match_count == 0 ) {
238+ // early-out: no matches
239+ return ;
240+ }
241+
242+ // copy over the input rows to the payload chunk
243+ heap_data.Append (input, true , &matching_sel, match_count);
244+ }
245+
246+ void TopNHeap::Sink (DataChunk &input, optional_ptr<TopNBoundaryValue> global_boundary) {
247+ static constexpr idx_t SMALL_HEAP_THRESHOLD = 100 ;
248+
249+ // compute the ordering values for the new chunk
250+ sort_chunk.Reset ();
251+ executor.Execute (input, sort_chunk);
252+
253+ // construct the sort key from the sort chunk
254+ vector<OrderModifiers> modifiers;
255+ for (auto &order : orders) {
256+ modifiers.emplace_back (order.type , order.null_order );
257+ }
258+ sort_keys.Reset ();
259+ auto &sort_keys_vec = sort_keys.data [0 ];
260+ CreateSortKeyHelpers::CreateSortKey (sort_chunk, modifiers, sort_keys_vec);
261+
262+ // fetch the current global boundary (if any)
263+ string boundary_val;
264+ string_t global_boundary_val;
265+ if (global_boundary) {
266+ boundary_val = global_boundary->GetBoundaryValue ();
267+ global_boundary_val = string_t (boundary_val);
268+ }
269+
270+ if (heap_size <= SMALL_HEAP_THRESHOLD) {
271+ AddSmallHeap (input, sort_keys_vec, boundary_val, global_boundary_val);
272+ } else {
273+ AddLargeHeap (input, sort_keys_vec, boundary_val, global_boundary_val);
274+ }
275+
276+ // if we modified the heap we might be able to update the global boundary
277+ // note that the global boundary only applies to FULL heaps
278+ if (heap.size () >= heap_size && global_boundary) {
279+ global_boundary->UpdateValue (heap.front ().sort_key );
280+ }
281+ }
282+
217283void TopNHeap::Combine (TopNHeap &other) {
218284 other.Finalize ();
219285
220- // FIXME: heaps can be merged directly instead of doing it like this
221- // that only really speeds things up if heaps are very large, however
222- TopNScanState state;
223- other.InitializeScan (state, false );
224- while (true ) {
225- payload_chunk.Reset ();
226- other.Scan (state, payload_chunk);
227- if (payload_chunk.size () == 0 ) {
228- break ;
286+ idx_t match_count = 0 ;
287+ // merge the heap of other into this
288+ for (idx_t i = 0 ; i < other.heap .size (); i++) {
289+ // heap is full - check the latest entry
290+ auto &other_entry = other.heap [i];
291+ auto &sort_key = other_entry.sort_key ;
292+ if (!EntryShouldBeAdded (sort_key)) {
293+ continue ;
294+ }
295+ // add this entry
296+ TopNEntry new_entry;
297+ new_entry.sort_key = sort_key.IsInlined () ? sort_key : sort_key_heap.AddBlob (sort_key);
298+ new_entry.index = heap_data.size () + match_count;
299+ AddEntryToHeap (new_entry);
300+
301+ matching_sel.set_index (match_count++, other_entry.index );
302+ if (match_count >= STANDARD_VECTOR_SIZE) {
303+ // flush
304+ heap_data.Append (other.heap_data , true , &matching_sel, match_count);
305+ match_count = 0 ;
229306 }
230- Sink (payload_chunk);
307+ }
308+ if (match_count > 0 ) {
309+ // flush
310+ heap_data.Append (other.heap_data , true , &matching_sel, match_count);
311+ match_count = 0 ;
231312 }
232313 Reduce ();
233314}
@@ -236,14 +317,14 @@ void TopNHeap::Finalize() {
236317}
237318
238319void TopNHeap::Reduce () {
239- if (payload_chunk .size () < ReduceThreshold ()) {
320+ if (heap_data .size () < ReduceThreshold ()) {
240321 // only reduce when we pass the reduce threshold
241322 return ;
242323 }
243324 // we have too many values in the heap - reduce them
244325 StringHeap new_sort_heap;
245- DataChunk new_payload_chunk ;
246- new_payload_chunk .Initialize (allocator, payload_types, HeapAllocSize ());
326+ DataChunk new_heap_data ;
327+ new_heap_data .Initialize (allocator, payload_types, heap. size ());
247328
248329 SelectionVector new_payload_sel (heap.size ());
249330 for (idx_t i = 0 ; i < heap.size (); i++) {
@@ -258,21 +339,22 @@ void TopNHeap::Reduce() {
258339 }
259340
260341 // copy over the data from the current payload chunk to the new payload chunk
261- payload_chunk.Copy (new_payload_chunk, new_payload_sel, heap.size ());
342+ new_heap_data.Slice (heap_data, new_payload_sel, heap.size ());
343+ new_heap_data.Flatten ();
262344
263- new_sort_heap .Move (sort_key_heap );
264- payload_chunk .Reference (new_payload_chunk );
345+ sort_key_heap .Move (new_sort_heap );
346+ heap_data .Reference (new_heap_data );
265347}
266348
267349void TopNHeap::InitializeScan (TopNScanState &state, bool exclude_offset) {
268350 auto heap_copy = heap;
269351 // traverse the rest of the heap
352+ state.scan_order .resize (heap_copy.size ());
270353 while (!heap_copy.empty ()) {
271354 std::pop_heap (heap_copy.begin (), heap_copy.end ());
272- state.scan_order . push_back ( UnsafeNumericCast<sel_t >(heap_copy.back ().index ) );
355+ state.scan_order [heap_copy. size () - 1 ] = UnsafeNumericCast<sel_t >(heap_copy.back ().index );
273356 heap_copy.pop_back ();
274357 }
275- std::reverse (state.scan_order .begin (), state.scan_order .end ());
276358 state.pos = exclude_offset ? offset : 0 ;
277359}
278360
0 commit comments