Boost asio只需要在m个工作完成后发布n个工作
boost asio need to post n jobs only after m jobs have finished
我正在寻找一种方法来等待一些作业完成,然后执行另一个完全不同数量的作业。当然是用线。简单解释一下:我创建了两个工作线程,都在io_service上执行run。下面的代码取自这里。
为了简单起见,我创建了两种类型的作业,CalculateFib和CalculateFib2。我希望CalculateFib2作业开始后,只有在CalculateFib作业完成后。我尝试使用这里解释的条件变量,但如果CalculateFib2作业多于一个,则程序会挂起。我做错了什么?thx, dodol
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>
boost::mutex global_stream_lock;
boost::mutex mx;
boost::condition_variable cv;
void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service)
{
global_stream_lock.lock();
std::cout << "[" << boost::this_thread::get_id()
<< "] Thread Start" << std::endl;
global_stream_lock.unlock();
io_service->run();
global_stream_lock.lock();
std::cout << "[" << boost::this_thread::get_id()
<< "] Thread Finish" << std::endl;
global_stream_lock.unlock();
}
size_t fib( size_t n )
{
if ( n <= 1 )
{
return n;
}
boost::this_thread::sleep(
boost::posix_time::milliseconds( 1000 )
);
return fib( n - 1 ) + fib( n - 2);
}
void CalculateFib( size_t n )
{
global_stream_lock.lock();
std::cout << "[" << boost::this_thread::get_id()
<< "] Now calculating fib( " << n << " ) " << std::endl;
global_stream_lock.unlock();
size_t f = fib( n );
global_stream_lock.lock();
std::cout << "[" << boost::this_thread::get_id()
<< "] fib( " << n << " ) = " << f << std::endl;
global_stream_lock.unlock();
boost::lock_guard<boost::mutex> lk(mx);
cv.notify_all();
}
void CalculateFib2( size_t n )
{
boost::unique_lock<boost::mutex> lk(mx);
cv.wait(lk);
global_stream_lock.lock();
std::cout << "[" << boost::this_thread::get_id()
<< "] Now calculating fib2( " << n << " ) " << std::endl;
global_stream_lock.unlock();
size_t f = fib( n );
global_stream_lock.lock();
std::cout << "[" << boost::this_thread::get_id()
<< "] fib2( " << n << " ) = " << f << std::endl;
global_stream_lock.unlock();
}
int main( int argc, char * argv[] )
{
boost::shared_ptr< boost::asio::io_service > io_service(
new boost::asio::io_service
);
boost::shared_ptr< boost::asio::io_service::work > work(
new boost::asio::io_service::work( *io_service )
);
global_stream_lock.lock();
std::cout << "[" << boost::this_thread::get_id()
<< "] The program will exit when all work has finished."
<< std::endl;
global_stream_lock.unlock();
boost::thread_group worker_threads;
for( int x = 0; x < 2; ++x )
{
worker_threads.create_thread(
boost::bind( &WorkerThread, io_service )
);
}
io_service->post( boost::bind( CalculateFib, 5 ) );
io_service->post( boost::bind( CalculateFib, 4 ) );
io_service->post( boost::bind( CalculateFib, 3 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
work.reset();
worker_threads.join_all();
return 0;
}
在CalculateFib2
中,您要做的第一件事是等待条件 (cv
)。此条件仅在CalculateFib
结束时发出信号。因此,理所当然地,执行永远不会继续,除非条件被触发(通过发布CalculateFib
)作业。
确实,像这样添加任何其他行:
io_service->post( boost::bind( CalculateFib, 5 ) );
io_service->post( boost::bind( CalculateFib, 4 ) );
io_service->post( boost::bind( CalculateFib, 3 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib, 5 ) ); // <-- ADDED
使执行运行到完成。
为了更清楚地说明:如果您(及时)隔离一个Fib2批处理,如
io_service->post( boost::bind( CalculateFib, 5 ) );
io_service->post( boost::bind( CalculateFib, 4 ) );
io_service->post( boost::bind( CalculateFib, 3 ) );
boost::this_thread::sleep(boost::posix_time::seconds( 10 ));
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
无论线程数如何,所有Fib2作业都将始终阻塞,因为Fib作业在发布之前都已退出。一个简单的
io_service->post( boost::bind( CalculateFib, 1 ) );
将解锁所有等待线程(即只有等待线程的数量,即可用线程的数量-1),因为Fib()作业也占用一个线程。现在有<7个线程,这将死锁,因为没有线程可以在上启动Fib()作业(所有线程都阻塞在Fib2中等待)
老实说,我不明白你在日程安排方面想要达到什么目的。我认为您应该监视作业队列,并仅在达到所需的项目数量时才显式地发布作业("任务")。这样你就可以KISS并获得一个非常灵活的工作调度接口。
一般来说,使用线程组(池),您希望避免在不确定的时间内阻塞线程。这有可能使您的工作调度陷入死锁,否则会导致性能不佳。相关文章:
- boost信号和插槽在不同的线程中不工作(使用boost::asio::io_service)
- 异步操作的 Asio 处理程序在其同步对应项正常工作时不会调用
- boost::asio 使用 post() 时没有调用处理程序,当直接调用函数时有效(io_context有工作)
- 防止Boost :: Asio :: IO_Context在没有更多工作时返回
- boost::asio绞线缠绕的线程无法立即工作
- Boost::Windows上的Asio无法异步工作
- BOOST :: ASIO带有主/工人线程 - 我可以在发布工作之前启动事件循环
- 如何将信号连接到boost :: asio :: io_service在其他线程上发布工作时
- asio::io_service.run() 尽管工作对象退出
- 在linux下使用boost::thread创建一个boost::asio工作线程
- boost::asio 中的未经请求的消息使应用程序崩溃,没有 SSL 它可以正常工作,为什么
- boost::asio http客户端停止工作,我不知道为什么
- XMPP库与Boost.ASIO一起工作
- boost::asio::io_service如何确定工作的优先级
- 来自 boost::asio::async_write 的 WriteHandler 在连接断开时无法正常工作(防火墙/手动断开网络)
- 分配io_service工作 - boost::asio
- 如何在 Boost.Asio 中完成工作项时调用函数
- Boost.Asio deadline_timer未按预期工作
- 阻止Boost Asio工作线程
- 使用boost::asio::async_read失败,但boost::asio::read工作(我使用io_strea