使两个交叉通信的asio_service对象处于繁忙状态
Keeping two cross-communicating asio io_service objects busy
我使用boost:asio与多个io_services来保持不同形式的阻塞I/O分开。例如,我有一个io_service用于阻塞文件I/O,另一个用于长时间运行的cpu绑定任务(这可以扩展到第三个用于阻塞网络I/O等)。一般来说,我想确保一种形式的阻塞I/O不会使其他形式的阻塞I/O挨饿。
我遇到的问题是,因为在一个io_service中运行的任务可以将事件发布到其他io_service(例如,cpu绑定任务可能需要启动文件I/O操作,或者完成的文件I/O操作可能调用cpu绑定回调),我不知道如何保持两个io_services运行,直到它们都退出事件。
对于单个I/O服务,您可以这样做:
shared_ptr<asio::io_service> io_service (new asio::io_service);
shared_ptr<asio::io_service::work> work (
new asio::io_service::work(*io_service));
// Create worker thread(s) that call io_service->run()
io_service->post(/* some event */);
work.reset();
// Join worker thread(s)
但是,如果我对两个io_services都这样做,那么我没有发布初始事件的那个io_services会立即完成。如果io_service B上的初始事件在io_service A上的任务向B上发送新事件之前完成,那么io_service B将提前完成。
我如何保持io_service B运行,而io_service A仍在处理事件(因为服务A中的一个队列事件可能会向B发布一个新事件),反之亦然,同时仍然确保两个io_services退出他们的run()方法,如果他们曾经都在同一时间的事件?
找到了一种方法来做到这一点,所以记录下来,以防有人在搜索中发现这个问题:
-
创建每个N个交叉通信的io_services,为每个io_services创建一个工作对象,然后启动它们的工作线程
-
创建一个不运行任何工作线程的"master" io_service对象
-
不允许直接向服务发送事件。相反,创建io_services的访问器函数,它将:
- 在主线程上创建工作对象
- 将回调封装在一个运行真实回调的函数中,然后删除工作。
- 发送这个封装的回调函数。
-
在执行的主流程中,一旦所有的io_services都启动了,并且你已经向其中至少一个提交了工作,在主io_service上调用run()。
-
当主io_service的run()方法返回时,删除N个交叉通信io_services上的所有初始工作,并加入所有工作线程。
让主io_service的线程自己在其他io_service上工作,确保它们不会在主io_service工作完之前终止。让其他每个io_services在主io_service上为每个发布的回调都拥有自己的工作,确保主io_service不会耗尽工作,直到每个其他io_services不再有任何发布的回调来处理。
示例(可以封装在类中):
shared_ptr<boost::asio::io_service> master_io_service;
void RunWorker(boost::shared_ptr<boost::asio::io_service> io_service) {
io_service->run();
}
void RunCallbackAndDeleteWork(boost::function<void()> callback,
boost::asio::io_service::work* work) {
callback();
delete work;
}
// All new posted callbacks must come through here, rather than being posted
// directly to the io_service object.
void PostToService(boost::shared_ptr<boost::asio::io_service> io_service,
boost::function<void()> callback) {
io_service->post(boost::bind(
&RunCallbackAndDeleteWork, callback,
new boost::asio::io_service::work(*master_io_service)));
}
int main() {
vector<boost::shared_ptr<boost::asio::io_service> > io_services;
vector<boost::shared_ptr<boost::asio::io_service::work> > initial_work;
boost::thread_pool worker_threads;
master_io_service.reset(new boost::asio::io_service);
const int kNumServices = X;
const int kNumWorkersPerService = Y;
for (int i = 0; i < kNumServices; ++i) {
shared_ptr<boost::asio::io_service> io_service(new boost::asio::io_service);
io_services.push_back(io_service);
initial_work.push_back(new boost::asio::io_service::work(*io_service));
for (int j = 0; j < kNumWorkersPerService; ++j) {
worker_threads.create_thread(boost::bind(&RunWorker, io_service));
}
}
// Use PostToService to start initial task(s) on at least one of the services
master_io_service->run();
// At this point, there is no real work left in the services, only the work
// objects in the initial_work vector.
initial_work.clear();
worker_threads.join_all();
return 0;
}
HTTP服务器示例2做了类似的事情,您可能会觉得很有用。它使用了一个io_service
池的概念,它保留了shared_ptr<boost::asio::io_service>
的vector
s和每个io_service
的shared_ptr<boost::asio::io_service::work>
。它使用线程池来运行每个服务。
示例使用循环调度将工作分配给I/O服务,我认为这不适用于您的情况,因为您有io_service
a和io_service
b的特定任务
- 理解boost::asio-async_read在无需读取内容时的行为
- 提升 ASIO 无法识别计时器对象
- C++Boost Asio Pool线程,带有lambda函数和传递引用变量
- boost::asio::steady_timer()与sleep()我应该使用哪一个
- boost::asio如何生成多个协同程序,然后加入它们
- 缓慢提升ASIO
- 从 Boost ASIO 获取 epoll 描述符 io_service对象
- 如何在 Boost.Asio 中使用 Zero-copy sendmsg/receive
- C++ Boost::asio串行通信与Arduino无法写入
- 如何使用 Boost Asio 在 Android 上获取我的本地 udp IP 地址?
- 提升 Asio TCP 服务器 处理多个客户端
- boost::asio UDP 广播客户端仅接收"fast"数据包
- 提升 ASIO - io_service 不要等待连接到线程
- 执行时使用 boost::asio::d eadline_timer 时出错
- Boost.Asio/OpenSSL HTTPS GET certificate trouble
- C++ boost::asio::ip::tcp::acceptor 有时不接受连接器?
- boost::asio data owning `ConstBufferSequence`
- 如何替换此示例代码片段中已弃用的handler_type_t或 boost::asio::handler_type?
- 将 boost 序列化对象的 asio::streambuf 表示转换为 Beast 的 DynamicBody req.body()
- bad_weak_ptr使用从 boost::asio::io_context::service 继承的类