套接字未接收有关 HPUX 的完整数据

Socket doesn't receive complete data on HPUX

本文关键字:整数 数据 HPUX 套接字      更新时间:2023-10-16

我真的不明白这里出了什么问题,所以我希望有人能指出我遗漏的东西。

我正在写一个用户守护进程,它接受我用java开发的客户端。目前,该客户端仅连接并发送用户名密码。我在cygwin下开发了代码,它在那里工作。守护进程发送它的介绍,然后客户端发送用户名和密码,守护进程以断开客户端连接或发送OK(尚未完成)作为响应。

当我使用cygwin测试时,它在本地主机上工作。我将代码移植到HPUX,客户机可以连接并接收来自守护进程的介绍。现在,当客户端发送用户名和密码时,它不再工作了。守护进程只接收一个字节,当它试图再次读取时,我得到-1的结果是EAGAIN,没有其他任何东西。客户端没有显示任何错误,守护进程端也没有显示任何错误。当我使用gdb逐步执行代码时,消息被完全接收到。(

我使用的代码是这样的,如果需要更多的信息,我可以添加它:

int TCPSocket::receive(SocketMessage &oMessage, int nBytes)
{
    int max = getSendBufferSize();
    if(nBytes != -1 && nBytes < max)
        max = nBytes;
    SocketMessage sb(max);
    int rcv_bytes = 0;
    int total = 0;
    int error = 0;
    while(1)
    {
        rcv_bytes = ::recv(getSocketId(), &sb[0], sb.size(), 0);
        error = errno;
        FILE_LOG(logDEBUG4) << "Received on socket: " << mSocketId << " bytes: " << rcv_bytes << "  expected:" << sb.size() << " total: " << total << " errno: " << error;
        if(rcv_bytes == -1)
        {
            if(error == EAGAIN || error == EWOULDBLOCK)
                return total;
            throw SocketException(this, SocketException::RECEIVE, error, "Socket", "Client connection error!", __FILE__, __LINE__);
        }
        //if(rcv_bytes == 0)
        //  throw SocketException(this, SocketException::RECEIVE, error, "Socket", "Client connection has been closed!");
        total += rcv_bytes;
        oMessage.insert(oMessage.end(), sb.begin(), sb.begin()+rcv_bytes);
    }
    return total;
}

日志的输出如下:

16:16:04.391 DEBUG4: Received on socket: 4 bytes: 1  expected:32768 total: 0 errno: 2
16:16:04.391 DEBUG4: Received on socket: 4 bytes: -1  expected:32768 total: 1 errno: 11

那么剩下的30个字节在哪里,为什么没有返回?

这只是实际接收数据的代码的一部分。套接字类本身只处理没有任何协议的原始套接字。该协议在一个单独的类中实现。这个接收函数应该抓取网络上可用的尽可能多的字节,并将其放入缓冲区(SocketMessage)。字节数是多个消息还是单个消息的一部分并不重要,因为控制类将从(可能)部分消息流中构造实际的消息块。因此,第一个调用只接收一个字节并不是问题,因为如果消息没有完成,调用者将在循环中等待,直到更多的数据到达并且消息完成。因为可以有多个客户端,我使用非阻塞套接字。我不想处理单独的线程,所以我的服务器是多路复用连接。这里的问题是,接收只接收一个字节,而我知道应该有更多。错误码EAGAIN被处理,当下一次输入接收时,它应该得到更多的字节。即使网络只传输了一个字节,消息的其余部分仍然应该下一个到达,但它没有。等待套接字接收数据块的选择,就好像那里什么都没有一样。当我在dbg中运行相同的代码并逐步执行时,它可以工作。当我再次连接到同一个客户端时,突然接收到更多的字节。当我使用相同的代码与cygwin使用localhost时,它工作得很好。

这是完整的代码。

Main.cpp
    mServerSocket = new TCPSocket(getListeningPort());
    mServerSocket->bindSocket();
    mServerSocket->setSocketBlocking(false);
    mServerSocket->listenToClient(0);
    setupSignals();
    while(isSIGTERM() == false)
    {
        try
        {
            max = prepareListening();
            //memset(&ts, 0, sizeof(struct timespec));
            pret = pselect(max+1, &mReaders, &mWriters, &mExceptions, NULL, &mSignalMask);
            error_code = errno;
            if(pret == 0)
            {
                // Timeout occured, but we are not interested in that for now.
                // Currently this shouldn't happen anyway.
                continue;
            }
            processRequest(pret, error_code);
        }
        catch (SocketException &excp)
        {
            removeClientConnection(findClientConnection(excp.getTCPSocket()));
        }
    } // while sigTERM

BaseSocket.cpp:
#ifdef UNIX
    #include <sys/socket.h>
    #include <sys/types.h>
    #include <sys/ioctl.h>
    #include <unistd.h>
    #include <fcntl.h>
    #include <errno.h>
#endif
#include "support/exceptions/socket_exception.h"
#include "support/logging/simple_log.h"
#include "support/network/base_socket.h"
using namespace std;
BaseSocket::BaseSocket(void)
{
    mSocketId = -1;
    mSendBufferSize = MAX_SEND_LEN;
}
BaseSocket::BaseSocket(int pNumber)
{
    mSocketId = -1;
    mPortNumber = pNumber;
    mBlocking = 1;
    mSendBufferSize = MAX_SEND_LEN;
    try
    {
        if ((mSocketId = ::socket(AF_INET, SOCK_STREAM, 0)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::CONSTRUCTOR, errno, "Socket", "unix: error in socket constructor", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
    /*
     set the initial address of client that shall be communicated with to
     any address as long as they are using the same port number.
     The clientAddr structure is used in the future for storing the actual
     address of client applications with which communication is going
     to start
     */
    mClientAddr.sin_family = AF_INET;
    mClientAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    mClientAddr.sin_port = htons(mPortNumber);
    updateSendBufferSize(MAX_SEND_LEN);
}
void BaseSocket::updateSendBufferSize(int nNewSize)
{
    mSendBufferSize = getSendBufferSize();
    if(mSendBufferSize > nNewSize)
        mSendBufferSize = nNewSize;
}
BaseSocket::~BaseSocket(void)
{
    close();
}
void BaseSocket::setSocketId(int socketFd)
{
    mSocketId = socketFd;
}
int BaseSocket::getSocketId()
{
    return mSocketId;
}
// returns the port number
int BaseSocket::getPortNumber()
{
    return mPortNumber;
}
void BaseSocket::setDebug(int debugToggle)
{
    try
    {
        if (setsockopt(mSocketId, SOL_SOCKET, SO_DEBUG, (char *) &debugToggle, sizeof(debugToggle)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set debug", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}
void BaseSocket::setReuseAddr(int reuseToggle)
{
    try
    {
        if (setsockopt(mSocketId, SOL_SOCKET, SO_REUSEADDR, (char *) &reuseToggle,
              sizeof(reuseToggle)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set reuse address", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}
void BaseSocket::setKeepAlive(int aliveToggle)
{
    try
    {
        if (setsockopt(mSocketId, SOL_SOCKET, SO_KEEPALIVE, (char *) &aliveToggle,
              sizeof(aliveToggle)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set keep alive", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}
void BaseSocket::setLingerSeconds(int seconds)
{
    struct linger lingerOption;
    if (seconds > 0)
    {
        lingerOption.l_linger = seconds;
        lingerOption.l_onoff = 1;
    }
    else
        lingerOption.l_onoff = 0;
    try
    {
        if (setsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (char *) &lingerOption,
              sizeof(struct linger)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set linger seconds", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}
void BaseSocket::setLingerOnOff(bool lingerOn)
{
    struct linger lingerOption;
    if (lingerOn)
        lingerOption.l_onoff = 1;
    else
        lingerOption.l_onoff = 0;
    try
    {
        if (setsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (char *) &lingerOption,
              sizeof(struct linger)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set linger on/off", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}
void BaseSocket::setSendBufferSize(int sendBufSize)
{
    if (setsockopt(mSocketId, SOL_SOCKET, SO_SNDBUF, (char *) &sendBufSize, sizeof(sendBufSize)) == -1)
    {
#ifdef UNIX
        throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error send buffer size", __FILE__, __LINE__);
#endif
    }
    updateSendBufferSize(sendBufSize);
}
void BaseSocket::setReceiveBufferSize(int receiveBufSize)
{
    if (setsockopt(mSocketId, SOL_SOCKET, SO_RCVBUF, (char *) &receiveBufSize, sizeof(receiveBufSize)) == -1)
    {
#ifdef UNIX
        throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set receive buffer size", __FILE__, __LINE__);
#endif
    }
}
int BaseSocket::isSocketBlocking()
{
    return mBlocking;
}
void BaseSocket::setSocketBlocking(int blockingToggle)
{
    if (blockingToggle)
    {
        if (isSocketBlocking())
            return;
        else
            mBlocking = 1;
    }
    else
    {
        if (!isSocketBlocking())
            return;
        else
            mBlocking = 0;
    }
    try
    {
#ifdef UNIX
        int flags;
        if (-1 == (flags = fcntl(mSocketId, F_GETFL, 0)))
            flags = 0;
        if(mBlocking)
            fcntl(mSocketId, F_SETFL, flags & (~O_NONBLOCK));
        else
            fcntl(mSocketId, F_SETFL, flags | O_NONBLOCK);
        /*if (ioctl(socketId, FIONBIO, (char *) &blocking) == -1)
        {
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set socke blocking");
        }*/
#endif
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}
int BaseSocket::getDebug()
{
    int myOption;
    int myOptionLen = sizeof(myOption);
    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_DEBUG, (void *) &myOption, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get debug", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }
    return myOption;
}
int BaseSocket::getReuseAddr()
{
    int myOption;
    int myOptionLen = sizeof(myOption);
    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_REUSEADDR, (void *) &myOption, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get reuse address", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }
    return myOption;
}
int BaseSocket::getKeepAlive()
{
    int myOption;
    int myOptionLen = sizeof(myOption);
    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_KEEPALIVE, (void *) &myOption, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get keep alive", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }
    return myOption;
}
int BaseSocket::getLingerSeconds()
{
    struct linger lingerOption;
    int myOptionLen = sizeof(struct linger);
    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (void *) &lingerOption, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get linger seconds", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }
    return lingerOption.l_linger;
}
bool BaseSocket::getLingerOnOff()
{
    struct linger lingerOption;
    int myOptionLen = sizeof(struct linger);
    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (void *) &lingerOption, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get linger on/off", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
    if (lingerOption.l_onoff == 1)
        return true;
    else
        return false;
}
int BaseSocket::getSendBufferSize()
{
    int sendBuf;
    int myOptionLen = sizeof(sendBuf);
    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_SNDBUF, (void *)&sendBuf, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get send buffer size", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }
    return sendBuf;
}
int BaseSocket::getReceiveBufferSize()
{
    int rcvBuf;
    int myOptionLen = sizeof(rcvBuf);
    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_RCVBUF, (void *) &rcvBuf, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get receive buffer size", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }
    return rcvBuf;
}
ostream &operator<<(ostream& io, BaseSocket& s)
{
    string flagStr = "";
    io << endl;
    io << "Summary of socket settings:" << endl;
    io << "   Socket Id:     " << s.getSocketId() << endl;
    io << "   port #:        " << s.getPortNumber() << endl;
    io << "   debug:         " << (flagStr = s.getDebug() ? "true" : "false")
          << endl;
    io << "   reuse addr:    " << (flagStr = s.getReuseAddr() ? "true" : "false")
          << endl;
    io << "   keep alive:    " << (flagStr = s.getKeepAlive() ? "true" : "false")
          << endl;
    io << "   send buf size: " << s.getSendBufferSize() << endl;
    io << "   recv bug size: " << s.getReceiveBufferSize() << endl;
    io << "   blocking:      "
          << (flagStr = s.isSocketBlocking() ? "true" : "false") << endl;
    io << "   linger on:     "
          << (flagStr = s.getLingerOnOff() ? "true" : "false") << endl;
    io << "   linger seconds: " << s.getLingerSeconds() << endl;
    io << endl;
    return io;
}
void BaseSocket::close(void)
{
    ::close(mSocketId);
}
TCPSocket.cpp:

#ifdef UNIX
    #include <sys/socket.h>
    #include <sys/types.h>
    #include <sys/ioctl.h>
    #include <unistd.h>
    #include <fcntl.h>
    #include <errno.h>
#endif
#include <sstream>
#include "support/logging/log.h"
#include "support/exceptions/socket_exception.h"
#include "support/logging/simple_log.h"
#include "support/network/tcp_socket.h"
using namespace std;
const int MSG_HEADER_LEN = 6;
TCPSocket::TCPSocket()
: BaseSocket()
{
}
TCPSocket::TCPSocket(int portId)
: BaseSocket(portId)
{
}
TCPSocket::~TCPSocket()
{
}
void TCPSocket::initialize()
{
}
void TCPSocket::bindSocket()
{
    try
    {
        if (bind(mSocketId, (struct sockaddr *) &mClientAddr, sizeof(struct sockaddr_in)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::BIND, 0, "Socket", "unix: error calling bind()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}
void TCPSocket::connectToServer(string& serverNameOrAddr, hostType hType)
{
    /* 
     when this method is called, a client socket has been built already,
     so we have the socketId and portNumber ready.
     a HostInfo instance is created, no matter how the server's name is
     given (such as www.yuchen.net) or the server's address is given (such
     as 169.56.32.35), we can use this HostInfo instance to get the
     IP address of the server
     */
    HostInfo serverInfo(serverNameOrAddr, hType);
    // Store the IP address and socket port number
    struct sockaddr_in serverAddress;
    serverAddress.sin_family = AF_INET;
    serverAddress.sin_addr.s_addr = inet_addr(
          serverInfo.getHostIPAddress().c_str());
    serverAddress.sin_port = htons(mPortNumber);
    // Connect to the given address
    try
    {
        if (connect(mSocketId, (struct sockaddr *) &serverAddress, sizeof(serverAddress)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::CONNECT, 0, "Socket", "unix: error calling connect()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}
TCPSocket *TCPSocket::acceptClient(string& clientHost)
{
    int newSocket; // the new socket file descriptor returned by the accept system call
    // the length of the client's address
    int clientAddressLen = sizeof(struct sockaddr_in);
    struct sockaddr_in clientAddress;    // Address of the client that sent data
    // Accepts a new client connection and stores its socket file descriptor
    try
    {
        if ((newSocket = accept(mSocketId, (struct sockaddr *) &clientAddress, &clientAddressLen)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::ACCEPT, 0, "Socket", "unix: error calling accept()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return NULL;
    }
    // Get the host name given the address
    char *sAddress = inet_ntoa((struct in_addr) clientAddress.sin_addr);
    HostInfo clientInfo(sAddress, ADDRESS);
    clientHost += clientInfo.getHostName();
    // Create and return the new TCPSocket object
    TCPSocket* retSocket = new TCPSocket();
    retSocket->setSocketId(newSocket);
    return retSocket;
}
void TCPSocket::listenToClient(int totalNumPorts)
{
    try
    {
        if (listen(mSocketId, totalNumPorts) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::LISTEN, 0, "Socket", "unix: error calling listen()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}
ostream &operator<<(ostream &oStream, const TCPSocket &oSocket)
{
    oStream << oSocket.mSocketId;
    return oStream;
}
int TCPSocket::send(SocketMessage const &oBuffer, int nSize)
{
    int numBytes;  // the number of bytes sent
    int error = errno;
    if(nSize == -1)
        nSize = oBuffer.size();
    if((unsigned int)nSize > oBuffer.size())
    {
        std::stringstream ss;
        ss << "Invalid Buffersize! Requested: " << (unsigned int)nSize << " Provided: " << oBuffer.size();
        std::string s;
        ss >> s;
        FILE_LOG(logERROR) << s;
        throw SocketException(this, SocketException::SEND, 0, "Socket", s, __FILE__, __LINE__);
    }
    // Sends the message to the connected host
    try
    {
        FILE_LOG(logDEBUG4) << "Sending on socket: "<< mSocketId << " bytes:" << nSize;
        numBytes = ::send(mSocketId, &oBuffer[0], nSize, 0);
        error = errno;
        FILE_LOG(logDEBUG4) << "Sent on socket: "<< mSocketId << " bytes:" << nSize << " errno: " << error;
        if(numBytes == -1)
        {
#ifdef UNIX
            if(error == EAGAIN || error == EWOULDBLOCK)
            {
                return -1;
            }
            else
                throw SocketException(this, SocketException::SEND, error, "Socket", "unix: error calling send()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
    return numBytes;
}
int TCPSocket::receive(SocketMessage &oMessage, int nBytes)
{
    int max = getSendBufferSize();
    if(nBytes != -1 && nBytes < max)
        max = nBytes;
    SocketMessage sb(max);
    int rcv_bytes = 0;
    int total = 0;
    int error = 0;
    while(1)
    {
        rcv_bytes = ::recv(getSocketId(), &sb[0], sb.size(), 0);
        error = errno;
        FILE_LOG(logDEBUG4) << "Received on socket: " << getSocketId() << " bytes: " << rcv_bytes << "  expected:" << sb.size() << " total: " << total << " errno: " << error;
        if(rcv_bytes == -1)
        {
            if(error == EAGAIN || error == EWOULDBLOCK)
                return total;
            throw SocketException(this, SocketException::RECEIVE, error, "Socket", "Client connection error!", __FILE__, __LINE__);
        }
        // Socket has been closed.
        if(rcv_bytes == 0)
            return total;
        total += rcv_bytes;
        oMessage.insert(oMessage.end(), sb.begin(), sb.begin()+rcv_bytes);
    }
    return total;
}
void TCPSocket::close(void)
{
    BaseSocket::close();
}

你确定纳格尔算法没有在这里踢?如果您没有通过设置TCP_NODELAY套接字选项来禁用它,那么您的数据可能不会被发送,直到有一定数量的数据(MSS)可用。

先问几个问题:
为什么使用非阻塞I/O?
-你显然知道消息应该是30字节长,为什么你要求32768字节?

如果你使用非阻塞I/O,那么socket不仅仅是调用recv。对于阻塞I/O,每个错误都是真正的错误。使用非阻塞I/O,您必须处理讨厌的EAGAIN/EWOULDBLOCK错误。恢复是可能的,但是当您配置设备使用非阻塞I/o时,您要负责恢复。

正如名字(EAGAIN)所暗示的那样,得到这个错误结果意味着你需要再试一次,最好是在等待一段时间之后。一个简单但不是很好的方法是等待sleep(或usleepnanosleep)一段时间。这样做的问题是,您可能等待的时间太长,或者等待的时间不够长。等待时间过长,系统可能会变得无响应,或者发送者可能会消失。等待时间过短会使计算机在特权模式和非特权模式之间切换。

等待这样一个事件的最好方法是使用基于事件的模式,但不幸的是,这些模式是不可移植的。一个可移植的方案是使用selectpoll。您可以使selectpoll无限期等待,也可以指定超时。我发现poll更容易使用,特别是当只涉及一个文件描述符时。不过这是个人偏好。其他人则认为select更容易使用。

调用recv返回的字节数是不可预测的。许多消息被分成几个部分接收,因此如果您还没有获得整个消息,则有必要再次调用recv。但是,您的代码似乎没有办法确定是否已接收到整个消息。并且,你的代码返回EWOULDBLOCK,但是EWOULDBLOCK是套接字操作的正常部分。它不表示错误或消息完成。