Boost asio只需要在m个工作完成后发布n个工作

boost asio need to post n jobs only after m jobs have finished

本文关键字:工作 asio Boost      更新时间:2023-10-16

我正在寻找一种方法来等待一些作业完成,然后执行另一个完全不同数量的作业。当然是用线。简单解释一下:我创建了两个工作线程,都在io_service上执行run。下面的代码取自这里。

为了简单起见,我创建了两种类型的作业,CalculateFibCalculateFib2。我希望CalculateFib2作业开始后,只有在CalculateFib作业完成后。我尝试使用这里解释的条件变量,但如果CalculateFib2作业多于一个,则程序会挂起。我做错了什么?

thx, dodol

#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>
boost::mutex global_stream_lock;
boost::mutex mx;
boost::condition_variable cv;
void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service)
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Start" << std::endl;
    global_stream_lock.unlock();
    io_service->run();
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Finish" << std::endl;
    global_stream_lock.unlock();
}
size_t fib( size_t n )
{
    if ( n <= 1 )
    {
        return n;
    }
    boost::this_thread::sleep( 
        boost::posix_time::milliseconds( 1000 )
        );
    return fib( n - 1 ) + fib( n - 2);
}
void CalculateFib( size_t n )
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Now calculating fib( " << n << " ) " << std::endl;
    global_stream_lock.unlock();
    size_t f = fib( n );
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] fib( " << n << " ) = " << f << std::endl;
    global_stream_lock.unlock();
    boost::lock_guard<boost::mutex> lk(mx);
    cv.notify_all();
}
void CalculateFib2( size_t n )
{
    boost::unique_lock<boost::mutex> lk(mx);
    cv.wait(lk);
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Now calculating fib2( " << n << " ) " << std::endl;
    global_stream_lock.unlock();
    size_t f = fib( n );
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] fib2( " << n << " ) = " << f << std::endl;
    global_stream_lock.unlock();
}
int main( int argc, char * argv[] )
{
    boost::shared_ptr< boost::asio::io_service > io_service(
        new boost::asio::io_service
        );
    boost::shared_ptr< boost::asio::io_service::work > work(
        new boost::asio::io_service::work( *io_service )
        );
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] The program will exit when all work has finished."
        << std::endl;
    global_stream_lock.unlock();
    boost::thread_group worker_threads;
    for( int x = 0; x < 2; ++x )
    {
        worker_threads.create_thread( 
            boost::bind( &WorkerThread, io_service )
            );
    }
    io_service->post( boost::bind( CalculateFib, 5 ) );
    io_service->post( boost::bind( CalculateFib, 4 ) );
    io_service->post( boost::bind( CalculateFib, 3 ) );
    io_service->post( boost::bind( CalculateFib2, 1 ) );
    io_service->post( boost::bind( CalculateFib2, 1 ) );
    work.reset();
    worker_threads.join_all();
    return 0;
}

CalculateFib2中,您要做的第一件事是等待条件 (cv)。此条件仅在CalculateFib结束时发出信号。因此,理所当然地,执行永远不会继续,除非条件被触发(通过发布CalculateFib)作业。

确实,像这样添加任何其他行:

io_service->post( boost::bind( CalculateFib, 5 ) );
io_service->post( boost::bind( CalculateFib, 4 ) );
io_service->post( boost::bind( CalculateFib, 3 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib, 5 ) );   // <-- ADDED

使执行运行到完成。

为了更清楚地说明:如果您(及时)隔离一个Fib2批处理,如

io_service->post( boost::bind( CalculateFib, 5 ) );
io_service->post( boost::bind( CalculateFib, 4 ) );
io_service->post( boost::bind( CalculateFib, 3 ) );
boost::this_thread::sleep(boost::posix_time::seconds( 10 ));
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );

无论线程数如何,所有Fib2作业都将始终阻塞,因为Fib作业在发布之前都已退出。一个简单的

io_service->post( boost::bind( CalculateFib, 1 ) );

将解锁所有等待线程(即只有等待线程的数量,即可用线程的数量-1),因为Fib()作业也占用一个线程。现在有<7个线程,这将死锁,因为没有线程可以在上启动Fib()作业(所有线程都阻塞在Fib2中等待)


老实说,我不明白你在日程安排方面想要达到什么目的。我认为您应该监视作业队列,并仅在达到所需的项目数量时才显式地发布作业("任务")。这样你就可以KISS并获得一个非常灵活的工作调度接口。

一般来说,使用线程组(池),您希望避免在不确定的时间内阻塞线程。这有可能使您的工作调度陷入死锁,否则会导致性能不佳。