C++ ASIO:异步套接字和线程

C++ ASIO: Asynchronous sockets and threading

本文关键字:线程 套接字 异步 ASIO C++      更新时间:2023-10-16

我的应用程序基于 asio 聊天示例,由客户端和服务器组成:- 客户端:连接到服务器,接收请求并响应它- 服务器:具有QT GUI(主线程)和网络服务(独立线程),侦听连接,向特定客户端发送请求并解释来自GUI的响应

我想以异步方式实现这一点,以避免每个客户端连接都有单独的线程。

在我的 QT 窗口中,我有一个io_service实例和一个网络服务实例:

io_service_ = new asio::io_service();
asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), "1234");
service_ = new Service(*io_service_, endpoint, this);
asio::io_service* ioServicePointer = io_service_;
t = std::thread{ [ioServicePointer](){ ioServicePointer->run(); } };

我希望能够将数据发送到一个客户端,如下所示:

service_->send_message(selectedClient.id, msg);

我通过观察者模式接收和处理响应(该窗口实现 IStreamListener 接口)

服务.cpp:

#include "Service.h"
#include "Stream.h"
void Service::runAcceptor()
{
    acceptor_.async_accept(socket_,
        [this](asio::error_code ec)
    {
        if (!ec)
        {
            std::make_shared<Stream>(std::move(socket_), &streams_)->start();
        }
        runAcceptor();
    });
}
void Service::send_message(std::string streamID, chat_message& msg)
{
    io_service_.post(
        [this, msg, streamID]()
    {
        auto stream = streams_.getStreamByID(streamID);
        stream->deliver(msg);
    }); 
}

流.cpp:

#include "Stream.h"
#include <iostream>
#include "../chat_message.h"
Stream::Stream(asio::ip::tcp::socket socket, StreamCollection* streams)
    : socket_(std::move(socket))
{
    streams_ = streams;         // keep a reference to the streamCollection
    // retrieve endpoint ip
    asio::ip::tcp::endpoint remote_ep = socket_.remote_endpoint();
    asio::ip::address remote_ad = remote_ep.address();
    this->ip_ = remote_ad.to_string();      
}
void Stream::start()
{
    streams_->join(shared_from_this());
    readHeader();
}
void Stream::deliver(const chat_message& msg)
{
    bool write_in_progress = !write_msgs_.empty();
    write_msgs_.push_back(msg);
    if (!write_in_progress)
    {
        write();
    }
}
std::string Stream::getName()
{
    return name_;
}
std::string Stream::getIP()
{
    return ip_;
}

void Stream::RegisterListener(IStreamListener *l)
{
    m_listeners.insert(l);
}
void Stream::UnregisterListener(IStreamListener *l)
{
    std::set<IStreamListener *>::const_iterator iter = m_listeners.find(l);
    if (iter != m_listeners.end())
    {
        m_listeners.erase(iter);
    }
    else {
        std::cerr << "Could not unregister the specified listener object as it is not registered." << std::endl;
    }
}
void Stream::readHeader()
{
    auto self(shared_from_this());
    asio::async_read(socket_,
        asio::buffer(read_msg_.data(), chat_message::header_length),
        [this, self](asio::error_code ec, std::size_t /*length*/)
    {
        if (!ec && read_msg_.decode_header())
        {
            readBody();
        }
        else if (ec == asio::error::eof || ec == asio::error::connection_reset)
        {
            std::for_each(m_listeners.begin(), m_listeners.end(), [&](IStreamListener *l) {l->onStreamDisconnecting(this->id()); });
            streams_->die(shared_from_this());
        }
        else
        {
            std::cerr << "Exception: " << ec.message();
        }
    });
}
void Stream::readBody()
{
    auto self(shared_from_this());
    asio::async_read(socket_,
        asio::buffer(read_msg_.body(), read_msg_.body_length()),
        [this, self](asio::error_code ec, std::size_t /*length*/)
    {
        if (!ec)
        {
                    // notify the listener (GUI) that a response has arrived and pass a reference to it
            auto msg = std::make_shared<chat_message>(std::move(read_msg_));
            std::for_each(m_listeners.begin(), m_listeners.end(), [&](IStreamListener *l) {l->onMessageReceived(msg); });
            readHeader();
        }
        else
        {
            streams_->die(shared_from_this());
        }
    });
}
void Stream::write()
{
    auto self(shared_from_this());
    asio::async_write(socket_,
        asio::buffer(write_msgs_.front().data(),
        write_msgs_.front().length()),
        [this, self](asio::error_code ec, std::size_t /*length*/)
    {
        if (!ec)
        {
            write_msgs_.pop_front();
            if (!write_msgs_.empty())
            {
                write();
            }
        }
        else
        {
            streams_->die(shared_from_this());
        }
    });
}

接口

 class IStream
{
public: 
    /// Unique stream identifier
    typedef void* TId;
    virtual TId id() const
    {
        return (TId)(this);
    }
    virtual ~IStream() {}
    virtual void deliver(const chat_message& msg) = 0;
    virtual std::string getName() = 0;
    virtual std::string getIP() = 0;
    /// observer pattern    
    virtual void RegisterListener(IStreamListener *l) = 0;
    virtual void UnregisterListener(IStreamListener *l) = 0;
};
 class IStreamListener
{
public:
    virtual void onStreamDisconnecting(IStream::TId streamId) = 0;
    virtual void onMessageReceived(std::shared_ptr<chat_message> msg) = 0;
};
/*
    streamCollection / service delegates
*/
class IStreamCollectionListener
{
public:
    virtual void onStreamDied(IStream::TId streamId) = 0;
    virtual void onStreamCreated(std::shared_ptr<IStream> stream) = 0;
};

StreamCollection 基本上是一组 IStreams:

 class StreamCollection
{
public:
    void join(stream_ptr stream)
    {
        streams_.insert(stream);
        std::for_each(m_listeners.begin(), m_listeners.end(), [&](IStreamCollectionListener *l) {l->onStreamCreated(stream); });

    }
    // more events and observer pattern inplementation

首先:到目前为止,代码按预期工作。

我的问题:这是ASIO应该用于异步编程的方式吗?我特别不确定Service::send_message方法和io_service.post的使用.在我的情况下,它的目的是什么?当我刚刚打电话给async_write时,它确实有效,没有将其包装在 io_service.post 调用中。

我在使用这种方法时遇到问题了吗?

Asio被设计成一个tookit而不是一个框架。 因此,有多种方法可以成功使用它。 分离 GUI 和网络线程,并使用异步 I/O 实现可伸缩性可能是一个好主意。

将工作委派给公共 API 中的io_service(例如 Service::send_message())会产生以下后果:

  • 将调用方的线程与为io_service提供服务的线程分离。 例如,如果Stream::write()执行耗时的加密功能,则调用方线程 (GUI) 不会受到影响。
  • 它提供线程安全。 io_service是线程安全的;但是socket不是线程安全的。 此外,其他对象可能不是线程安全的,例如 write_msgs_ 。 Asio 保证处理程序只会从运行io_servce的线程中调用。 因此,如果只有一个线程在运行io_service,则不可能并发,并且socket_write_msgs_都将以线程安全的方式访问。 Asio将此称为隐含strand。 如果有多个线程正在处理io_service,则可能需要使用显式strand来提供线程安全性。 有关股线的更多详细信息,请参阅此答案。

其他 Asio 注意事项:

    观察器在处理程序中
  • 调用,处理程序在网络线程中运行。 如果任何观察者需要很长时间才能完成,例如必须与 GUI 线程接触的各种共享对象同步,则可能会在其他操作之间造成较差的响应能力。 请考虑使用队列在观察者和主题组件之间代理事件。 例如,可以使用另一个io_service作为队列,该队列由自己的线程运行,并发布到其中:

    auto msg = std::make_shared<chat_message>(std::move(read_msg_));
    for (auto l: m_listeners)
        dispatch_io_service.post([=](){ l->onMessageReceived(msg); });
    
  • 验证 write_msgs_ 的容器类型不会使迭代器、指针和对 push_back() 上的现有元素的引用以及 pop_front() 的其他元素失效。 例如,使用 std::liststd::dequeue 是安全的,但std::vector可能会使对push_back上现有元素的引用无效。

  • 对于单个Stream,可以多次调用StreamCollection::die()。 此函数应该是幂等的或适当地处理副作用。
  • 当给定Stream失败时,它的侦听器只在一条路径中被告知断开连接:无法读取错误为 asio::error::eofasio::error::connection_reset 的标头。 其他路径不调用IStreamListener.onStreamDisconnecting()
    • 标头已读取,但解码失败。 在这种特殊情况下,整个读取链将停止而不通知其他组件。 出现问题的唯一指示是要std::cerr的打印语句。
    • 当读取正文失败时。