带有阻塞的c++工作队列

c++ work queues with blocking

本文关键字:c++ 工作队列      更新时间:2023-10-16

这个问题应该比我的前几个问题简单一点。我在程序中实现了以下工作队列:

Pool.h:

// tpool class
// It's always closed. :glasses:
#ifndef __POOL_H
#define __POOL_H
class tpool {
    public:
        tpool( std::size_t tpool_size );
        ~tpool();
        template< typename Task >
        void run_task( Task task ){
        boost::unique_lock< boost::mutex > lock( mutex_ );
            if( 0 < available_ ) {
                --available_;
                io_service_.post( boost::bind( &tpool::wrap_task, this, boost::function< void() > ( task ) ) );
            }
        }
    private:
        boost::asio::io_service io_service_;
        boost::asio::io_service::work work_;
        boost::thread_group threads_;
        std::size_t available_;
        boost::mutex mutex_;
        void wrap_task( boost::function< void() > task );
};
extern tpool dbpool;
#endif

pool.cpp:

#include <boost/asio/io_service.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include "pool.h"
tpool::tpool( std::size_t tpool_size ) : work_( io_service_ ), available_( tpool_size ) {
    for ( std::size_t i = 0; i < tpool_size; ++i ){
        threads_.create_thread( boost::bind( &boost::asio::io_service::run, &io_service_ ) );
    }
}
tpool::~tpool() {
    io_service_.stop();
    try {
        threads_.join_all();
    }
    catch( ... ) {}
}
void tpool::wrap_task( boost::function< void() > task ) {
    // run the supplied task
    try {
        task();
    } // suppress exceptions
    catch( ... ) {
    }
    boost::unique_lock< boost::mutex > lock( mutex_ );
    ++available_;
}
tpool dbpool( 50 );

但问题是,并不是所有对run_task()的调用都是由工作线程完成的。我不确定是因为它没有进入队列,还是因为创建任务的线程退出时任务消失了。

所以我的问题是,我必须给boost::thread什么特别的东西才能让它等到队列解锁?进入队列的任务的预期寿命是多少?当创建任务的线程退出时,这些任务是否超出了范围?如果是,我该如何防止这种情况发生?

编辑:我对我的代码做了以下更改:

template< typename Task >
void run_task( Task task ){ // add item to the queue
    io_service_.post( boost::bind( &tpool::wrap_task, this, boost::function< void() > ( task ) ) );
}

现在我看到所有条目都输入正确。然而,我还有一个挥之不去的问题:添加到队列中的任务的生存期是多少?一旦创建它们的线程退出,它们是否就不存在了?

好吧。这真的很简单;您正在拒绝发布的任务!

template< typename Task >
void run_task(task task){
    boost::unique_lock<boost::mutex> lock( mutex_ );
    if(0 < available_) {
        --available_;
        io_service_.post(boost::bind(&tpool::wrap_task, this, boost::function< void() > ( task )));
    }
}

请注意,lock"等待",直到互斥对象不为线程所拥有。这种情况可能已经存在,并且可能是在available_已经为0时。现在的线路

if(0 < available_) {

这条线只是条件。这并不"神奇",因为你锁住了mutex_。(程序甚至不知道mutex_available_之间存在关系)。因此,如果available_ <= 0,您将跳过发布作业。


解决方案#1

您应该使用io_service为您排队。这很可能是你最初想要实现的目标。io_service不是跟踪"可用"线程,而是为您做这项工作。通过在尽可能多的线程上运行io_service,可以控制它可以使用的线程数量。易于理解的

由于io_service已经是线程安全的,所以您可以不使用锁。

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iostream>
// tpool class
// It's always closed. :glasses:
#ifndef __POOL_H
#define __POOL_H
class tpool {
    public:
        tpool( std::size_t tpool_size );
        ~tpool();
        template<typename Task>
        void run_task(Task task){
            io_service_.post(task);
        }
    private:
        // note the order of destruction of members
        boost::asio::io_service io_service_;
        boost::asio::io_service::work work_;
        boost::thread_group threads_;
};
extern tpool dbpool;
#endif
#include <boost/asio/io_service.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
//#include "pool.h"
tpool::tpool(std::size_t tpool_size) : work_(io_service_) {
    for (std::size_t i = 0; i < tpool_size; ++i)
    {
        threads_.create_thread( 
                boost::bind(&boost::asio::io_service::run, &io_service_) 
            );
    }
}
tpool::~tpool() {
    io_service_.stop();
    try {
        threads_.join_all();
    }
    catch(...) {}
}
void foo() { std::cout << __PRETTY_FUNCTION__ << "n"; }
void bar() { std::cout << __PRETTY_FUNCTION__ << "n"; }
int main() {
    tpool dbpool(50);
    dbpool.run_task(foo);
    dbpool.run_task(bar);
    boost::this_thread::sleep_for(boost::chrono::seconds(1));
}

出于关闭目的,您需要启用"清除"io_service::work对象,否则您的池将永远不会退出。


解决方案#2

不要使用io_service,而是使用条件变量滚动您自己的队列实现,以通知工作线程正在发布的新工作。同样,工作线程的数量由组中线程的数量决定。

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>
using namespace boost;
using namespace boost::phoenix::arg_names;
class thread_pool
{
  private:
      mutex mx;
      condition_variable cv;
      typedef function<void()> job_t;
      std::deque<job_t> _queue;
      thread_group pool;
      boost::atomic_bool shutdown;
      static void worker_thread(thread_pool& q)
      {
          while (auto job = q.dequeue())
              (*job)();
      }
  public:
      thread_pool() : shutdown(false) {
          for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
              pool.create_thread(bind(worker_thread, ref(*this)));
      }
      void enqueue(job_t job) 
      {
          lock_guard<mutex> lk(mx);
          _queue.push_back(std::move(job));
          cv.notify_one();
      }
      optional<job_t> dequeue() 
      {
          unique_lock<mutex> lk(mx);
          namespace phx = boost::phoenix;
          cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));
          if (_queue.empty())
              return none;
          auto job = std::move(_queue.front());
          _queue.pop_front();
          return std::move(job);
      }
      ~thread_pool()
      {
          shutdown = true;
          {
              lock_guard<mutex> lk(mx);
              cv.notify_all();
          }
          pool.join_all();
      }
};
void the_work(int id)
{
    std::cout << "worker " << id << " enteredn";
    // no more synchronization; the pool size determines max concurrency
    std::cout << "worker " << id << " start workn";
    this_thread::sleep_for(chrono::seconds(2));
    std::cout << "worker " << id << " donen";
}
int main()
{
    thread_pool pool; // uses 1 thread per core
    for (int i = 0; i < 10; ++i)
        pool.enqueue(bind(the_work, i));
}