使用Boost Asio设置发布队列大小的限制
Setting limit on post queue size with Boost Asio?
我使用boost::asio::io_service
作为基本线程池。一些线程被添加到io_service中,主线程开始发布处理程序,辅助线程开始运行处理程序,一切都结束了。到目前为止,一切都很好;我在单线程代码上得到了很好的加速。
然而,主线程有数百万的东西要发布。它只是不断地发布它们,比工作线程处理它们的速度快得多。我没有达到RAM限制,但排队这么多东西还是有点傻。我想做的是为处理程序队列设置一个固定的大小,如果队列已满,则设置post()块。
我在Boost ASIO文档中没有看到任何选项。这可能吗?
我正在使用信号量来修复处理程序队列大小。以下代码说明了此解决方案:
void Schedule(boost::function<void()> function)
{
semaphore.wait();
io_service.post(boost::bind(&TaskWrapper, function));
}
void TaskWrapper(boost::function<void()> &function)
{
function();
semaphore.post();
}
您可以将您的lambda封装在另一个lambda中,该lambda将负责计算"正在进行"的任务,然后在有太多正在进行的任务时等待发布。
示例:
#include <atomic>
#include <chrono>
#include <future>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <boost/asio.hpp>
class ThreadPool {
using asio_worker = std::unique_ptr<boost::asio::io_service::work>;
boost::asio::io_service service;
asio_worker service_worker;
std::vector<std::thread> grp;
std::atomic<int> inProgress = 0;
std::mutex mtx;
std::condition_variable busy;
public:
ThreadPool(int threads) : service(), service_worker(new asio_worker::element_type(service)) {
for (int i = 0; i < threads; ++i) {
grp.emplace_back([this] { service.run(); });
}
}
template<typename F>
void enqueue(F && f) {
std::unique_lock<std::mutex> lock(mtx);
// limit queue depth = number of threads
while (inProgress >= grp.size()) {
busy.wait(lock);
}
inProgress++;
service.post([this, f = std::forward<F>(f)]{
try {
f();
}
catch (...) {
inProgress--;
busy.notify_one();
throw;
}
inProgress--;
busy.notify_one();
});
}
~ThreadPool() {
service_worker.reset();
for (auto& t : grp)
if (t.joinable())
t.join();
service.stop();
}
};
int main() {
std::unique_ptr<ThreadPool> pool(new ThreadPool(4));
for (int i = 1; i <= 20; ++i) {
pool->enqueue([i] {
std::string s("Hello from task ");
s += std::to_string(i) + "n";
std::cout << s;
std::this_thread::sleep_for(std::chrono::seconds(1));
});
}
std::cout << "All tasks queued.n";
pool.reset(); // wait for all tasks to complete
std::cout << "Done.n";
}
输出:
Hello from task 3
Hello from task 4
Hello from task 2
Hello from task 1
Hello from task 5
Hello from task 7
Hello from task 6
Hello from task 8
Hello from task 9
Hello from task 10
Hello from task 11
Hello from task 12
Hello from task 13
Hello from task 14
Hello from task 15
Hello from task 16
Hello from task 17
Hello from task 18
All tasks queued.
Hello from task 19
Hello from task 20
Done.
您可以使用strand对象来放置事件并在main中放置延迟吗?你的程序是不是在所有工作发布后就退出了?如果是这样,您可以使用工作对象,这将使您能够更好地控制io_service何时停止。
你可以总是主检查线程的状态,让它等待,直到一个线程空闲或类似的事情。
//链接
http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service__strand.html
http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service.html
//example from the second link
boost::asio::io_service io_service;
boost::asio::io_service::work work(io_service);
希望这能有所帮助。
也许可以尝试降低主线程的优先级,这样一旦工作线程繁忙,它们就会耗尽主线程和系统自身的限制。
- 理解boost::asio-async_read在无需读取内容时的行为
- boost::进程间消息队列引发错误
- 如果我只是不访问queue_front节点的子节点,而是将它们推到队列中呢?还是BFS吗
- 提升 ASIO 无法识别计时器对象
- Android NDK传感器向事件队列报告奇怪的间隔
- C++优先级队列,按对象的唯一指针的特定方法升序排列
- 按对象的特定方法按升序排列的C++优先级队列
- 使用2个键的cpp-stl::优先级队列排序不正确
- C++Boost Asio Pool线程,带有lambda函数和传递引用变量
- boost::asio::steady_timer()与sleep()我应该使用哪一个
- boost::asio如何生成多个协同程序,然后加入它们
- 我是否需要在下一次转移时将所有权*转移回转移队列
- 在一个读写器队列中,我可以用volatile替换原子吗
- asio::io_service 具有多个线程的优先级队列处理
- 可以提升 asio 帖子"overflow"队列
- 执行Boost ASIO队列的最大尺寸
- 轮询命令以在 boost asio 中的共享队列中发送
- boost::asio::io_service,具有有界和/或优先级队列
- 使用Boost Asio设置发布队列大小的限制
- 使用boost::asio和strands的事件队列:等待新事件