缺少使用boost::asio::streambuf的第一个字符

Missing the first characters using boost::asio::streambuf

本文关键字:streambuf 第一个 字符 asio boost      更新时间:2023-10-16

我使用boost库在C++中制作了一个异步聊天服务器。几乎一切都很好。

客户端有两种断开连接的方式:

  • 按Ctrl+C(终止客户端进程)
  • 输入"退出"

前者还可以,但后者有问题。如果客户端以"退出"断开连接,则由另一个客户端发送的下一条消息将不包含前几个字符。之后就没事了。

例如:几个客户聊天。其中一个与"退出"断开连接。之后,另一个客户端发送"0123456789abcdefghijk",所有客户端只收到:"abcdefghijk"。我不知道问题出在哪里,我想是streambuf的问题。我在C#中发现了类似的问题(几乎相同)。

这是代码:

#include<iostream>
#include<list>
#include<map>
#include<queue>
#include<vector>
#include<cstdlib>
#include<ctime>
#include<boost/thread.hpp>
#include<boost/bind.hpp>
#include<boost/asio.hpp>
#include<boost/asio/ip/tcp.hpp>
using namespace std;
using namespace boost::asio;
using namespace boost::asio::ip;
typedef boost::shared_ptr<tcp::socket> socket_ptr;
typedef boost::shared_ptr<string> string_ptr;
typedef boost::shared_ptr< list<socket_ptr> > clientList_ptr;
typedef boost::shared_ptr< list<string> > nameList_ptr;
const string waitingMsg("Waiting for clients...n");
const string totalClientsMsg("Total clients: ");
const string errorReadingMsg("Error on reading: ");
const string errorWritingMsg("Error on writing: ");
const int EOF_ERROR_CODE = 2;
const int THREADS = 1;
io_service service;
tcp::acceptor acceptor(service, tcp::endpoint(tcp::v4(), 30001));
boost::mutex mtx;
clientList_ptr clientList(new list<socket_ptr>);
nameList_ptr nameList(new list<string>);
boost::asio::streambuf buff;
time_t timer;
void ready();
void accepting();
void askForName(socket_ptr clientSock, const boost::system::error_code& error);
void receiveName(socket_ptr clientSock, const boost::system::error_code& error,
        std::size_t bytes_transferred);
void identify(socket_ptr clientSock, const boost::system::error_code& error, std::size_t bytes_transferred);
void accepted(socket_ptr clientSock, string_ptr name);
void receiveMessage(socket_ptr clientSock, string_ptr name);
void received(socket_ptr clientSock, string_ptr name, const boost::system::error_code& error,
        std::size_t bytes_transferred);
bool extract(string_ptr message, std::size_t bytes_transferred);
bool clientSentExit(string_ptr clientSock);
void disconnectClient(socket_ptr clientSock, string_ptr name, const boost::system::error_code& error);
void writeMessage(socket_ptr clientSock, string_ptr message);
void responseSent(const boost::system::error_code& error);
void notification(socket_ptr sock, string_ptr name, const string headOfMsg, const string tailOfMsg);

int main(int argc, char* argv[])
{
    try
    {
        vector<boost::shared_ptr<boost::thread> > threads;
        ready();
        for (int i = 0; i < THREADS; i++)
        {
            boost::shared_ptr <boost::thread> t(new boost::thread(boost::bind(&io_service::run, &service)));
            threads.push_back(t);
        }
        for (int i = 0; i < THREADS; i++)
        {
            threads[i]->join();
        }
    }
    catch (std::exception& error)
    {
        cerr << error.what() << endl;
    }
    return 0;
}
void ready()
{
    cout << waitingMsg;
    accepting();
}
void accepting()
{
    socket_ptr clientSock(new tcp::socket(service));
    acceptor.async_accept(*clientSock, boost::bind(&askForName, clientSock, boost::asio::placeholders::error));
}
void askForName(socket_ptr sock, const boost::system::error_code& error)
{
    if (error)
    {
        cerr << "Error on accepting: " << error.message() << endl;
    }
    boost::asio::async_write(*sock, buffer("Please, enter your name:n"),
            boost::bind(&receiveName, sock, boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
    accepting();
}
void receiveName(socket_ptr sock, const boost::system::error_code& error,
        std::size_t bytes_transferred)
{
    if (error)
    {
        cerr << errorWritingMsg << error.message() << endl;
    }
    boost::asio::async_read_until(*sock, buff, 'n',
            boost::bind(&identify, sock, boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
}
void identify(socket_ptr sock, const boost::system::error_code& error,
        std::size_t bytes_transferred)
{
    if(error)
    {
        if (error.value() != EOF_ERROR_CODE)
        {
            cerr << errorReadingMsg << error.message() << endl;
        }
        return;
    }
    string_ptr name(new string(""));
    if (!extract(name, bytes_transferred))
    {
        return;
    }
    if (find(nameList->begin(), nameList->end(), *name) != nameList->end())
    {
        boost::asio::async_write(*sock, buffer("This name is already in use! Please, select another name:n"),
                    boost::bind(&receiveName, sock, boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred));
        return;
    }
    nameList->emplace_back(*name);
    accepted(sock, name);
}
void accepted(socket_ptr sock, string_ptr name)
{
    mtx.lock();
    clientList->emplace_back(sock);
    mtx.unlock();
    notification(sock, name, "New client: ", " joined. ");
    receiveMessage(sock, name);
}
void receiveMessage(socket_ptr sock, string_ptr name)
{
    boost::asio::async_read_until(*sock, buff, 'n', boost::bind(&received, sock, name, boost::asio::placeholders::error,
              boost::asio::placeholders::bytes_transferred));
}
void received(socket_ptr sock, string_ptr name, const boost::system::error_code& error,
        std::size_t bytes_transferred)
{
    if(error)
    {
        if (error.value() != EOF_ERROR_CODE)
        {
            cerr << errorReadingMsg << error.message() << endl;
        }
        disconnectClient(sock, name, error);
        return;
    }
    if(!clientList->empty())
    {
        //mtx.lock();
        string_ptr message(new string(""));
        if(!extract(message, bytes_transferred))
        {
            //mtx.unlock();
            disconnectClient(sock, name, error);
            return;
        }
        *message = *name + ": " + *message + "n";
        cout << "ChatLog: " << *message << endl;
        writeMessage(sock, message);
        receiveMessage(sock, name);
        //mtx.unlock();
    }
}
bool extract(string_ptr message, std::size_t bytes_transferred)
{
    mtx.lock();
    buff.commit(bytes_transferred);
    std::istream istrm(&buff);
    //mtx.unlock();
    std::getline(istrm, *message);
    buff.consume(buff.size());
    string_ptr msgEndl(new string(*message + "n"));
    mtx.unlock();
    if(clientSentExit(msgEndl))
    {
        return false;
    }
    return true;
}
bool clientSentExit(string_ptr message)
{
    return message->compare(0, 5, "exitn") == 0;
}
void disconnectClient(socket_ptr sock, string_ptr name, const boost::system::error_code& error)
{
    boost::system::error_code ec = error;
    auto position = find(clientList->begin(), clientList->end(), sock);
    auto namePos = find(nameList->begin(), nameList->end(), *name);
    sock->shutdown(tcp::socket::shutdown_both, ec);
    if (ec)
    {
        cerr << "Error on shutting: " << ec.message() << endl;
    }
    sock->close(ec);
    if(ec)
    {
        cerr << "Error on closing: " << ec.message() << endl;
    }
    clientList->erase(position);
    nameList->erase(namePos);
    notification(sock, name, "", " disconnected. ");
}
void writeMessage(socket_ptr sock, string_ptr message)
{
    for(auto& cliSock : *clientList)
    {
        if (cliSock->is_open() && cliSock != sock)
        {
            boost::asio::async_write(*cliSock, buffer(*message),
                    boost::bind(&responseSent, boost::asio::placeholders::error));
        }
    }
}
void responseSent(const boost::system::error_code& error)
{
    if (error)
    {
        cerr << "Error on writing: " << error.message() << endl;
    }
}
void notification(socket_ptr sock, string_ptr name, const string headOfMsg, const string tailOfMsg)
{
    string_ptr serviceMsg (new string (headOfMsg + *name + tailOfMsg));
    cout << *serviceMsg << totalClientsMsg << clientList->size() << endl;
    *serviceMsg = *serviceMsg + "n";
    writeMessage(sock, serviceMsg);
    cout << waitingMsg;
}

有趣的是,我有类似的同步服务器,使用streambuf的方式相同,但没有这样的问题。

boost::asio::async_read_until()可以在\n之后读取任何数量的字符到streambuf。然后它会给您bytes_transfered,这是第一行中的字符数(不一定是读取到缓冲区的字符数)。请参阅文档。

只要保持缓冲区变量不变,下一个boost::asio::async_read_until()将首先从缓冲区读取字符,然后从套接字读取字符。

在我看来,您使用getline()从缓冲区读取了一行,这是正确的。之后,你打电话给

buff.consume(buff.size());

它清除缓冲区,删除您可能收到的部分行的所有信息。您读取的第一个完整行已经被getline()从缓冲区中删除,因此consume()调用在任何情况下都是不必要的。

仅仅删除consume()调用并不能解决您的问题,因为您似乎使用了一个在所有客户端之间共享的缓冲区,并且您不知道哪个客户端的部分数据是什么。一个可能的解决方案可以创建一个缓冲区列表(每个客户端一个),就像您有一个套接字列表一样。然后boost::asio::async_read_until()和getline()将负责处理部分数据,您不必考虑这一点。

另一个答案解释了问题所在。

然而,找出如何修复它可能有点棘手

也许你可以从我在这里处理的一个类似问题中找到灵感:

  • Boost asio:无法读取URL正文(JSON)

在这里,OP遇到了基本上相同的问题:在阅读了他的HTTP头之后,他会"丢弃"身体的一部分。所以我添加了逻辑:

注意 同样,重要的是不要假设标头的末尾与数据包边界重合。因此,启动read_body(),同时排出已接收到的可用输入

std::shared_ptr<std::vector<char> > bufptr = std::make_shared<std::vector<char> >(nbuffer);
auto partial = std::copy(
        std::istreambuf_iterator<char>(&pThis->buff), {}, 
        bufptr->begin());
std::size_t already_received = std::distance(bufptr->begin(), partial);
assert(nbuffer >= already_received);
nbuffer -= already_received;

我想我解决了这个问题。刚刚创建了一个streambuf列表-每个客户端的streambuf。但我不得不保留consume()函数,因为否则,检查给定名称是否已经存在失败,导致可能有多个客户端共享同一名称。然后消息传递停止了工作,但我删除了extract()中的锁,现在一切似乎都很好。