C++ ASIO:异步套接字和线程
C++ ASIO: Asynchronous sockets and threading
我的应用程序基于 asio 聊天示例,由客户端和服务器组成:- 客户端:连接到服务器,接收请求并响应它- 服务器:具有QT GUI(主线程)和网络服务(独立线程),侦听连接,向特定客户端发送请求并解释来自GUI的响应
我想以异步方式实现这一点,以避免每个客户端连接都有单独的线程。
在我的 QT 窗口中,我有一个io_service
实例和一个网络服务实例:
io_service_ = new asio::io_service();
asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), "1234");
service_ = new Service(*io_service_, endpoint, this);
asio::io_service* ioServicePointer = io_service_;
t = std::thread{ [ioServicePointer](){ ioServicePointer->run(); } };
我希望能够将数据发送到一个客户端,如下所示:
service_->send_message(selectedClient.id, msg);
我通过观察者模式接收和处理响应(该窗口实现 IStreamListener 接口)
服务.cpp:
#include "Service.h"
#include "Stream.h"
void Service::runAcceptor()
{
acceptor_.async_accept(socket_,
[this](asio::error_code ec)
{
if (!ec)
{
std::make_shared<Stream>(std::move(socket_), &streams_)->start();
}
runAcceptor();
});
}
void Service::send_message(std::string streamID, chat_message& msg)
{
io_service_.post(
[this, msg, streamID]()
{
auto stream = streams_.getStreamByID(streamID);
stream->deliver(msg);
});
}
流.cpp:
#include "Stream.h"
#include <iostream>
#include "../chat_message.h"
Stream::Stream(asio::ip::tcp::socket socket, StreamCollection* streams)
: socket_(std::move(socket))
{
streams_ = streams; // keep a reference to the streamCollection
// retrieve endpoint ip
asio::ip::tcp::endpoint remote_ep = socket_.remote_endpoint();
asio::ip::address remote_ad = remote_ep.address();
this->ip_ = remote_ad.to_string();
}
void Stream::start()
{
streams_->join(shared_from_this());
readHeader();
}
void Stream::deliver(const chat_message& msg)
{
bool write_in_progress = !write_msgs_.empty();
write_msgs_.push_back(msg);
if (!write_in_progress)
{
write();
}
}
std::string Stream::getName()
{
return name_;
}
std::string Stream::getIP()
{
return ip_;
}
void Stream::RegisterListener(IStreamListener *l)
{
m_listeners.insert(l);
}
void Stream::UnregisterListener(IStreamListener *l)
{
std::set<IStreamListener *>::const_iterator iter = m_listeners.find(l);
if (iter != m_listeners.end())
{
m_listeners.erase(iter);
}
else {
std::cerr << "Could not unregister the specified listener object as it is not registered." << std::endl;
}
}
void Stream::readHeader()
{
auto self(shared_from_this());
asio::async_read(socket_,
asio::buffer(read_msg_.data(), chat_message::header_length),
[this, self](asio::error_code ec, std::size_t /*length*/)
{
if (!ec && read_msg_.decode_header())
{
readBody();
}
else if (ec == asio::error::eof || ec == asio::error::connection_reset)
{
std::for_each(m_listeners.begin(), m_listeners.end(), [&](IStreamListener *l) {l->onStreamDisconnecting(this->id()); });
streams_->die(shared_from_this());
}
else
{
std::cerr << "Exception: " << ec.message();
}
});
}
void Stream::readBody()
{
auto self(shared_from_this());
asio::async_read(socket_,
asio::buffer(read_msg_.body(), read_msg_.body_length()),
[this, self](asio::error_code ec, std::size_t /*length*/)
{
if (!ec)
{
// notify the listener (GUI) that a response has arrived and pass a reference to it
auto msg = std::make_shared<chat_message>(std::move(read_msg_));
std::for_each(m_listeners.begin(), m_listeners.end(), [&](IStreamListener *l) {l->onMessageReceived(msg); });
readHeader();
}
else
{
streams_->die(shared_from_this());
}
});
}
void Stream::write()
{
auto self(shared_from_this());
asio::async_write(socket_,
asio::buffer(write_msgs_.front().data(),
write_msgs_.front().length()),
[this, self](asio::error_code ec, std::size_t /*length*/)
{
if (!ec)
{
write_msgs_.pop_front();
if (!write_msgs_.empty())
{
write();
}
}
else
{
streams_->die(shared_from_this());
}
});
}
接口
class IStream
{
public:
/// Unique stream identifier
typedef void* TId;
virtual TId id() const
{
return (TId)(this);
}
virtual ~IStream() {}
virtual void deliver(const chat_message& msg) = 0;
virtual std::string getName() = 0;
virtual std::string getIP() = 0;
/// observer pattern
virtual void RegisterListener(IStreamListener *l) = 0;
virtual void UnregisterListener(IStreamListener *l) = 0;
};
class IStreamListener
{
public:
virtual void onStreamDisconnecting(IStream::TId streamId) = 0;
virtual void onMessageReceived(std::shared_ptr<chat_message> msg) = 0;
};
/*
streamCollection / service delegates
*/
class IStreamCollectionListener
{
public:
virtual void onStreamDied(IStream::TId streamId) = 0;
virtual void onStreamCreated(std::shared_ptr<IStream> stream) = 0;
};
StreamCollection 基本上是一组 IStreams:
class StreamCollection
{
public:
void join(stream_ptr stream)
{
streams_.insert(stream);
std::for_each(m_listeners.begin(), m_listeners.end(), [&](IStreamCollectionListener *l) {l->onStreamCreated(stream); });
}
// more events and observer pattern inplementation
首先:到目前为止,代码按预期工作。
我的问题:这是ASIO应该用于异步编程的方式吗?我特别不确定Service::send_message
方法和io_service.post
的使用.在我的情况下,它的目的是什么?当我刚刚打电话给async_write时,它确实有效,没有将其包装在 io_service.post 调用中。
我在使用这种方法时遇到问题了吗?
Asio被设计成一个tookit而不是一个框架。 因此,有多种方法可以成功使用它。 分离 GUI 和网络线程,并使用异步 I/O 实现可伸缩性可能是一个好主意。
将工作委派给公共 API 中的io_service
(例如 Service::send_message()
)会产生以下后果:
- 将调用方的线程与为
io_service
提供服务的线程分离。 例如,如果Stream::write()
执行耗时的加密功能,则调用方线程 (GUI) 不会受到影响。 - 它提供线程安全。
io_service
是线程安全的;但是socket
不是线程安全的。 此外,其他对象可能不是线程安全的,例如write_msgs_
。 Asio 保证处理程序只会从运行io_servce
的线程中调用。 因此,如果只有一个线程在运行io_service
,则不可能并发,并且socket_
和write_msgs_
都将以线程安全的方式访问。 Asio将此称为隐含strand
。 如果有多个线程正在处理io_service
,则可能需要使用显式strand
来提供线程安全性。 有关股线的更多详细信息,请参阅此答案。
其他 Asio 注意事项:
- 观察器在处理程序中
调用,处理程序在网络线程中运行。 如果任何观察者需要很长时间才能完成,例如必须与 GUI 线程接触的各种共享对象同步,则可能会在其他操作之间造成较差的响应能力。 请考虑使用队列在观察者和主题组件之间代理事件。 例如,可以使用另一个
io_service
作为队列,该队列由自己的线程运行,并发布到其中:auto msg = std::make_shared<chat_message>(std::move(read_msg_)); for (auto l: m_listeners) dispatch_io_service.post([=](){ l->onMessageReceived(msg); });
验证
write_msgs_
的容器类型不会使迭代器、指针和对push_back()
上的现有元素的引用以及pop_front()
的其他元素失效。 例如,使用std::list
或std::dequeue
是安全的,但std::vector
可能会使对push_back
上现有元素的引用无效。- 对于单个
Stream
,可以多次调用StreamCollection::die()
。 此函数应该是幂等的或适当地处理副作用。 - 当给定
Stream
失败时,它的侦听器只在一条路径中被告知断开连接:无法读取错误为asio::error::eof
或asio::error::connection_reset
的标头。 其他路径不调用IStreamListener.onStreamDisconnecting()
:- 标头已读取,但解码失败。 在这种特殊情况下,整个读取链将停止而不通知其他组件。 出现问题的唯一指示是要
std::cerr
的打印语句。 - 当读取正文失败时。
- 标头已读取,但解码失败。 在这种特殊情况下,整个读取链将停止而不通知其他组件。 出现问题的唯一指示是要
- 接受线程 C++ 套接字中的函数循环
- 关于套接字通信的线程
- 用于线程间通信的 Windows 套接字
- 具有多线程应用程序的 Nanomsg 无阻塞双向套接字
- 套接字发送(.)线程的最佳数量
- 包含线程后C++套接字错误WSAENOTSOCK (10038)
- 多线程套接字编程服务器仅从 1 个客户端线程获取消息
- 每个线程或每个调用一个 ZeroMQ 套接字
- QSocketNotifier:不能从另一个线程启用或禁用套接字通知程序
- 使用Berkeley套接字的线程通知
- 在多线程HTTP服务器中发送后,如何干净地关闭套接字
- C /Linux:如何编写使用套接字的线程安全库
- windows DLL是否有可能在多个线程或进程之间使用相同的套接字编号
- c++系统()挂起,使用netcat连接到不同线程中的套接字
- 访问已传递给线程的套接字
- 提升同一对象上的异步套接字和线程池io_service
- c++tcp多线程客户端/服务器-如何与线程套接字处理程序进行通信
- 多线程套接字超时
- c++多线程套接字无法接收客户端数据
- 多线程套接字操作与进度条更新