Boost asio,单个TCP服务器,多个客户端
Boost asio, single TCP server, many clients
我正在创建一个TCP服务器,该服务器将使用boost asio,它将接受来自许多客户端的连接、接收数据并发送确认。问题是,我希望能够接受所有的客户,但我希望一次只与一个客户合作。我希望所有其他事务都保留在一个队列中。
示例:
- Client1连接
- Client2连接
- Client1发送数据并请求回复
- Client2发送数据并请求回复
- Client2的请求被放入队列
- 读取客户端1的数据,服务器回复,事务结束
- Client2的请求从队列中获取,服务器读取数据,回复事务结束
所以这介于异步服务器和阻塞服务器之间。我想一次只做一件事,但同时我想能够将所有客户端套接字及其需求存储在队列中。
我能够用我需要的所有功能创建服务器-客户端通信,但只能在单个线程上。一旦客户端断开连接,服务器也会终止。我真的不知道如何开始实施我上面提到的。每次接受连接时,我应该打开新线程吗?我应该使用async_accept还是阻塞接受?
我读过boost::asio聊天的例子,其中许多客户端连接到单个服务器,但这里没有我需要的排队机制。
我知道这篇文章可能有点令人困惑,但TCP服务器对我来说是新的,所以我对术语不够熟悉。也并没有发布源代码,因为我只是在寻求这个项目概念的帮助。
继续接受。
你没有显示任何代码,但它通常看起来像
void do_accept() {
acceptor_.async_accept(socket_, [this](boost::system::error_code ec) {
std::cout << "async_accept -> " << ec.message() << "n";
if (!ec) {
std::make_shared<Connection>(std::move(socket_))->start();
do_accept(); // THIS LINE
}
});
}
如果不包括标记为// THIS LINE
的行,则实际上不会接受超过1个连接。
如果这没有帮助,请包含一些我们可以使用的代码。
为了好玩,一个演示
这只使用非网络零件的标准库功能。
网络侦听器
网络部分如前所述:
#include <boost/asio.hpp>
#include <boost/asio/high_resolution_timer.hpp>
#include <istream>
using namespace std::chrono_literals;
using Clock = std::chrono::high_resolution_clock;
namespace Shared {
using PostRequest = std::function<void(std::istream& is)>;
}
namespace Network {
namespace ba = boost::asio;
using ba::ip::tcp;
using error_code = boost::system::error_code;
using Shared::PostRequest;
struct Connection : std::enable_shared_from_this<Connection> {
Connection(tcp::socket&& s, PostRequest poster) : _s(std::move(s)), _poster(poster) {}
void process() {
auto self = shared_from_this();
ba::async_read(_s, _request, [this,self](error_code ec, size_t) {
if (!ec || ec == ba::error::eof) {
std::istream reader(&_request);
_poster(reader);
}
});
}
private:
tcp::socket _s;
ba::streambuf _request;
PostRequest _poster;
};
struct Server {
Server(unsigned port, PostRequest poster) : _port(port), _poster(poster) {}
void run_for(Clock::duration d = 30s) {
_stop.expires_from_now(d);
_stop.async_wait([this](error_code ec) { if (!ec) _svc.post([this] { _a.close(); }); });
_a.listen();
do_accept();
_svc.run();
}
private:
void do_accept() {
_a.async_accept(_s, [this](error_code ec) {
if (!ec) {
std::make_shared<Connection>(std::move(_s), _poster)->process();
do_accept();
}
});
}
unsigned short _port;
PostRequest _poster;
ba::io_service _svc;
ba::high_resolution_timer _stop { _svc };
tcp::acceptor _a { _svc, tcp::endpoint {{}, _port } };
tcp::socket _s { _svc };
};
}
与工作服务部分的唯一"连接"是PostRequest
处理程序,该处理程序在构造时传递给服务器:
Network::Server server(6767, handler);
我还选择了异步操作,所以我们可以有一个定时器来停止服务,即使我们不使用任何线程:
server.run_for(3s); // this blocks
工作部分
这是完全独立的,并且将使用线程。首先,让我们定义一个Request
和一个线程安全的Queue
:
namespace Service {
struct Request {
std::vector<char> data; // or whatever you read from the sockets...
};
Request parse_request(std::istream& is) {
Request result;
result.data.assign(std::istream_iterator<char>(is), {});
return result;
}
struct Queue {
Queue(size_t max = 50) : _max(max) {}
void enqueue(Request req) {
std::unique_lock<std::mutex> lk(mx);
cv.wait(lk, [this] { return _queue.size() < _max; });
_queue.push_back(std::move(req));
cv.notify_one();
}
Request dequeue(Clock::time_point deadline) {
Request req;
{
std::unique_lock<std::mutex> lk(mx);
_peak = std::max(_peak, _queue.size());
if (cv.wait_until(lk, deadline, [this] { return _queue.size() > 0; })) {
req = std::move(_queue.front());
_queue.pop_front();
cv.notify_one();
} else {
throw std::range_error("dequeue deadline");
}
}
return req;
}
size_t peak_depth() const {
std::lock_guard<std::mutex> lk(mx);
return _peak;
}
private:
mutable std::mutex mx;
mutable std::condition_variable cv;
size_t _max = 50;
size_t _peak = 0;
std::deque<Request> _queue;
};
这并没有什么特别之处,实际上还没有使用线程。让我们制作一个工作程序函数,它接受对队列的引用(如果需要,可以启动一个以上的工作程序):
void worker(std::string name, Queue& queue, Clock::duration d = 30s) {
auto const deadline = Clock::now() + d;
while(true) try {
auto r = queue.dequeue(deadline);
(std::cout << "Worker " << name << " handling request '").write(r.data.data(), r.data.size()) << "'n";
}
catch(std::exception const& e) {
std::cout << "Worker " << name << " got " << e.what() << "n";
break;
}
}
}
main
驱动程序
在这里,队列被实例化,网络服务器和一些工作线程都被启动:
int main() {
Service::Queue queue;
auto handler = [&](std::istream& is) {
queue.enqueue(Service::parse_request(is));
};
Network::Server server(6767, handler);
std::vector<std::thread> pool;
pool.emplace_back([&queue] { Service::worker("one", queue, 6s); });
pool.emplace_back([&queue] { Service::worker("two", queue, 6s); });
server.run_for(3s); // this blocks
for (auto& thread : pool)
if (thread.joinable())
thread.join();
std::cout << "Maximum queue depth was " << queue.peak_depth() << "n";
}
现场演示
See It Live On Coliru
测试负载如下所示:
for a in "hello world" "the quick" "brown fox" "jumped over" "the pangram" "bye world"
do
netcat 127.0.0.1 6767 <<< "$a" || echo "not sent: '$a'"&
done
wait
它打印的东西像:
Worker one handling request 'brownfox'
Worker one handling request 'thepangram'
Worker one handling request 'jumpedover'
Worker two handling request 'Worker helloworldone handling request 'byeworld'
Worker one handling request 'thequick'
'
Worker one got dequeue deadline
Worker two got dequeue deadline
Maximum queue depth was 6
您需要的includes。有些可能是不必要的:
boost/asio.hpp
、boost/thread.hpp
、boost/asio/io_service.hpp
boost/asio/spawn.hpp
、boost/asio/write.hpp
、boost/asio/buffer.hpp
boost/asio/ip/tcp.hpp
、iostream
、stdlib.h
、array
、string
vector
、string.h
、stdio.h
、process.h
、iterator
using namespace boost::asio;
using namespace boost::asio::ip;
io_service ioservice;
tcp::endpoint sim_endpoint{ tcp::v4(), 4066 }; //{which connectiontype, portnumber}
tcp::acceptor sim_acceptor{ ioservice, sim_endpoint };
std::vector<tcp::socket> sim_sockets;
static int iErgebnis;
int iSocket = 0;
void do_write(int a) //int a is the postion of the socket in the vector
{
int iWSchleife = 1; //to stay connected with putty or something
static char chData[32000];
std::string sBuf = "Received!rn";
while (iWSchleife > 0)
{
boost::system::error_code error;
memset(chData, 0, sizeof(chData)); //clear the char
iErgebnis = sim_sockets[a].read_some(boost::asio::buffer(chData), error); //recv data from client
iWSchleife = iErgebnis; //if iErgebnis is bigger then 0 it will stay in the loop. iErgebniss is always >0 when data is received
if (iErgebnis > 0) {
printf("%d data received from client : n%snn", iErgebnis, chData);
write(sim_sockets[a], boost::asio::buffer(sBuf), error); //send data to client
}
else {
boost::system::error_code ec;
sim_sockets[a].shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec); //close the socket when no data
if (ec)
{
printf("studown error"); // An error occurred.
}
}
}
}
void do_accept(yield_context yield)
{
while (1) //endless loop to accept limitless clients
{
sim_sockets.emplace_back(ioservice); //look to the link below for more info
sim_acceptor.async_accept(sim_sockets.back(), yield); //waits here to accept an client
boost::thread dosome(do_write, iSocket); //when accepted, starts the thread do_write and passes the parameter iSocket
iSocket++; //to know the position of the socket in the vector
}
}
int main()
{
sim_acceptor.listen();
spawn(ioservice, do_accept); //here you can learn more about Coroutines https://theboostcpplibraries.com/boost.coroutine
ioservice.run(); //from here you jump to do:accept
getchar();
}
- C/C++ Linux 上的多线程服务器/客户端崩溃
- 在 1 个服务器 n 客户端套接字 C++ MFC 应用程序中更新数据的客户端
- ZMQ - 客户端服务器:客户端意外关闭,服务器如何检测到?
- C++服务器/客户端聊天程序
- Boost-Beast 异步 Web 套接字 服务器-客户端异步读写 不在控制台上写入输出
- 服务器客户端通过原始数据错误C 发送接收结构
- TCP 服务器/客户端:客户端 recv() 返回空白缓冲区
- 在使用 Poll() 的 TCP 服务器 - 客户端连接中,我是否需要手动设置事件?我从来没有到达 POLLOUT 来写
- 是否可以寻址另一个网络中的服务器/客户端套接字?(C++)
- 嗅探 QWebsocket 服务器 - 客户端通信
- C++(不是 C)中的示例服务器/客户端代码
- 如何将ZeroMQ用于多个服务器-客户端对
- 语言之间的服务器客户端首选项
- 服务器客户端程序上的QT-QFileSystemModel
- SFML网络服务器/客户端
- 服务器/客户端TCP异步(winsock)//FD_WRITE问题
- 基本服务器/客户端代码出现问题
- 将我的简单winsock服务器/客户端应用程序变形为简单的Web服务器
- C++ Winsock 服务器-客户端远程连接
- 修复:C++服务器/客户端程序:"Connection refused"