缺少使用boost::asio::streambuf的第一个字符
Missing the first characters using boost::asio::streambuf
我使用boost库在C++中制作了一个异步聊天服务器。几乎一切都很好。
客户端有两种断开连接的方式:
- 按Ctrl+C(终止客户端进程)
- 输入"退出"
前者还可以,但后者有问题。如果客户端以"退出"断开连接,则由另一个客户端发送的下一条消息将不包含前几个字符。之后就没事了。
例如:几个客户聊天。其中一个与"退出"断开连接。之后,另一个客户端发送"0123456789abcdefghijk",所有客户端只收到:"abcdefghijk"。我不知道问题出在哪里,我想是streambuf的问题。我在C#中发现了类似的问题(几乎相同)。
这是代码:
#include<iostream>
#include<list>
#include<map>
#include<queue>
#include<vector>
#include<cstdlib>
#include<ctime>
#include<boost/thread.hpp>
#include<boost/bind.hpp>
#include<boost/asio.hpp>
#include<boost/asio/ip/tcp.hpp>
using namespace std;
using namespace boost::asio;
using namespace boost::asio::ip;
typedef boost::shared_ptr<tcp::socket> socket_ptr;
typedef boost::shared_ptr<string> string_ptr;
typedef boost::shared_ptr< list<socket_ptr> > clientList_ptr;
typedef boost::shared_ptr< list<string> > nameList_ptr;
const string waitingMsg("Waiting for clients...n");
const string totalClientsMsg("Total clients: ");
const string errorReadingMsg("Error on reading: ");
const string errorWritingMsg("Error on writing: ");
const int EOF_ERROR_CODE = 2;
const int THREADS = 1;
io_service service;
tcp::acceptor acceptor(service, tcp::endpoint(tcp::v4(), 30001));
boost::mutex mtx;
clientList_ptr clientList(new list<socket_ptr>);
nameList_ptr nameList(new list<string>);
boost::asio::streambuf buff;
time_t timer;
void ready();
void accepting();
void askForName(socket_ptr clientSock, const boost::system::error_code& error);
void receiveName(socket_ptr clientSock, const boost::system::error_code& error,
std::size_t bytes_transferred);
void identify(socket_ptr clientSock, const boost::system::error_code& error, std::size_t bytes_transferred);
void accepted(socket_ptr clientSock, string_ptr name);
void receiveMessage(socket_ptr clientSock, string_ptr name);
void received(socket_ptr clientSock, string_ptr name, const boost::system::error_code& error,
std::size_t bytes_transferred);
bool extract(string_ptr message, std::size_t bytes_transferred);
bool clientSentExit(string_ptr clientSock);
void disconnectClient(socket_ptr clientSock, string_ptr name, const boost::system::error_code& error);
void writeMessage(socket_ptr clientSock, string_ptr message);
void responseSent(const boost::system::error_code& error);
void notification(socket_ptr sock, string_ptr name, const string headOfMsg, const string tailOfMsg);
int main(int argc, char* argv[])
{
try
{
vector<boost::shared_ptr<boost::thread> > threads;
ready();
for (int i = 0; i < THREADS; i++)
{
boost::shared_ptr <boost::thread> t(new boost::thread(boost::bind(&io_service::run, &service)));
threads.push_back(t);
}
for (int i = 0; i < THREADS; i++)
{
threads[i]->join();
}
}
catch (std::exception& error)
{
cerr << error.what() << endl;
}
return 0;
}
void ready()
{
cout << waitingMsg;
accepting();
}
void accepting()
{
socket_ptr clientSock(new tcp::socket(service));
acceptor.async_accept(*clientSock, boost::bind(&askForName, clientSock, boost::asio::placeholders::error));
}
void askForName(socket_ptr sock, const boost::system::error_code& error)
{
if (error)
{
cerr << "Error on accepting: " << error.message() << endl;
}
boost::asio::async_write(*sock, buffer("Please, enter your name:n"),
boost::bind(&receiveName, sock, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
accepting();
}
void receiveName(socket_ptr sock, const boost::system::error_code& error,
std::size_t bytes_transferred)
{
if (error)
{
cerr << errorWritingMsg << error.message() << endl;
}
boost::asio::async_read_until(*sock, buff, 'n',
boost::bind(&identify, sock, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void identify(socket_ptr sock, const boost::system::error_code& error,
std::size_t bytes_transferred)
{
if(error)
{
if (error.value() != EOF_ERROR_CODE)
{
cerr << errorReadingMsg << error.message() << endl;
}
return;
}
string_ptr name(new string(""));
if (!extract(name, bytes_transferred))
{
return;
}
if (find(nameList->begin(), nameList->end(), *name) != nameList->end())
{
boost::asio::async_write(*sock, buffer("This name is already in use! Please, select another name:n"),
boost::bind(&receiveName, sock, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
return;
}
nameList->emplace_back(*name);
accepted(sock, name);
}
void accepted(socket_ptr sock, string_ptr name)
{
mtx.lock();
clientList->emplace_back(sock);
mtx.unlock();
notification(sock, name, "New client: ", " joined. ");
receiveMessage(sock, name);
}
void receiveMessage(socket_ptr sock, string_ptr name)
{
boost::asio::async_read_until(*sock, buff, 'n', boost::bind(&received, sock, name, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void received(socket_ptr sock, string_ptr name, const boost::system::error_code& error,
std::size_t bytes_transferred)
{
if(error)
{
if (error.value() != EOF_ERROR_CODE)
{
cerr << errorReadingMsg << error.message() << endl;
}
disconnectClient(sock, name, error);
return;
}
if(!clientList->empty())
{
//mtx.lock();
string_ptr message(new string(""));
if(!extract(message, bytes_transferred))
{
//mtx.unlock();
disconnectClient(sock, name, error);
return;
}
*message = *name + ": " + *message + "n";
cout << "ChatLog: " << *message << endl;
writeMessage(sock, message);
receiveMessage(sock, name);
//mtx.unlock();
}
}
bool extract(string_ptr message, std::size_t bytes_transferred)
{
mtx.lock();
buff.commit(bytes_transferred);
std::istream istrm(&buff);
//mtx.unlock();
std::getline(istrm, *message);
buff.consume(buff.size());
string_ptr msgEndl(new string(*message + "n"));
mtx.unlock();
if(clientSentExit(msgEndl))
{
return false;
}
return true;
}
bool clientSentExit(string_ptr message)
{
return message->compare(0, 5, "exitn") == 0;
}
void disconnectClient(socket_ptr sock, string_ptr name, const boost::system::error_code& error)
{
boost::system::error_code ec = error;
auto position = find(clientList->begin(), clientList->end(), sock);
auto namePos = find(nameList->begin(), nameList->end(), *name);
sock->shutdown(tcp::socket::shutdown_both, ec);
if (ec)
{
cerr << "Error on shutting: " << ec.message() << endl;
}
sock->close(ec);
if(ec)
{
cerr << "Error on closing: " << ec.message() << endl;
}
clientList->erase(position);
nameList->erase(namePos);
notification(sock, name, "", " disconnected. ");
}
void writeMessage(socket_ptr sock, string_ptr message)
{
for(auto& cliSock : *clientList)
{
if (cliSock->is_open() && cliSock != sock)
{
boost::asio::async_write(*cliSock, buffer(*message),
boost::bind(&responseSent, boost::asio::placeholders::error));
}
}
}
void responseSent(const boost::system::error_code& error)
{
if (error)
{
cerr << "Error on writing: " << error.message() << endl;
}
}
void notification(socket_ptr sock, string_ptr name, const string headOfMsg, const string tailOfMsg)
{
string_ptr serviceMsg (new string (headOfMsg + *name + tailOfMsg));
cout << *serviceMsg << totalClientsMsg << clientList->size() << endl;
*serviceMsg = *serviceMsg + "n";
writeMessage(sock, serviceMsg);
cout << waitingMsg;
}
有趣的是,我有类似的同步服务器,使用streambuf的方式相同,但没有这样的问题。
boost::asio::async_read_until()可以在\n之后读取任何数量的字符到streambuf。然后它会给您bytes_transfered,这是第一行中的字符数(不一定是读取到缓冲区的字符数)。请参阅文档。
只要保持缓冲区变量不变,下一个boost::asio::async_read_until()将首先从缓冲区读取字符,然后从套接字读取字符。
在我看来,您使用getline()从缓冲区读取了一行,这是正确的。之后,你打电话给
buff.consume(buff.size());
它清除缓冲区,删除您可能收到的部分行的所有信息。您读取的第一个完整行已经被getline()从缓冲区中删除,因此consume()调用在任何情况下都是不必要的。
仅仅删除consume()调用并不能解决您的问题,因为您似乎使用了一个在所有客户端之间共享的缓冲区,并且您不知道哪个客户端的部分数据是什么。一个可能的解决方案可以创建一个缓冲区列表(每个客户端一个),就像您有一个套接字列表一样。然后boost::asio::async_read_until()和getline()将负责处理部分数据,您不必考虑这一点。
另一个答案解释了问题所在。
然而,找出如何修复它可能有点棘手
也许你可以从我在这里处理的一个类似问题中找到灵感:
- Boost asio:无法读取URL正文(JSON)
在这里,OP遇到了基本上相同的问题:在阅读了他的HTTP头之后,他会"丢弃"身体的一部分。所以我添加了逻辑:
注意 同样,重要的是不要假设标头的末尾与数据包边界重合。因此,启动
read_body()
,同时排出已接收到的可用输入
std::shared_ptr<std::vector<char> > bufptr = std::make_shared<std::vector<char> >(nbuffer);
auto partial = std::copy(
std::istreambuf_iterator<char>(&pThis->buff), {},
bufptr->begin());
std::size_t already_received = std::distance(bufptr->begin(), partial);
assert(nbuffer >= already_received);
nbuffer -= already_received;
我想我解决了这个问题。刚刚创建了一个streambuf列表-每个客户端的streambuf。但我不得不保留consume()函数,因为否则,检查给定名称是否已经存在失败,导致可能有多个客户端共享同一名称。然后消息传递停止了工作,但我删除了extract()中的锁,现在一切似乎都很好。
- 为什么它只打印双链接列表的第一个值,而我的程序却崩溃了
- std::find,返回所有找到的值的替代方法,而不仅仅是存在重复的向量的第一个值
- 如何仅读取文本文件中的第一个值
- 在C++中,如何在第一个"system()"结束后执行第二个"system()"?
- 查找不在标准中的第一个值::设置<int>最小-最大值
- C++:忽略第一个 cin.ignore 之后的输入
- 在C++中打印多个矢量的第一个值
- C++去除前x个元素的有效方法,在不改变向量大小的情况下将第x+1个元素推到第一个
- C++第一个cout将不会打印
- 我们可以在第一个else-if条件结束后使用另一个else-if条件吗
- OpenGL:第二个VBO破坏了第一个VBO
- 为什么第一个Dynamic_cast没有投射到基类?
- OpenGL 2D游戏只绘制第二个精灵纹理而不是第一个
- C++ 为什么程序只读取第一个值
- 在我的第一个C++程序中需要一些帮助(简单)
- 为什么我的代码在第一个 if 语句处中断?
- 是否可以从另一个类对象调用一个类函数而不继承第一个类
- 无法使我的第一个Windows OpenGL窗口抬起并运行
- 将参数初始化为构造函数,而不是第一个
- 无法在硬件模式下创建 SGX 安全区 - "invalid launch token"即使文档将无效的启动令牌指定为第一个