对于boost io_service,只有一个线程阻塞在epoll_wait上
For boost io_service, is only-one thread blocked on epoll_wait?
我读了Boost ASIO的源代码,我想知道它只有一个线程调用epoll_wait(当然,如果我使用epoll反应器)。
我想找到关于多个线程调用epoll_wait的解决方案,这可能会导致不同的线程同时对同一个套接字进行读取。我读了一些关键代码如下:
// Prepare to execute first handler from queue.
operation* o = op_queue_.front();
op_queue_.pop();
bool more_handlers = (!op_queue_.empty());
if (o == &task_operation_)
{
task_interrupted_ = more_handlers;
if (more_handlers && !one_thread_)
wakeup_event_.unlock_and_signal_one(lock);
else
lock.unlock();
task_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit;
// Run the task. May throw an exception. Only block if the operation
// queue is empty and we're not polling, otherwise we want to return
// as soon as possible.
task_->run(!more_handlers, this_thread.private_op_queue);
}
task_是epoll反应器,它将在运行时调用epoll_wait,我猜它可能只有一个线程调用它,因为在op_queue_中只有一个"task_operation_",我是对的吗?
如果我想在多线程中使用epoll,或者我可以使用"epollonshot",这样它可以确保一个线程一次处理一个套接字。
- 第一种情况,是当您使用
io_service
的单个实例并从多个线程调用io_service::run
方法时。
让我们看看schduler::run
函数(简化):
std::size_t scheduler::run(asio::error_code& ec)
{
mutex::scoped_lock lock(mutex_);
std::size_t n = 0;
for (; do_run_one(lock, this_thread, ec); lock.lock())
if (n != (std::numeric_limits<std::size_t>::max)())
++n;
return n;
}
因此,持有锁后,它调用do_run_one
方法,类似于:
std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
scheduler::thread_info& this_thread,
const asio::error_code& ec)
{
while (!stopped_)
{
if (!op_queue_.empty())
{
// Prepare to execute first handler from queue.
operation* o = op_queue_.front();
op_queue_.pop();
bool more_handlers = (!op_queue_.empty());
if (o == &task_operation_)
{
task_interrupted_ = more_handlers;
if (more_handlers && !one_thread_)
wakeup_event_.unlock_and_signal_one(lock);
else
lock.unlock();
task_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit;
task_->run(!more_handlers, this_thread.private_op_queue);
}
else
{
//......
}
}
else
{
wakeup_event_.clear(lock);
wakeup_event_.wait(lock);
}
}
return 0;
}
代码中有趣的部分是这些行:
if (more_handlers && !one_thread_)
wakeup_event_.unlock_and_signal_one(lock);
else
lock.unlock();
我们现在讨论的情况是具有多个线程的情况,因此第一个条件将满足(假设我们在op_queue_中有相当多的待挂任务)。
wakeup_event_.unlock_and_signal_one
最终做的是释放/解锁lock
,并通知正在等待条件等待的线程之一。因此,有了这个,至少另一个线程(无论谁获得了锁)现在可以调用do_run_one
。
在你的情况下,task_
是epoll_reactor
,正如你所说的。并且,在它的run
方法中,它调用epoll_wait
(不保留scheduler
的lock_
)。
这里有趣的是它在迭代epoll_wait
返回的所有ready描述符时所做的事情。它将它们推回到作为参数引用接收的操作队列中。现在推送的操作运行时类型为descriptor_state
,而不是task_operation_
:
for (int i = 0; i < num_events; ++i)
{
void* ptr = events[i].data.ptr;
if (ptr == &interrupter_)
{
// don't call work_started() here. This still allows the scheduler to
// stop if the only remaining operations are descriptor operations.
descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
descriptor_data->set_ready_events(events[i].events);
ops.push(descriptor_data);
}
}
因此,在scheduler::do_run_one
内部的while循环的下一次迭代中,对于完成的任务,它将到达else
分支(我在之前的粘贴中省略了它):
else
{
std::size_t task_result = o->task_result_;
if (more_handlers && !one_thread_)
wake_one_thread_and_unlock(lock);
else
lock.unlock();
// Ensure the count of outstanding work is decremented on block exit.
work_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit;
// Complete the operation. May throw an exception. Deletes the object.
o->complete(this, ec, task_result);
return 1;
}
调用complete
函数指针,该指针可能会调用用户传递给async_read
或async_write
API的句柄。
- 第二种情况,是你创建一个
io_service
对象池,并在一个或多个线程上调用它的run
方法,即io_service
和thread
之间的映射可以是1:1或1:1:N,这可能适合你的应用程序。这样你就可以用轮询的方式将io_service
对象分配给soucket
对象。
现在,回到你的问题:
如果我想在多线程中使用epoll,或者我可以使用" epollonshot "因此,它可以确保一个线程在同一时间处理一个套接字。
如果我理解正确的话,你想用一个线程处理一个套接字的所有事件吗?我认为这是可能的通过以下方法2,即创建一个io_service
对象池并将其映射到1个线程。通过这种方式,您可以确保特定套接字上的所有活动仅由一个线程寻址,即io_service:run
所在的线程。
在上述情况下,您不必担心设置EPOLLONESHOT
。
我不太确定是否使用第一种方法获得相同的行为,这是多线程和1 io_service
。
但是,如果你根本不使用线程,即你的io_service
在单线程上运行,那么你不必担心所有这些,毕竟asio的目的是抽象所有这些东西
只有一个线程会调用epoll_wait
。一旦线程接收到描述符的事件通知,它将把描述符解复用到运行io_service
的所有线程。根据平台特定实现说明:
线程:
- 使用
epoll
的解复用在调用io_service::run()
,io_service::run_one()
,io_service::poll()
或io_service::poll_one()
的线程中执行。
单个描述符将由执行I/O的单个线程处理。因此,当使用异步操作时,I/O不会对给定的套接字并发执行。
- std::condition_variable::wait()如何评估给定的谓词
- std::atomic和std::condition_variable wait,notify_*方法之间的区别
- std::memory_order for std::atomic:<T>:wait
- 从 Boost ASIO 获取 epoll 描述符 io_service对象
- std::p romise::set_value() 和 std::future::wait() 是否提供内存围栏?
- 对于等待以 std::future wait() 返回的函数的 CPU 使用率或检查标志在循环中休眠一段时间哪个更好?
- 我应该如何使用 epoll 从同一个 FD 读取和写入
- std::future::get()或std::future::wait()是std::thread::join()的替
- 使用 epoll 边缘触发器的套接字上的数据过多
- Epoll zero recv() and negative(EAGAIN) send()
- 在 while 循环中使用 std::condition_variable::wait 是否正确
- 如何使用 epoll(void* event.data.ptr) 管理 Connection 的生命周期
- future::wait() 是否与 async() 执行线程的完成同步?
- 为什么'wait with predicate'求解条件变量的'lost wakeup'?
- 线程锁定互斥锁的速度比 std::conditional_variable::wait() 快
- deadline_timer::wait 是否让位于其他任务
- std::future::wait 是内存障碍吗?(我无法解释这种数据竞赛)
- 我可以将一个套接字添加到多个 epoll 实例吗?
- 如何退出 QThread::wait()
- 如何修复OSX中的"fatal error: 'sys/epoll.h' file not found"?