Skip to content

Commit 510b155

Browse files
Attempt to implement global thread pool #4018
1 parent e4fce82 commit 510b155

4 files changed

Lines changed: 103 additions & 39 deletions

File tree

dbms/src/Databases/IDatabase.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,10 @@
88
#include <functional>
99
#include <Poco/File.h>
1010
#include <Common/escapeForFileName.h>
11+
#include <common/ThreadPool.h>
1112
#include <Interpreters/Context.h>
1213

1314

14-
class ThreadPool;
15-
16-
1715
namespace DB
1816
{
1917

dbms/src/Interpreters/InterpreterCreateQuery.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22

33
#include <Interpreters/IInterpreter.h>
44
#include <Storages/ColumnsDescription.h>
5+
#include <common/ThreadPool.h>
56

67

7-
class ThreadPool;
8-
98
namespace DB
109
{
10+
1111
class Context;
1212
class ASTCreateQuery;
1313
class ASTExpressionList;

libs/libcommon/include/common/ThreadPool.h

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,30 @@
77
#include <functional>
88
#include <queue>
99
#include <vector>
10+
#include <ext/singleton.h>
1011

1112

1213
/** Very simple thread pool similar to boost::threadpool.
1314
* Advantages:
1415
* - catches exceptions and rethrows on wait.
1516
*/
1617

17-
class ThreadPool
18+
template <typename Thread>
19+
class ThreadPoolImpl
1820
{
1921
public:
2022
using Job = std::function<void()>;
2123

22-
/// Size is constant, all threads are created immediately.
23-
explicit ThreadPool(size_t m_size);
24+
/// Size is constant. Up to num_threads are created on demand and then run until shutdown.
25+
explicit ThreadPoolImpl(size_t num_threads);
2426

25-
/// Add new job. Locks until free thread in pool become available or exception in one of threads was thrown.
27+
/// queue_size - maximum number of running plus scheduled jobs. It can be greater than num_threads. Zero means unlimited.
28+
ThreadPoolImpl(size_t num_threads, size_t queue_size);
29+
30+
/// Add new job. Locks until number of active jobs is less than maximum or exception in one of threads was thrown.
2631
/// If an exception in some thread was thrown, method silently returns, and exception will be rethrown only on call to 'wait' function.
27-
void schedule(Job job);
32+
/// Priority: greater is higher.
33+
void schedule(Job job, int priority = 0);
2834

2935
/// Wait for all currently active jobs to be done.
3036
/// You may call schedule and wait many times in arbitrary order.
@@ -34,24 +40,40 @@ class ThreadPool
3440

3541
/// Waits for all threads. Doesn't rethrow exceptions (use 'wait' method to rethrow exceptions).
3642
/// You should not destroy object while calling schedule or wait methods from another threads.
37-
~ThreadPool();
43+
~ThreadPoolImpl();
3844

39-
size_t size() const { return m_size; }
45+
size_t size() const { return num_threads; }
4046

41-
/// Returns number of active jobs.
47+
/// Returns number of running and scheduled jobs.
4248
size_t active() const;
4349

4450
private:
4551
mutable std::mutex mutex;
46-
std::condition_variable has_free_thread;
47-
std::condition_variable has_new_job_or_shutdown;
52+
std::condition_variable job_finished;
53+
std::condition_variable new_job_or_shutdown;
54+
55+
const size_t num_threads;
56+
const size_t queue_size;
4857

49-
const size_t m_size;
5058
size_t active_jobs = 0;
5159
bool shutdown = false;
5260

53-
std::queue<Job> jobs;
54-
std::vector<std::thread> threads;
61+
struct JobWithPriority
62+
{
63+
Job job;
64+
int priority;
65+
66+
JobWithPriority(Job job, int priority)
67+
: job(job), priority(priority) {}
68+
69+
bool operator< (const JobWithPriority & rhs) const
70+
{
71+
return priority < rhs.priority;
72+
}
73+
};
74+
75+
std::priority_queue<JobWithPriority> jobs;
76+
std::vector<Thread> threads;
5577
std::exception_ptr first_exception;
5678

5779

@@ -61,6 +83,31 @@ class ThreadPool
6183
};
6284

6385

86+
using FreeThreadPool = ThreadPoolImpl<std::thread>;
87+
88+
class GlobalThreadPool : public FreeThreadPool, public ext::singleton<GlobalThreadPool>
89+
{
90+
public:
91+
GlobalThreadPool() : FreeThreadPool(10000) {} /// TODO: global blocking limit may lead to deadlocks.
92+
};
93+
94+
class ThreadFromGlobalPool
95+
{
96+
public:
97+
ThreadFromGlobalPool(std::function<void()> func)
98+
{
99+
GlobalThreadPool::instance().schedule(func);
100+
}
101+
102+
void join()
103+
{
104+
/// noop, the std::thread will continue to run inside global pool.
105+
}
106+
};
107+
108+
using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPool>;
109+
110+
64111
/// Allows saving the first caught exception in jobs and postponing its rethrow.
65112
class ExceptionHandler
66113
{

libs/libcommon/src/ThreadPool.cpp

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,21 @@
22
#include <iostream>
33

44

5-
ThreadPool::ThreadPool(size_t m_size)
6-
: m_size(m_size)
5+
template <typename Thread>
6+
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t num_threads)
7+
: ThreadPoolImpl(num_threads, num_threads)
78
{
8-
threads.reserve(m_size);
9+
}
10+
11+
template <typename Thread>
12+
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t num_threads, size_t queue_size)
13+
: num_threads(num_threads), queue_size(queue_size)
14+
{
15+
threads.reserve(num_threads);
916

1017
try
1118
{
12-
for (size_t i = 0; i < m_size; ++i)
19+
for (size_t i = 0; i < num_threads; ++i)
1320
threads.emplace_back([this] { worker(); });
1421
}
1522
catch (...)
@@ -19,25 +26,30 @@ ThreadPool::ThreadPool(size_t m_size)
1926
}
2027
}
2128

22-
void ThreadPool::schedule(Job job)
29+
template <typename Thread>
30+
void ThreadPoolImpl<Thread>::schedule(Job job, int priority)
2331
{
2432
{
2533
std::unique_lock<std::mutex> lock(mutex);
26-
has_free_thread.wait(lock, [this] { return active_jobs < m_size || shutdown; });
34+
job_finished.wait(lock, [this] { return !queue_size || active_jobs < queue_size || shutdown; });
2735
if (shutdown)
2836
return;
2937

30-
jobs.push(std::move(job));
38+
jobs.emplace(std::move(job), priority);
3139
++active_jobs;
40+
41+
if (threads.size() < std::min(num_threads, active_jobs))
42+
threads.emplace_back([this] { worker(); });
3243
}
33-
has_new_job_or_shutdown.notify_one();
44+
new_job_or_shutdown.notify_one();
3445
}
3546

36-
void ThreadPool::wait()
47+
template <typename Thread>
48+
void ThreadPoolImpl<Thread>::wait()
3749
{
3850
{
3951
std::unique_lock<std::mutex> lock(mutex);
40-
has_free_thread.wait(lock, [this] { return active_jobs == 0; });
52+
job_finished.wait(lock, [this] { return active_jobs == 0; });
4153

4254
if (first_exception)
4355
{
@@ -48,34 +60,37 @@ void ThreadPool::wait()
4860
}
4961
}
5062

51-
ThreadPool::~ThreadPool()
63+
template <typename Thread>
64+
ThreadPoolImpl<Thread>::~ThreadPoolImpl()
5265
{
5366
finalize();
5467
}
5568

56-
void ThreadPool::finalize()
69+
template <typename Thread>
70+
void ThreadPoolImpl<Thread>::finalize()
5771
{
5872
{
5973
std::unique_lock<std::mutex> lock(mutex);
6074
shutdown = true;
6175
}
6276

63-
has_new_job_or_shutdown.notify_all();
77+
new_job_or_shutdown.notify_all();
6478

6579
for (auto & thread : threads)
6680
thread.join();
6781

6882
threads.clear();
6983
}
7084

71-
size_t ThreadPool::active() const
85+
template <typename Thread>
86+
size_t ThreadPoolImpl<Thread>::active() const
7287
{
7388
std::unique_lock<std::mutex> lock(mutex);
7489
return active_jobs;
7590
}
7691

77-
78-
void ThreadPool::worker()
92+
template <typename Thread>
93+
void ThreadPoolImpl<Thread>::worker()
7994
{
8095
while (true)
8196
{
@@ -84,12 +99,12 @@ void ThreadPool::worker()
8499

85100
{
86101
std::unique_lock<std::mutex> lock(mutex);
87-
has_new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); });
102+
new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); });
88103
need_shutdown = shutdown;
89104

90105
if (!jobs.empty())
91106
{
92-
job = std::move(jobs.front());
107+
job = jobs.top().job;
93108
jobs.pop();
94109
}
95110
else
@@ -113,8 +128,8 @@ void ThreadPool::worker()
113128
shutdown = true;
114129
--active_jobs;
115130
}
116-
has_free_thread.notify_all();
117-
has_new_job_or_shutdown.notify_all();
131+
job_finished.notify_all();
132+
new_job_or_shutdown.notify_all();
118133
return;
119134
}
120135
}
@@ -124,11 +139,15 @@ void ThreadPool::worker()
124139
--active_jobs;
125140
}
126141

127-
has_free_thread.notify_all();
142+
job_finished.notify_all();
128143
}
129144
}
130145

131146

147+
template class ThreadPoolImpl<std::thread>;
148+
template class ThreadPoolImpl<ThreadFromGlobalPool>;
149+
150+
132151
void ExceptionHandler::setException(std::exception_ptr && exception)
133152
{
134153
std::unique_lock<std::mutex> lock(mutex);

0 commit comments

Comments
 (0)