使用Boost Asio设置发布队列大小的限制

Setting limit on post queue size with Boost Asio?

本文关键字:队列 Asio Boost 设置 布队列 使用      更新时间:2023-10-16

我使用boost::asio::io_service作为基本线程池。一些线程被添加到io_service中,主线程开始发布处理程序,辅助线程开始运行处理程序,一切都结束了。到目前为止,一切都很好;我在单线程代码上得到了很好的加速。

然而,主线程有数百万的东西要发布。它只是不断地发布它们,比工作线程处理它们的速度快得多。我没有达到RAM限制,但排队这么多东西还是有点傻。我想做的是为处理程序队列设置一个固定的大小,如果队列已满,则设置post()块。

我在Boost ASIO文档中没有看到任何选项。这可能吗?

我正在使用信号量来修复处理程序队列大小。以下代码说明了此解决方案:

void Schedule(boost::function<void()> function)
{
    semaphore.wait();
    io_service.post(boost::bind(&TaskWrapper, function));
}
void TaskWrapper(boost::function<void()> &function)
{
    function();
    semaphore.post();
}

您可以将您的lambda封装在另一个lambda中,该lambda将负责计算"正在进行"的任务,然后在有太多正在进行的任务时等待发布。

示例:

#include <atomic>
#include <chrono>
#include <future>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <boost/asio.hpp>
class ThreadPool {
  using asio_worker = std::unique_ptr<boost::asio::io_service::work>;
  boost::asio::io_service service;
  asio_worker service_worker;
  std::vector<std::thread> grp;
  std::atomic<int> inProgress = 0;
  std::mutex mtx;
  std::condition_variable busy;
public:
  ThreadPool(int threads) : service(), service_worker(new asio_worker::element_type(service)) {
    for (int i = 0; i < threads; ++i) {
      grp.emplace_back([this] { service.run(); });
    }
  }
  template<typename F>
  void enqueue(F && f) {
    std::unique_lock<std::mutex> lock(mtx);
    // limit queue depth = number of threads
    while (inProgress >= grp.size()) {
      busy.wait(lock);
    }
    inProgress++;
    service.post([this, f = std::forward<F>(f)]{
      try {
        f();
      }
      catch (...) {
        inProgress--;
        busy.notify_one();
        throw;
      }
      inProgress--;
      busy.notify_one();
    });
  }
  ~ThreadPool() {
    service_worker.reset();
    for (auto& t : grp)
      if (t.joinable())
        t.join();
    service.stop();
  }
};
int main() {
  std::unique_ptr<ThreadPool> pool(new ThreadPool(4));
  for (int i = 1; i <= 20; ++i) {
    pool->enqueue([i] {
      std::string s("Hello from task ");
      s += std::to_string(i) + "n";
      std::cout << s;
      std::this_thread::sleep_for(std::chrono::seconds(1));
    });
  }
  std::cout << "All tasks queued.n";
  pool.reset(); // wait for all tasks to complete
  std::cout << "Done.n";
}

输出:

Hello from task 3
Hello from task 4
Hello from task 2
Hello from task 1
Hello from task 5
Hello from task 7
Hello from task 6
Hello from task 8
Hello from task 9
Hello from task 10
Hello from task 11
Hello from task 12
Hello from task 13
Hello from task 14
Hello from task 15
Hello from task 16
Hello from task 17
Hello from task 18
All tasks queued.
Hello from task 19
Hello from task 20
Done.

您可以使用strand对象来放置事件并在main中放置延迟吗?你的程序是不是在所有工作发布后就退出了?如果是这样,您可以使用工作对象,这将使您能够更好地控制io_service何时停止。

你可以总是主检查线程的状态,让它等待,直到一个线程空闲或类似的事情。

//链接

http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service__strand.html

http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service.html

//example from the second link
boost::asio::io_service io_service;
boost::asio::io_service::work work(io_service);

希望这能有所帮助。

也许可以尝试降低主线程的优先级,这样一旦工作线程繁忙,它们就会耗尽主线程和系统自身的限制。