使两个交叉通信的asio_service对象处于繁忙状态

Keeping two cross-communicating asio io_service objects busy

本文关键字:service asio 对象 状态 于繁忙 通信 两个      更新时间:2023-10-16

我使用boost:asio与多个io_services来保持不同形式的阻塞I/O分开。例如,我有一个io_service用于阻塞文件I/O,另一个用于长时间运行的cpu绑定任务(这可以扩展到第三个用于阻塞网络I/O等)。一般来说,我想确保一种形式的阻塞I/O不会使其他形式的阻塞I/O挨饿。

我遇到的问题是,因为在一个io_service中运行的任务可以将事件发布到其他io_service(例如,cpu绑定任务可能需要启动文件I/O操作,或者完成的文件I/O操作可能调用cpu绑定回调),我不知道如何保持两个io_services运行,直到它们都退出事件。

对于单个I/O服务,您可以这样做:

 shared_ptr<asio::io_service> io_service (new asio::io_service);
 shared_ptr<asio::io_service::work> work (
   new asio::io_service::work(*io_service));
 // Create worker thread(s) that call io_service->run()
 io_service->post(/* some event */);
 work.reset();
 // Join worker thread(s)

但是,如果我对两个io_services都这样做,那么我没有发布初始事件的那个io_services会立即完成。如果io_service B上的初始事件在io_service A上的任务向B上发送新事件之前完成,那么io_service B将提前完成。

我如何保持io_service B运行,而io_service A仍在处理事件(因为服务A中的一个队列事件可能会向B发布一个新事件),反之亦然,同时仍然确保两个io_services退出他们的run()方法,如果他们曾经都在同一时间的事件?

找到了一种方法来做到这一点,所以记录下来,以防有人在搜索中发现这个问题:

  • 创建每个N个交叉通信的io_services,为每个io_services创建一个工作对象,然后启动它们的工作线程

  • 创建一个不运行任何工作线程的"master" io_service对象

  • 不允许直接向服务发送事件。相反,创建io_services的访问器函数,它将:

    1. 在主线程上创建工作对象
    2. 将回调封装在一个运行真实回调的函数中,然后删除工作。
    3. 发送这个封装的回调函数。
  • 在执行的主流程中,一旦所有的io_services都启动了,并且你已经向其中至少一个提交了工作,在主io_service上调用run()。

  • 当主io_service的run()方法返回时,删除N个交叉通信io_services上的所有初始工作,并加入所有工作线程。

让主io_service的线程自己在其他io_service上工作,确保它们不会在主io_service工作完之前终止。让其他每个io_services在主io_service上为每个发布的回调都拥有自己的工作,确保主io_service不会耗尽工作,直到每个其他io_services不再有任何发布的回调来处理。

示例(可以封装在类中):

shared_ptr<boost::asio::io_service> master_io_service;
void RunWorker(boost::shared_ptr<boost::asio::io_service> io_service) {
  io_service->run();
}
void RunCallbackAndDeleteWork(boost::function<void()> callback,
                              boost::asio::io_service::work* work) {
  callback();
  delete work;
}
// All new posted callbacks must come through here, rather than being posted
// directly to the io_service object.
void PostToService(boost::shared_ptr<boost::asio::io_service> io_service,
                   boost::function<void()> callback) {
  io_service->post(boost::bind(
      &RunCallbackAndDeleteWork, callback,
      new boost::asio::io_service::work(*master_io_service)));
}
int main() {
  vector<boost::shared_ptr<boost::asio::io_service> > io_services;
  vector<boost::shared_ptr<boost::asio::io_service::work> > initial_work;
  boost::thread_pool worker_threads;
  master_io_service.reset(new boost::asio::io_service);
  const int kNumServices = X;
  const int kNumWorkersPerService = Y;
  for (int i = 0; i < kNumServices; ++i) {
    shared_ptr<boost::asio::io_service> io_service(new boost::asio::io_service);
    io_services.push_back(io_service);
    initial_work.push_back(new boost::asio::io_service::work(*io_service));
    for (int j = 0; j < kNumWorkersPerService; ++j) {
      worker_threads.create_thread(boost::bind(&RunWorker, io_service));
    }
  }
  // Use PostToService to start initial task(s) on at least one of the services
  master_io_service->run();
  // At this point, there is no real work left in the services, only the work
  // objects in the initial_work vector.
  initial_work.clear();
  worker_threads.join_all();
  return 0;
}

HTTP服务器示例2做了类似的事情,您可能会觉得很有用。它使用了一个io_service池的概念,它保留了shared_ptr<boost::asio::io_service>vector s和每个io_serviceshared_ptr<boost::asio::io_service::work>。它使用线程池来运行每个服务。

示例使用循环调度将工作分配给I/O服务,我认为这不适用于您的情况,因为您有io_service a和io_service b的特定任务