Thread Pool: Block destruction until all work is done

Keywords: work, thread      Updated: 2023-10-16

I have the following thread pool implementation:

template<typename... event_args>
class thread_pool{
public:
    using handler_type = std::function<void(event_args...)>;
    thread_pool(handler_type&& handler, std::size_t N = 4, bool finish_before_exit = true) : _handler(std::forward<handler_type&&>(handler)),_workers(N),_running(true),_finish_work_before_exit(finish_before_exit)
    {
        for(auto&& worker: _workers)
        {
            //worker function
            worker = std::thread([this]()
            {
                while (_running)
                {
                    //wait for work
                    std::unique_lock<std::mutex> _lk{_wait_mutex};
                    _cv.wait(_lk, [this]{
                        return !_events.empty() || !_running;
                    });
                    //_lk unlocked
                    //check to see why we woke up
                    if (!_events.empty()) {//was it new work
                        std::unique_lock<std::mutex> _readlk(_queue_mutex);
                        auto data = _events.front();
                        _events.pop();
                        _readlk.unlock();
                        invoke(std::move(_handler), std::move(data));
                        _cv.notify_all();
                    }else if(!_running){//was it a signal to exit
                        break;
                    }
                    //or was it spurious and we should just ignore it
                }
            });
            //end worker function
        }
    }
    ~thread_pool()
    {
        if(_finish_work_before_exit)
        {//block destruction until all work is done
            std::condition_variable _work_remains;
            std::mutex _wr;
            std::unique_lock<std::mutex> lk{_wr};
            _work_remains.wait(lk,[this](){
                return _events.empty();
            });
        }
        _running=false;
        //let all workers know to exit
        _cv.notify_all();

        //attempt to join all workers
        for(auto&& _worker: _workers)
        {
            if(_worker.joinable())
            {
                _worker.join();
            }
        }
    }
    handler_type& handler()
    {
        return _handler;
    }
    void propagate(event_args&&... args)
    {
        //lock before push
        std::unique_lock<std::mutex> _lk(_queue_mutex);
        {
            _events.emplace(std::make_tuple(args...));
        }
        _lk.unlock();//explicit unlock
        _cv.notify_one();//let worker know that data is available
    }
private:
    bool _finish_work_before_exit;
    handler_type _handler;
    std::queue<std::tuple<event_args...>> _events;
    std::vector<std::thread> _workers;
    std::atomic_bool _running;
    std::condition_variable _cv;
    std::mutex _wait_mutex;
    std::mutex _queue_mutex;

    //helpers used to unpack tuple into function call
    template<typename Func, typename Tuple, std::size_t... I>
    auto invoke_(Func&& func, Tuple&& t, std::index_sequence<I...>)
    {
        return func(std::get<I>(std::forward<Tuple&&>(t))...);
    }
    template<typename Func, typename Tuple, typename Indicies = std::make_index_sequence<std::tuple_size<Tuple>::value>>
    auto invoke(Func&& func, Tuple&& t)
    {
        return invoke_(std::forward<Func&&>(func), std::forward<Tuple&&>(t), Indicies());
    }
};

I recently added this section to the destructor:

if(_finish_work_before_exit)
{//block destruction until all work is done
    std::condition_variable _work_remains;
    std::mutex _wr;
    std::unique_lock<std::mutex> lk{_wr};
    _work_remains.wait(lk,[this](){
        return _events.empty();
    });
}

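The intent is for the destructor to block until the work queue has been fully consumed.

However, it seems to put the program into a deadlock. All of the work gets done, but the wait never returns after the work has finished.

Consider this example main: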

std::mutex writemtx;

thread_pool<int> pool{
    [&](int i){
        std::unique_lock<std::mutex> lk{writemtx};
        std::cout<<i<<" : "<<std::this_thread::get_id()<<std::endl;
    },
    8//threads
};
for (int i=0; i<8192; ++i) {
    pool.propagate(std::move(i));
}

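How can I make the destructor wait for the work to finish without causing a deadlock?

Your code deadlocks because _work_remains is a condition variable that no part of your code ever notifies. It is local to the destructor, so no worker thread can reach it. wait(lk, pred) checks the predicate once on entry and then only re-checks it after a wakeup, so unless _events is already empty when the destructor runs, the wait blocks forever (barring a spurious wakeup). You need to make _work_remains a class member and have it notified by whichever thread takes the last event from _events.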
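
As a rough illustration of that suggestion, here is a minimal sketch rather than a drop-in replacement for the class in the question. It makes several assumptions that are not in the original code: a single mutex (_mutex) replaces _wait_mutex and _queue_mutex so that the queue and both condition variables share one lock, a counter (_busy) tracks handlers that are still running so the destructor also waits for in-flight work, the _finish_work_before_exit flag and the handler() accessor are dropped for brevity, and std::apply (C++17) stands in for the hand-written invoke helpers. The names _mutex, _busy, work() and run_demo() are invented for this example.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>

// Sketch only: a trimmed-down variant of the pool from the question.
template <typename... event_args>
class thread_pool {
public:
    using handler_type = std::function<void(event_args...)>;

    explicit thread_pool(handler_type handler, std::size_t n = 4)
        : _handler(std::move(handler)) {
        for (std::size_t i = 0; i < n; ++i) {
            _workers.emplace_back([this] { work(); });
        }
    }

    ~thread_pool() {
        {
            std::unique_lock<std::mutex> lk{_mutex};
            // Block until the queue is drained and no handler is still running.
            // _work_remains is a member, so the workers can notify it.
            _work_remains.wait(lk, [this] { return _events.empty() && _busy == 0; });
            _running = false;
        }
        _cv.notify_all();  // wake idle workers so they can see _running == false
        for (auto& worker : _workers) {
            worker.join();
        }
    }

    void propagate(event_args... args) {
        {
            std::lock_guard<std::mutex> lk{_mutex};
            _events.emplace(std::move(args)...);
        }
        _cv.notify_one();  // let one worker know that data is available
    }

private:
    void work() {
        std::unique_lock<std::mutex> lk{_mutex};
        for (;;) {
            _cv.wait(lk, [this] { return !_events.empty() || !_running; });
            if (_events.empty()) {
                return;  // predicate held with no work, so _running is false
            }
            auto data = std::move(_events.front());
            _events.pop();
            ++_busy;
            lk.unlock();  // run the handler without holding the lock
            std::apply(_handler, std::move(data));
            lk.lock();
            --_busy;
            if (_events.empty() && _busy == 0) {
                _work_remains.notify_all();  // last event done: release the destructor
            }
        }
    }

    handler_type _handler;
    std::queue<std::tuple<event_args...>> _events;
    std::mutex _mutex;                      // guards _events, _busy and _running
    std::condition_variable _cv;            // signals "work available or shutting down"
    std::condition_variable _work_remains;  // signals "queue drained and workers idle"
    bool _running = true;
    std::size_t _busy = 0;
    std::vector<std::thread> _workers;
};

// Hypothetical demo helper: queues `jobs` events and returns how many were handled.
int run_demo(int jobs) {
    std::atomic<int> handled{0};
    {
        thread_pool<int> pool{[&](int) { ++handled; }, 8};
        for (int i = 0; i < jobs; ++i) {
            pool.propagate(i);
        }
    }  // the destructor blocks here until every queued event has been handled
    return handled.load();
}
```

Because the notification is sent while _mutex is held and the destructor evaluates its predicate under the same mutex, the wakeup cannot slip in between the predicate check and the wait. A simpler alternative, if a second condition variable feels like too much, would be to have the workers exit only when _running is false and the queue is empty; the destructor could then just clear _running, call notify_all on _cv, and join.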