22#include < iostream>
33
44
5- ThreadPool::ThreadPool (size_t m_size)
6- : m_size(m_size)
5+ template <typename Thread>
6+ ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t num_threads)
7+ : ThreadPoolImpl(num_threads, num_threads)
78{
8- threads.reserve (m_size);
9+ }
10+
11+ template <typename Thread>
12+ ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t num_threads, size_t queue_size)
13+ : num_threads(num_threads), queue_size(queue_size)
14+ {
15+ threads.reserve (num_threads);
916
1017 try
1118 {
12- for (size_t i = 0 ; i < m_size ; ++i)
19+ for (size_t i = 0 ; i < num_threads ; ++i)
1320 threads.emplace_back ([this ] { worker (); });
1421 }
1522 catch (...)
@@ -19,25 +26,30 @@ ThreadPool::ThreadPool(size_t m_size)
1926 }
2027}
2128
22- void ThreadPool::schedule (Job job)
29+ template <typename Thread>
30+ void ThreadPoolImpl<Thread>::schedule(Job job, int priority)
2331{
2432 {
2533 std::unique_lock<std::mutex> lock (mutex);
26- has_free_thread .wait (lock, [this ] { return active_jobs < m_size || shutdown; });
34+ job_finished .wait (lock, [this ] { return !queue_size || active_jobs < queue_size || shutdown; });
2735 if (shutdown)
2836 return ;
2937
30- jobs.push (std::move (job));
38+ jobs.emplace (std::move (job), priority );
3139 ++active_jobs;
40+
41+ if (threads.size () < std::min (num_threads, active_jobs))
42+ threads.emplace_back ([this ] { worker (); });
3243 }
33- has_new_job_or_shutdown .notify_one ();
44+ new_job_or_shutdown .notify_one ();
3445}
3546
36- void ThreadPool::wait ()
47+ template <typename Thread>
48+ void ThreadPoolImpl<Thread>::wait()
3749{
3850 {
3951 std::unique_lock<std::mutex> lock (mutex);
40- has_free_thread .wait (lock, [this ] { return active_jobs == 0 ; });
52+ job_finished .wait (lock, [this ] { return active_jobs == 0 ; });
4153
4254 if (first_exception)
4355 {
@@ -48,34 +60,37 @@ void ThreadPool::wait()
4860 }
4961}
5062
51- ThreadPool::~ThreadPool ()
63+ template <typename Thread>
64+ ThreadPoolImpl<Thread>::~ThreadPoolImpl ()
5265{
5366 finalize ();
5467}
5568
56- void ThreadPool::finalize ()
69+ template <typename Thread>
70+ void ThreadPoolImpl<Thread>::finalize()
5771{
5872 {
5973 std::unique_lock<std::mutex> lock (mutex);
6074 shutdown = true ;
6175 }
6276
63- has_new_job_or_shutdown .notify_all ();
77+ new_job_or_shutdown .notify_all ();
6478
6579 for (auto & thread : threads)
6680 thread.join ();
6781
6882 threads.clear ();
6983}
7084
71- size_t ThreadPool::active () const
85+ template <typename Thread>
86+ size_t ThreadPoolImpl<Thread>::active() const
7287{
7388 std::unique_lock<std::mutex> lock (mutex);
7489 return active_jobs;
7590}
7691
77-
78- void ThreadPool ::worker ()
92+ template < typename Thread>
93+ void ThreadPoolImpl<Thread> ::worker()
7994{
8095 while (true )
8196 {
@@ -84,12 +99,12 @@ void ThreadPool::worker()
8499
85100 {
86101 std::unique_lock<std::mutex> lock (mutex);
87- has_new_job_or_shutdown .wait (lock, [this ] { return shutdown || !jobs.empty (); });
102+ new_job_or_shutdown .wait (lock, [this ] { return shutdown || !jobs.empty (); });
88103 need_shutdown = shutdown;
89104
90105 if (!jobs.empty ())
91106 {
92- job = std::move ( jobs.front ()) ;
107+ job = jobs.top (). job ;
93108 jobs.pop ();
94109 }
95110 else
@@ -113,8 +128,8 @@ void ThreadPool::worker()
113128 shutdown = true ;
114129 --active_jobs;
115130 }
116- has_free_thread .notify_all ();
117- has_new_job_or_shutdown .notify_all ();
131+ job_finished .notify_all ();
132+ new_job_or_shutdown .notify_all ();
118133 return ;
119134 }
120135 }
@@ -124,11 +139,15 @@ void ThreadPool::worker()
124139 --active_jobs;
125140 }
126141
127- has_free_thread .notify_all ();
142+ job_finished .notify_all ();
128143 }
129144}
130145
131146
147+ template class ThreadPoolImpl <std::thread>;
148+ template class ThreadPoolImpl <ThreadFromGlobalPool>;
149+
150+
132151void ExceptionHandler::setException (std::exception_ptr && exception)
133152{
134153 std::unique_lock<std::mutex> lock (mutex);
0 commit comments