Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/boostorg/capy
8 : //
9 :
10 : #include <boost/capy/ex/thread_pool.hpp>
11 : #include <boost/capy/detail/intrusive.hpp>
12 : #include <condition_variable>
13 : #include <mutex>
14 : #include <stop_token>
15 : #include <thread>
16 : #include <vector>
17 :
18 : namespace boost {
19 : namespace capy {
20 :
21 : //------------------------------------------------------------------------------
22 :
23 : class thread_pool::impl
24 : {
25 : struct work : detail::intrusive_queue<work>::node
26 : {
27 : coro h_;
28 :
29 119 : explicit work(coro h) noexcept
30 119 : : h_(h)
31 : {
32 119 : }
33 :
34 119 : void run()
35 : {
36 119 : auto h = h_;
37 119 : delete this;
38 119 : h.resume();
39 119 : }
40 :
41 0 : void destroy()
42 : {
43 0 : delete this;
44 0 : }
45 : };
46 :
47 : std::mutex mutex_;
48 : std::condition_variable_any cv_;
49 : detail::intrusive_queue<work> q_;
50 : std::vector<std::jthread> threads_;
51 : std::size_t num_threads_;
52 : std::once_flag start_flag_;
53 :
54 : public:
55 51 : ~impl()
56 : {
57 51 : stop();
58 51 : threads_.clear();
59 :
60 51 : while(auto* w = q_.pop())
61 0 : w->destroy();
62 51 : }
63 :
64 : explicit
65 51 : impl(std::size_t num_threads)
66 51 : : num_threads_(num_threads)
67 : {
68 51 : if(num_threads_ == 0)
69 1 : num_threads_ = std::thread::hardware_concurrency();
70 51 : if(num_threads_ == 0)
71 0 : num_threads_ = 1;
72 51 : }
73 :
74 : void
75 119 : post(coro h)
76 : {
77 119 : ensure_started();
78 119 : auto* w = new work(h);
79 : {
80 119 : std::lock_guard<std::mutex> lock(mutex_);
81 119 : q_.push(w);
82 119 : }
83 119 : cv_.notify_one();
84 119 : }
85 :
86 : void
87 51 : stop() noexcept
88 : {
89 82 : for (auto& t : threads_)
90 31 : t.request_stop();
91 51 : cv_.notify_all();
92 51 : }
93 :
94 : private:
95 : void
96 119 : ensure_started()
97 : {
98 119 : std::call_once(start_flag_, [this]{
99 19 : threads_.reserve(num_threads_);
100 50 : for(std::size_t i = 0; i < num_threads_; ++i)
101 62 : threads_.emplace_back([this](std::stop_token st){ run(st); });
102 19 : });
103 119 : }
104 :
105 : void
106 31 : run(std::stop_token st)
107 : {
108 : for(;;)
109 : {
110 150 : work* w = nullptr;
111 : {
112 150 : std::unique_lock<std::mutex> lock(mutex_);
113 354 : if(!cv_.wait(lock, st, [this]{ return !q_.empty(); }))
114 62 : return;
115 119 : w = q_.pop();
116 150 : }
117 119 : w->run();
118 119 : }
119 : }
120 : };
121 :
122 : //------------------------------------------------------------------------------
123 :
124 51 : thread_pool::
125 : ~thread_pool()
126 : {
127 51 : shutdown();
128 51 : destroy();
129 51 : delete impl_;
130 51 : }
131 :
132 51 : thread_pool::
133 51 : thread_pool(std::size_t num_threads)
134 51 : : impl_(new impl(num_threads))
135 : {
136 51 : }
137 :
138 : void
139 0 : thread_pool::
140 : stop() noexcept
141 : {
142 0 : impl_->stop();
143 0 : }
144 :
145 : //------------------------------------------------------------------------------
146 :
147 : void
148 119 : thread_pool::executor_type::
149 : post(coro h) const
150 : {
151 119 : pool_->impl_->post(h);
152 119 : }
153 :
154 : } // capy
155 : } // boost
|