如何等待 asio 处理程序
How to wait for an asio handler?
我有一个围绕具有一些属性的boost::asio::io_service
运行的对象。像这样:
class Foo
{
private:
// Not an int in my real code, but it doesn't really matter.
int m_bar;
boost::asio::io_service& m_io_service;
boost::asio::strand m_bar_strand;
};
m_bar
只能从通过链m_bar_strand
调用的处理程序中使用。这使我不会从这些处理程序中锁定。
为了从运行io_service::run()
线程外部设置 m_bar
属性,我编写了一个asynchronous_setter,如下所示:
class Foo
{
public:
void async_get_bar(function<void (int)> handler)
{
m_bar_strand.post(bind(&Foo::do_get_bar, this, handler));
}
void async_set_bar(int value, function<void ()> handler)
{
m_bar_strand.post(bind(&Foo::do_set_bar, this, value, handler));
}
private:
void do_get_bar(function<void (int)> handler)
{
// This is only called from within the m_bar_strand, so we are safe.
// Run the handler to notify the caller.
handler(m_bar);
}
void do_set_bar(int value, function<void ()> handler)
{
// This is only called from within the m_bar_strand, so we are safe.
m_bar = value;
// Run the handler to notify the caller.
handler();
}
int m_bar;
boost::asio::io_service& m_io_service;
boost::asio::strand m_bar_strand;
};
这非常有效,但现在我想编写一个同步版本的 set_bar
,该版本设置值并仅在集合有效时才返回。它仍然必须保证有效集合将在m_bar_strand
内发生。理想情况下,是可重入的东西。
我可以想象带有信号量的解决方案,这些解决方案将从处理程序中修改,但我提出的一切似乎都很笨拙,而且真的不优雅。Boost/Boost Asio中是否有允许这样的事情?
您将如何继续实施此方法?
如果您需要同步等待设置值,那么 Boost.Thread 的futures
可能会提供一个优雅的解决方案:
期货库提供了一种处理同步未来值的方法,无论这些值是由另一个线程生成的,还是在响应外部刺激的单个线程上生成的,还是按需生成的。
简而言之,将创建一个boost::promise
并允许在其上设置值。 稍后可以通过关联的boost::future
检索该值。 下面是一个基本示例:
boost::promise<int> promise;
boost::unique_future<int> future = promise.get_future();
// start asynchronous operation that will invoke future.set_value(42)
...
assert(future.get() == 42); // blocks until future has been set.
这种方法的另外两个显著优点:
-
future
是C++11的一部分。 - 异常甚至可以通过
promise::set_exception()
传递给future
,支持向调用者提供异常或错误的优雅方式。
下面是一个基于原始代码的完整示例:
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
class Foo
{
public:
Foo(boost::asio::io_service& io_service)
: m_io_service(io_service),
m_bar_strand(io_service)
{}
public:
void async_get_bar(boost::function<void(int)> handler)
{
m_bar_strand.post(bind(&Foo::do_get_bar, this, handler));
}
void async_set_bar(int value, boost::function<void()> handler)
{
m_bar_strand.post(bind(&Foo::do_set_bar, this, value, handler));
}
int bar()
{
typedef boost::promise<int> promise_type;
promise_type promise;
// Pass the handler to async operation that will set the promise.
void (promise_type::*setter)(const int&) = &promise_type::set_value;
async_get_bar(boost::bind(setter, &promise, _1));
// Synchronously wait for promise to be fulfilled.
return promise.get_future().get();
}
void bar(int value)
{
typedef boost::promise<void> promise_type;
promise_type promise;
// Pass the handler to async operation that will set the promise.
async_set_bar(value, boost::bind(&promise_type::set_value, &promise));
// Synchronously wait for the future to finish.
promise.get_future().wait();
}
private:
void do_get_bar(boost::function<void(int)> handler)
{
// This is only called from within the m_bar_strand, so we are safe.
// Run the handler to notify the caller.
handler(m_bar);
}
void do_set_bar(int value, boost::function<void()> handler)
{
// This is only called from within the m_bar_strand, so we are safe.
m_bar = value;
// Run the handler to notify the caller.
handler();
}
int m_bar;
boost::asio::io_service& m_io_service;
boost::asio::strand m_bar_strand;
};
int main()
{
boost::asio::io_service io_service;
boost::asio::io_service::work work(io_service);
boost::thread t(
boost::bind(&boost::asio::io_service::run, boost::ref(io_service)));
Foo foo(io_service);
foo.bar(21);
std::cout << "foo.bar is " << foo.bar() << std::endl;
foo.bar(2 * foo.bar());
std::cout << "foo.bar is " << foo.bar() << std::endl;
io_service.stop();
t.join();
}
它提供以下输出:
foo.bar is 21
foo.bar is 42
管道在 async_set_bar()
中设置值时通知同步方法。警告,下面的代码是大脑编译的,可能有错误,但它应该明白这一点
#include <boost/asio.hpp>
#include <iostream>
#include <thread>
class Foo
{
public:
Foo( boost::asio::io_service& io_service ) :
_bar( 0 ),
_io_service( io_service ),
_strand( _io_service ),
_readPipe( _io_service ),
_writePipe( _io_service )
{
boost::asio::local::connect_pair( _readPipe, _writePipe );
}
void set_async( int v ) {
_strand.post( [=]
{
_bar = v;
std::cout << "sending " << _bar << std::endl;
_writePipe.send( boost::asio::buffer( &_bar, sizeof(_bar) ) );
}
);
}
void set_sync( int v ) {
this->set_async( v );
int value;
_readPipe.receive( boost::asio::buffer(&value, sizeof(value) ) );
std::cout << "set value to " << value << std::endl;
}
private:
int _bar;
boost::asio::io_service& _io_service;
boost::asio::io_service::strand _strand;
boost::asio::local::stream_protocol::socket _readPipe;
boost::asio::local::stream_protocol::socket _writePipe;
};
int
main()
{
boost::asio::io_service io_service;
boost::asio::io_service::work w(io_service);
std::thread t( [&]{ io_service.run(); } );
Foo f( io_service );
f.set_sync( 20 );
io_service.stop();
t.join();
}
如果您无法使用 C++11 lambda,请将其替换为 boost::bind
和更多完成处理程序方法。
这是我想到的:
class synchronizer_base
{
protected:
synchronizer_base() :
m_has_result(false),
m_lock(m_mutex)
{
}
void wait()
{
while (!m_has_result)
{
m_condition.wait(m_lock);
}
}
void notify_result()
{
m_has_result = true;
m_condition.notify_all();
}
private:
boost::atomic<bool> m_has_result;
boost::mutex m_mutex;
boost::unique_lock<boost::mutex> m_lock;
boost::condition_variable m_condition;
};
template <typename ResultType = void>
class synchronizer : public synchronizer_base
{
public:
void operator()(const ResultType& result)
{
m_result = result;
notify_result();
}
ResultType wait_result()
{
wait();
return m_result;
}
private:
ResultType m_result;
};
template <>
class synchronizer<void> : public synchronizer_base
{
public:
void operator()()
{
notify_result();
}
void wait_result()
{
wait();
}
};
我可以这样使用它:
class Foo
{
public:
void async_get_bar(function<void (int)> handler)
{
m_bar_strand.post(bind(&Foo::do_get_bar, this, value, handler));
}
void async_set_bar(int value, function<void ()> handler)
{
m_bar_strand.post(bind(&Foo::do_set_bar, this, value, handler));
}
int get_bar()
{
synchronizer<int> sync;
async_get_bar(boost::ref(sync));
return sync.wait_result();
}
void set_bar(int value)
{
synchronizer<void> sync;
async_set_bar(value, boost::ref(sync));
sync.wait_result();
}
};
boost::ref
是必需的,因为synchronizer
的实例是不可复制的。这可以通过将synchronizer
包装在其他容器类中来避免,但我对该解决方案很好。
注意:不要从处理程序内部调用此类"同步"函数,否则它可能只是死锁!
- 提升 Asio TCP 服务器 处理多个客户端
- 将更高的优先级设置为 boost::asio 线程处理进程
- 某些 boost::asio 异步函数是否将处理程序连接到操作,以便处理程序被触发一次?
- boost::asio 中的自定义处理程序
- boost::asio 不会触发读取处理程序,而 Wireshark 会看到传入的数据
- asio::io_service 具有多个线程的优先级队列处理
- 异步操作的 Asio 处理程序在其同步对应项正常工作时不会调用
- boost::asio 使用 post() 时没有调用处理程序,当直接调用函数时有效(io_context有工作)
- 如何将每线程用户数据传递到ASIO处理程序中
- boost::asio 允许非阻塞接受新连接,而连接的处理程序正在阻塞
- 提升::不满足 ASIO 读取处理程序类型要求
- 独立 asio async_connect不触发绑定处理程序
- 在 Boost ASIO 服务器中处理生命周期
- boost::async_connect 上的 ASIO 完成处理程序在第一次失败后再也没有调用过
- boost::asio::async_read 不会回调我的处理程序函数
- 为什么Boost.Asio处理程序必须是可复制的
- 使用 Boost ASIO 处理没有套接字的 TCP
- 如何等待 asio 处理程序
- 从asio处理程序生成新的异步请求
- 如何在async boost::asio处理程序中获取接收消息的端点