Skip to content

Commit 8335bbc

Browse files
committed
fix data race on downscaling
1 parent 523e11e commit 8335bbc

1 file changed

Lines changed: 5 additions & 6 deletions

File tree

src/Processors/Executors/ExecutorTasks.cpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -290,19 +290,18 @@ void ExecutorTasks::preempt(size_t slot_id)
290290
std::unique_lock lock(mutex);
291291
--total_slots;
292292

293-
// We should make sure that preempted thread has no local task inside context.
294-
// It is allowed to have tasks in `task_queue` or `fast_task_queue` because they can be stealed by other threads.
293+
/// We should make sure that preempted thread has no local task inside context.
294+
/// It is allowed to have tasks in `task_queue` or `fast_task_queue` because they can be stealed by other threads.
295295
auto & context = executor_contexts[slot_id];
296296
if (auto * task = context->popTask())
297297
{
298298
task_queue.push(task, slot_id);
299299
/// Wake up at least one thread to avoid deadlocks (all other threads maybe idle)
300-
tryWakeUpAnyOtherThreadWithTasks(*context, lock);
300+
tryWakeUpAnyOtherThreadWithTasks(*context, lock); // this releases the lock if it wakes up a thread
301301
}
302-
303-
// Finish pipeline if preempted thread was the last non-idle thread executed the last task of the whole pipeline
304-
if (task_queue.empty() && fast_task_queue.empty() && async_task_queue.empty() && threads_queue.size() == total_slots)
302+
else if (task_queue.empty() && fast_task_queue.empty() && async_task_queue.empty() && threads_queue.size() == total_slots)
305303
{
304+
/// Finish pipeline if preempted thread was the last non-idle thread executed the last task of the whole pipeline
306305
lock.unlock();
307306
finish();
308307
}

0 commit comments

Comments
 (0)