使用 boost::asio 时是否需要实现阻塞?
Do I need to implement blocking when using boost::asio?
我的问题是,如果我在多个线程上运行io_service::run(),我是否需要在这些异步函数上实现阻塞?
例:
int i = 0;
int j = 0;
void test_timer(boost::system::error_code ec)
{
//I need to lock up here ?
if (i++ == 10)
{
j = i * 10;
}
timer.expires_at(timer.expires_at() + boost::posix_time::milliseconds(500));
timer.async_wait(&test_timer);
}
void threadMain()
{
io_service.run();
}
int main()
{
boost::thread_group workers;
timer.async_wait(&test_timer);
for (int i = 0; i < 5; i++){
workers.create_thread(&threadMain);
}
io_service.run();
workers.join_all();
return 0;
}
异步的定义是它是非阻塞的。
如果您的意思是问"我是否必须同步对来自不同线程的共享对象的访问" - 这个问题无关紧要,答案取决于您共享对象的线程安全记录。
对于 Asio,基本上(粗略摘要)您需要将并发访问(并发如:来自多个线程)同步到除boost::asio::io_context
¹,² 之外的所有类型。
您的样品
此示例使用运行 io 服务的多个线程,这意味着处理程序在任意线程上运行。这意味着您实际上是在共享全局变量,并且它们确实需要保护。
但是 ,由于应用程序逻辑(异步调用链)指示只有一个操作处于挂起状态,并且共享计时器对象上的下一个异步操作始终从该链中调度,因此在逻辑上,访问全部来自单个线程(称为隐式链)。请参阅为什么使用 boost::asio 时每个连接需要链?
最简单的工作方法:
逻辑链
住在科里鲁
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iostream>
boost::asio::io_service io_service;
boost::asio::deadline_timer timer { io_service };
struct state_t {
int i = 0;
int j = 0;
} state;
void test_timer(boost::system::error_code ec)
{
if (ec != boost::asio::error::operation_aborted) {
{
if (state.i++ == 10) {
state.j = state.i * 10;
if (state.j > 100)
return; // stop after 5 seconds
}
}
timer.expires_at(timer.expires_at() + boost::posix_time::milliseconds(50));
timer.async_wait(&test_timer);
}
}
int main()
{
boost::thread_group workers;
timer.expires_from_now(boost::posix_time::milliseconds(50));
timer.async_wait(&test_timer);
for (int i = 0; i < 5; i++){
workers.create_thread([] { io_service.run(); });
}
workers.join_all();
std::cout << "i = " << state.i << std::endl;
std::cout << "j = " << state.j << std::endl;
}
请注意,我从主线程中删除了
io_service::run()
,因为它与join()
冗余(除非您真的想要6个线程运行处理程序,而不是 5 个线程)。
指纹
i = 11
j = 110
警告
这里潜伏着一个陷阱。比如说,你不想像我一样以固定的数字保释,但想停下来,你会很想这样做:
timer.cancel();
从main
.这是不合法的,因为deadline_timer
对象不是线程安全的。您需要
- 使用全局
atomic_bool
发出终止请求的信号 - 将
timer.cancel()
发布在与计时器异步链相同的链上。但是,只有一个显式链,因此如果不将代码更改为使用显式链,则无法执行此操作。
更多定时器
让我们通过两个计时器来使事情复杂化,它们有自己的隐式链。这意味着对计时器实例的访问仍然不需要同步,但对i
和j
的访问确实需要同步。
注意在此演示中,我使用
synchronized_value<>
表示优雅。您可以使用mutex
和lock_guard
手动编写类似的逻辑。
住在科里鲁
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/synchronized_value.hpp>
#include <iostream>
boost::asio::io_service io_service;
struct state {
int i = 0;
int j = 0;
};
boost::synchronized_value<state> shared_state;
struct TimerChain {
boost::asio::deadline_timer _timer;
TimerChain() : _timer{io_service} {
_timer.expires_from_now(boost::posix_time::milliseconds(50));
resume();
}
void resume() {
_timer.async_wait(boost::bind(&TimerChain::test_timer, this, _1));
};
void test_timer(boost::system::error_code ec)
{
if (ec != boost::asio::error::operation_aborted) {
{
auto state = shared_state.synchronize();
if (state->i++ == 10) {
state->j = state->i * 10;
}
if (state->j > 100) return; // stop after some iterations
}
_timer.expires_at(_timer.expires_at() + boost::posix_time::milliseconds(50));
resume();
}
}
};
int main()
{
boost::thread_group workers;
TimerChain timer1;
TimerChain timer2;
for (int i = 0; i < 5; i++){
workers.create_thread([] { io_service.run(); });
}
workers.join_all();
auto state = shared_state.synchronize();
std::cout << "i = " << state->i << std::endl;
std::cout << "j = " << state->j << std::endl;
}
指纹
i = 12
j = 110
添加显式链
现在添加它们非常简单:
struct TimerChain {
boost::asio::io_service::strand _strand;
boost::asio::deadline_timer _timer;
TimerChain() : _strand{io_service}, _timer{io_service} {
_timer.expires_from_now(boost::posix_time::milliseconds(50));
resume();
}
void resume() {
_timer.async_wait(_strand.wrap(boost::bind(&TimerChain::test_timer, this, _1)));
};
void stop() { // thread safe
_strand.post([this] { _timer.cancel(); });
}
// ...
住在科里鲁
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/synchronized_value.hpp>
#include <iostream>
boost::asio::io_service io_service;
struct state {
int i = 0;
int j = 0;
};
boost::synchronized_value<state> shared_state;
struct TimerChain {
boost::asio::io_service::strand _strand;
boost::asio::deadline_timer _timer;
TimerChain() : _strand{io_service}, _timer{io_service} {
_timer.expires_from_now(boost::posix_time::milliseconds(50));
resume();
}
void resume() {
_timer.async_wait(_strand.wrap(boost::bind(&TimerChain::test_timer, this, _1)));
};
void stop() { // thread safe
_strand.post([this] { _timer.cancel(); });
}
void test_timer(boost::system::error_code ec)
{
if (ec != boost::asio::error::operation_aborted) {
{
auto state = shared_state.synchronize();
if (state->i++ == 10) {
state->j = state->i * 10;
}
}
// continue indefinitely
_timer.expires_at(_timer.expires_at() + boost::posix_time::milliseconds(50));
resume();
}
}
};
int main()
{
boost::thread_group workers;
TimerChain timer1;
TimerChain timer2;
for (int i = 0; i < 5; i++){
workers.create_thread([] { io_service.run(); });
}
boost::this_thread::sleep_for(boost::chrono::seconds(10));
timer1.stop();
timer2.stop();
workers.join_all();
auto state = shared_state.synchronize();
std::cout << "i = " << state->i << std::endl;
std::cout << "j = " << state->j << std::endl;
}
指纹
i = 400
j = 110
¹(或使用旧名称boost::asio::io_service
)
² 生存期突变在这方面不被视为成员操作(即使对于线程安全对象,您也必须手动同步共享对象的构造/销毁)
- 这个极客对极客的trie实现是否存在内存泄漏问题
- C++方法实现:是否可以避免每次都键入类名?
- 我的堆栈弹出式磁带的实现是否泄漏内存?
- 这C++ AtomicInt 实现是否正确?
- STR这个实现是否安全且可移植?
- 此包络实现是否正确使用 C++11 原子学
- 使用 PIMPL 惯用法,实现是否应始终是类的私有成员?
- 不同平台的 fstream 系列实现是否不同?
- 虚拟 CTOR 的克隆函数实现是否有问题
- fetch_mult的原子实现是否正确?
- 像这样的PIMPL实现是否有任何简单的语法
- 双重检查单例线程的实现是否安全?
- 此并发快速排序实现是否正确?
- C/C 中POW()函数的实现是否随平台或编译器而变化
- std::tuple的实现是否允许在触发空类元素的派生到基转换时失败
- 我的双重检查锁定模式实现是否正确?
- 我的链表实现是否泄漏内存
- std::string 实现是否符合其中 's.c_str() + s.size()' 不一定与 '&s[s.size()]' 相同?
- std::vector 实现是否使用内部数组或链表或其他
- 此访问者实现是否正确