如何等待 asio 处理程序

How to wait for an asio handler?

本文关键字:asio 处理 程序 等待 何等待      更新时间:2023-10-16

我有一个围绕具有一些属性的boost::asio::io_service运行的对象。像这样:

class Foo
{
  private:
    // Not an int in my real code, but it doesn't really matter.
    int m_bar;
    boost::asio::io_service& m_io_service;
    boost::asio::strand m_bar_strand;
};

m_bar只能从通过链m_bar_strand调用的处理程序中使用。这使我不会从这些处理程序中锁定。

为了从运行io_service::run()线程外部设置 m_bar 属性,我编写了一个asynchronous_setter,如下所示:

class Foo
{
  public:
    void async_get_bar(function<void (int)> handler)
    {
      m_bar_strand.post(bind(&Foo::do_get_bar, this, handler));
    }
    void async_set_bar(int value, function<void ()> handler)
    {
      m_bar_strand.post(bind(&Foo::do_set_bar, this, value, handler));
    }
  private:
    void do_get_bar(function<void (int)> handler)
    {
      // This is only called from within the m_bar_strand, so we are safe.
      // Run the handler to notify the caller.
      handler(m_bar);
    }
    void do_set_bar(int value, function<void ()> handler)
    {
      // This is only called from within the m_bar_strand, so we are safe.
      m_bar = value;
      // Run the handler to notify the caller.
      handler();
    }
    int m_bar;
    boost::asio::io_service& m_io_service;
    boost::asio::strand m_bar_strand;
};

这非常有效,但现在我想编写一个同步版本的 set_bar,该版本设置值并仅在集合有效时才返回。它仍然必须保证有效集合将在m_bar_strand内发生。理想情况下,是可重入的东西。

我可以想象带有信号量的解决方案,这些解决方案将从处理程序中修改,但我提出的一切似乎都很笨拙,而且真的不优雅。Boost/Boost Asio中是否有允许这样的事情?

您将如何继续实施此方法?

如果您需要同步等待设置值,那么 Boost.Thread 的futures可能会提供一个优雅的解决方案:

期货库提供了一种处理同步未来值的方法,无论这些值是由另一个线程生成的,还是在响应外部刺激的单个线程上生成的,还是按需生成的。

简而言之,将创建一个boost::promise并允许在其上设置值。 稍后可以通过关联的boost::future检索该值。 下面是一个基本示例:

boost::promise<int> promise;
boost::unique_future<int> future = promise.get_future();
// start asynchronous operation that will invoke future.set_value(42)
...
assert(future.get() == 42); // blocks until future has been set.

这种方法的另外两个显著优点:

  • future是C++11的一部分。
  • 异常甚至可以通过promise::set_exception()传递给future,支持向调用者提供异常或错误的优雅方式。

下面是一个基于原始代码的完整示例:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
class Foo
{
public:
  Foo(boost::asio::io_service& io_service)
    : m_io_service(io_service),
      m_bar_strand(io_service)
  {}
public:
  void async_get_bar(boost::function<void(int)> handler)
  {
    m_bar_strand.post(bind(&Foo::do_get_bar, this, handler));
  }
  void async_set_bar(int value, boost::function<void()> handler)
  {
    m_bar_strand.post(bind(&Foo::do_set_bar, this, value, handler));
  }
  int bar()
  {
    typedef boost::promise<int> promise_type;
    promise_type promise;
    // Pass the handler to async operation that will set the promise.
    void (promise_type::*setter)(const int&) = &promise_type::set_value;
    async_get_bar(boost::bind(setter, &promise, _1));
    // Synchronously wait for promise to be fulfilled.
    return promise.get_future().get();
  }
  void bar(int value)
  {
    typedef boost::promise<void> promise_type;
    promise_type promise;
    // Pass the handler to async operation that will set the promise.
    async_set_bar(value, boost::bind(&promise_type::set_value, &promise));
    // Synchronously wait for the future to finish.
    promise.get_future().wait();
  }
private:
  void do_get_bar(boost::function<void(int)> handler)
  {
    // This is only called from within the m_bar_strand, so we are safe.
    // Run the handler to notify the caller.
    handler(m_bar);
  }
  void do_set_bar(int value, boost::function<void()> handler)
  {
    // This is only called from within the m_bar_strand, so we are safe.
    m_bar = value;
    // Run the handler to notify the caller.
    handler();
  }
  int m_bar;
  boost::asio::io_service& m_io_service;
  boost::asio::strand m_bar_strand;
};
int main()
{
  boost::asio::io_service io_service;
  boost::asio::io_service::work work(io_service);
  boost::thread t(
      boost::bind(&boost::asio::io_service::run, boost::ref(io_service)));
  Foo foo(io_service);
  foo.bar(21);
  std::cout << "foo.bar is " << foo.bar() << std::endl;
  foo.bar(2 * foo.bar());
  std::cout << "foo.bar is " << foo.bar() << std::endl;
  io_service.stop();
  t.join();
}

它提供以下输出:

foo.bar is 21
foo.bar is 42
可以使用

管道在 async_set_bar() 中设置值时通知同步方法。警告,下面的代码是大脑编译的,可能有错误,但它应该明白这一点

#include <boost/asio.hpp>
#include <iostream>
#include <thread>                                                                                                                       
class Foo                                                                                                                               
{
public:                                                                                                                                 
    Foo( boost::asio::io_service& io_service ) :                                                                                        
        _bar( 0 ),
        _io_service( io_service ),                                                                                                      
        _strand( _io_service ),                                                                                                         
        _readPipe( _io_service ),
        _writePipe( _io_service )
    {
        boost::asio::local::connect_pair( _readPipe, _writePipe );
    }
    void set_async( int v ) {
        _strand.post( [=]
            {
                _bar = v;
                std::cout << "sending " << _bar << std::endl; 
                _writePipe.send( boost::asio::buffer( &_bar, sizeof(_bar) ) );
            }
            );
    }
    void set_sync( int v ) {
        this->set_async( v );
        int value;
        _readPipe.receive( boost::asio::buffer(&value, sizeof(value) ) );
        std::cout << "set value to " << value << std::endl;
    }

private:
    int _bar;
    boost::asio::io_service& _io_service;
    boost::asio::io_service::strand _strand;
    boost::asio::local::stream_protocol::socket _readPipe;
    boost::asio::local::stream_protocol::socket _writePipe;
};
int
main()
{
    boost::asio::io_service io_service;
    boost::asio::io_service::work w(io_service);
    std::thread t( [&]{ io_service.run(); } );
    Foo f( io_service );
    f.set_sync( 20 );
    io_service.stop();
    t.join();
}

如果您无法使用 C++11 lambda,请将其替换为 boost::bind 和更多完成处理程序方法。

这是我想到的:

class synchronizer_base
{
    protected:
        synchronizer_base() :
            m_has_result(false),
            m_lock(m_mutex)
        {
        }
        void wait()
        {
            while (!m_has_result)
            {
                m_condition.wait(m_lock);
            }
        }
        void notify_result()
        {
            m_has_result = true;
            m_condition.notify_all();
        }
    private:
        boost::atomic<bool> m_has_result;
        boost::mutex m_mutex;
        boost::unique_lock<boost::mutex> m_lock;
        boost::condition_variable m_condition;
};
template <typename ResultType = void>
class synchronizer : public synchronizer_base
{
    public:
        void operator()(const ResultType& result)
        {
            m_result = result;
            notify_result();
        }
        ResultType wait_result()
        {
            wait();
            return m_result;
        }
    private:
        ResultType m_result;
};
template <>
class synchronizer<void> : public synchronizer_base
{
    public:
        void operator()()
        {
            notify_result();
        }
        void wait_result()
        {
            wait();
        }
};

我可以这样使用它:

class Foo
{
  public:
    void async_get_bar(function<void (int)> handler)
    {
      m_bar_strand.post(bind(&Foo::do_get_bar, this, value, handler));
    }
    void async_set_bar(int value, function<void ()> handler)
    {
      m_bar_strand.post(bind(&Foo::do_set_bar, this, value, handler));
    }
    int get_bar()
    {
      synchronizer<int> sync;
      async_get_bar(boost::ref(sync));
      return sync.wait_result();
    }
    void set_bar(int value)
    {
      synchronizer<void> sync;
      async_set_bar(value, boost::ref(sync));
      sync.wait_result();
    }
};

boost::ref是必需的,因为synchronizer的实例是不可复制的。这可以通过将synchronizer包装在其他容器类中来避免,但我对该解决方案很好。

注意:不要从处理程序内部调用此类"同步"函数,否则它可能只是死锁!