LCOV - code coverage report
Current view: top level - libs/capy/src/ex - thread_pool.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 88.2 % 68 60
Test Date: 2026-01-22 22:47:31 Functions: 87.5 % 16 14

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/boostorg/capy
       8              : //
       9              : 
      10              : #include <boost/capy/ex/thread_pool.hpp>
      11              : #include <boost/capy/detail/intrusive.hpp>
      12              : #include <condition_variable>
      13              : #include <mutex>
      14              : #include <stop_token>
      15              : #include <thread>
      16              : #include <vector>
      17              : 
      18              : namespace boost {
      19              : namespace capy {
      20              : 
      21              : //------------------------------------------------------------------------------
      22              : 
      23              : class thread_pool::impl
      24              : {
      25              :     struct work : detail::intrusive_queue<work>::node
      26              :     {
      27              :         coro h_;
      28              : 
      29          119 :         explicit work(coro h) noexcept
      30          119 :             : h_(h)
      31              :         {
      32          119 :         }
      33              : 
      34          119 :         void run()
      35              :         {
      36          119 :             auto h = h_;
      37          119 :             delete this;
      38          119 :             h.resume();
      39          119 :         }
      40              : 
      41            0 :         void destroy()
      42              :         {
      43            0 :             delete this;
      44            0 :         }
      45              :     };
      46              : 
      47              :     std::mutex mutex_;
      48              :     std::condition_variable_any cv_;
      49              :     detail::intrusive_queue<work> q_;
      50              :     std::vector<std::jthread> threads_;
      51              :     std::size_t num_threads_;
      52              :     std::once_flag start_flag_;
      53              : 
      54              : public:
      55           51 :     ~impl()
      56              :     {
      57           51 :         stop();
      58           51 :         threads_.clear();
      59              : 
      60           51 :         while(auto* w = q_.pop())
      61            0 :             w->destroy();
      62           51 :     }
      63              : 
      64              :     explicit
      65           51 :     impl(std::size_t num_threads)
      66           51 :         : num_threads_(num_threads)
      67              :     {
      68           51 :         if(num_threads_ == 0)
      69            1 :             num_threads_ = std::thread::hardware_concurrency();
      70           51 :         if(num_threads_ == 0)
      71            0 :             num_threads_ = 1;
      72           51 :     }
      73              : 
      74              :     void
      75          119 :     post(coro h)
      76              :     {
      77          119 :         ensure_started();
      78          119 :         auto* w = new work(h);
      79              :         {
      80          119 :             std::lock_guard<std::mutex> lock(mutex_);
      81          119 :             q_.push(w);
      82          119 :         }
      83          119 :         cv_.notify_one();
      84          119 :     }
      85              : 
      86              :     void
      87           51 :     stop() noexcept
      88              :     {
      89           82 :         for (auto& t : threads_)
      90           31 :             t.request_stop();
      91           51 :         cv_.notify_all();
      92           51 :     }
      93              : 
      94              : private:
      95              :     void
      96          119 :     ensure_started()
      97              :     {
      98          119 :         std::call_once(start_flag_, [this]{
      99           19 :             threads_.reserve(num_threads_);
     100           50 :             for(std::size_t i = 0; i < num_threads_; ++i)
     101           62 :                 threads_.emplace_back([this](std::stop_token st){ run(st); });
     102           19 :         });
     103          119 :     }
     104              : 
     105              :     void
     106           31 :     run(std::stop_token st)
     107              :     {
     108              :         for(;;)
     109              :         {
     110          150 :             work* w = nullptr;
     111              :             {
     112          150 :                 std::unique_lock<std::mutex> lock(mutex_);
     113          354 :                 if(!cv_.wait(lock, st, [this]{ return !q_.empty(); }))
     114           62 :                     return;
     115          119 :                 w = q_.pop();
     116          150 :             }
     117          119 :             w->run();
     118          119 :         }
     119              :     }
     120              : };
     121              : 
     122              : //------------------------------------------------------------------------------
     123              : 
     124           51 : thread_pool::
     125              : ~thread_pool()
     126              : {
     127           51 :     shutdown();
     128           51 :     destroy();
     129           51 :     delete impl_;
     130           51 : }
     131              : 
     132           51 : thread_pool::
     133           51 : thread_pool(std::size_t num_threads)
     134           51 :     : impl_(new impl(num_threads))
     135              : {
     136           51 : }
     137              : 
     138              : void
     139            0 : thread_pool::
     140              : stop() noexcept
     141              : {
     142            0 :     impl_->stop();
     143            0 : }
     144              : 
     145              : //------------------------------------------------------------------------------
     146              : 
     147              : void
     148          119 : thread_pool::executor_type::
     149              : post(coro h) const
     150              : {
     151          119 :     pool_->impl_->post(h);
     152          119 : }
     153              : 
     154              : } // capy
     155              : } // boost
        

Generated by: LCOV version 2.3