epoll: losing some EPOLLOUT events?

Keywords: EPOLLOUT, event, lose, epoll    Updated: 2023-10-16

This is what my server looks like:

-WorkerThread:

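  • calls epoll_wait, accepts connections, and sets each fd non-blocking (EPOLLIN|EPOLLOUT|EPOLLET|EPOLLRDHUP)
  • on an EPOLLIN event, calls recv until EAGAIN and pushes all the data to the global RecvBuffer (synchronized with a pthread_mutex)
  • on an EPOLLOUT event, accesses the global SendBuffer and, if there is data to send for the currently ready fd, sends it (in a while loop, until EAGAIN or until all the data is sent; once a whole packet has been sent, it is popped from the SendBuffer)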

-IOThread

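  • takes data from the global RecvBuffer and processes it
  • sends the response by first trying to call send immediately. If not all of the data is sent, pushes the rest onto the global SendBuffer so that the WorkerThread can send it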
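For reference, the WorkerThread's EPOLLIN step described above (recv until EAGAIN) could look like the minimal sketch below. It assumes the fd is already non-blocking; drain_socket is a hypothetical helper name, and the std::string stands in for the global RecvBuffer, which the question does not show.

```cpp
#include <cerrno>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical helper: read everything currently available on a
// non-blocking socket and append it to `out`.
// Returns true when the socket is drained (EAGAIN), false when the peer
// closed the connection or a real error occurred.
bool drain_socket(int fd, std::string& out)
{
    char buf[512];
    for (;;) {
        ssize_t n = recv(fd, buf, sizeof buf, 0);
        if (n > 0) {
            out.append(buf, static_cast<size_t>(n));
        } else if (n == 0) {
            return false;                   // orderly shutdown by the peer
        } else if (errno == EINTR) {
            continue;                       // interrupted by a signal: retry
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return true;                    // nothing more to read for now
        } else {
            return false;                   // e.g. ECONNRESET
        }
    }
}
```

Stopping only on EAGAIN, not on a short read, matters under EPOLLET: if the loop returns while bytes are still buffered, edge-triggered epoll will not report EPOLLIN again for those bytes until more data arrives.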

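The problem is that the server doesn't send all of the queued data (some of it stays in the SendBuffer), and the amount of "unsent" data grows as the number of clients increases. For testing I'm using only 1 WorkerThread and 1 IOThread, but using more doesn't seem to make any difference. Access to the global buffers is protected with a pthread_mutex. Also, my response data size is 130k bytes (it takes at least 3 send calls to send that much data). On the other side is a Windows client using blocking sockets.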

Thanks a lot! MJ

EDIT:

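Yes, by default I'm waiting for EPOLLOUT events even when I have nothing to send. I did this for simplicity of implementation and to follow the man page guidelines. Also, this is the part I don't understand:

Even if I "miss" an EPOLLOUT event at a moment when I don't want to send anything, that should be fine, because when I do want to send data I call send until EAGAIN, and EPOLLOUT should then fire at some point in the future (most of the time).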
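That reasoning depends on when edge-triggered epoll actually reports EPOLLOUT. A small experiment makes the difference visible. It is a hypothetical helper built on an AF_UNIX socketpair, not the poster's code: it registers an idle, writable socket for EPOLLOUT and polls twice without writing anything.

```cpp
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical experiment: register one end of an idle AF_UNIX socketpair
// for EPOLLOUT (optionally with EPOLLET) and poll twice without writing.
// Returns how many of the two polls reported EPOLLOUT, or -1 on setup error.
int count_epollout_reports(bool edge_triggered)
{
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
        return -1;
    int ep = epoll_create1(0);
    if (ep == -1) {
        close(sv[0]);
        close(sv[1]);
        return -1;
    }

    epoll_event ev{};
    ev.data.fd = sv[0];
    ev.events = EPOLLOUT;
    if (edge_triggered)
        ev.events |= EPOLLET;

    int reports = -1;
    if (epoll_ctl(ep, EPOLL_CTL_ADD, sv[0], &ev) == 0) {
        reports = 0;
        for (int round = 0; round < 2; ++round) {
            epoll_event out{};
            // Timeout 0: just check readiness. Nothing is written in between,
            // so the socket stays writable the whole time.
            if (epoll_wait(ep, &out, 1, 0) == 1 && (out.events & EPOLLOUT))
                ++reports;
        }
    }
    close(ep);
    close(sv[0]);
    close(sv[1]);
    return reports;
}
```

On Linux this returns 2 in level-triggered mode and 1 with EPOLLET: the edge-triggered EPOLLOUT is reported once and then stays silent while the socket simply remains writable. Under EPOLLET, a fresh EPOLLOUT is generally only expected after send has actually hit EAGAIN and the peer has since drained some data.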

Now I've modified the code to switch between IN and OUT events:

On accept:

event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_ADD, infd, &event);

When all the data has been sent:

event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_MOD, events[i].data.fd, &event);

When I hit EAGAIN while calling send in the IOThread:

event.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_MOD, events[i].data.fd, &event);

I get the same behavior. I also tried removing the EPOLLET flag, but nothing changed.

A side question: does epoll_ctl with the EPOLL_CTL_MOD flag replace the events member, or just OR it with the given argument?
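On the side question: the epoll_ctl(2) man page describes EPOLL_CTL_MOD as changing the settings associated with fd to the new settings given in event, so the mask is replaced rather than ORed. The hypothetical check below (an illustration, not the poster's code) registers a socket that is both readable and writable for EPOLLIN only, switches it to EPOLLOUT with EPOLL_CTL_MOD, and returns what the next epoll_wait reports.

```cpp
#include <cstdint>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical check: register a socket that is both readable and writable
// for EPOLLIN only, then switch it to EPOLLOUT with EPOLL_CTL_MOD.
// Returns the event mask reported by the next epoll_wait (0 if none/error).
uint32_t events_after_mod()
{
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
        return 0;
    uint32_t reported = 0;
    int ep = epoll_create1(0);
    // One byte from the peer makes sv[0] readable; it is writable anyway.
    if (ep != -1 && write(sv[1], "x", 1) == 1) {
        epoll_event ev{};
        ev.data.fd = sv[0];
        ev.events = EPOLLIN;
        if (epoll_ctl(ep, EPOLL_CTL_ADD, sv[0], &ev) == 0) {
            ev.events = EPOLLOUT;   // the new mask: EPOLLIN is not included
            if (epoll_ctl(ep, EPOLL_CTL_MOD, sv[0], &ev) == 0) {
                epoll_event out{};
                if (epoll_wait(ep, &out, 1, 0) == 1)
                    reported = out.events;
            }
        }
    }
    if (ep != -1)
        close(ep);
    close(sv[0]);
    close(sv[1]);
    return reported;
}
```

The reported mask contains EPOLLOUT but not EPOLLIN, even though a byte is waiting to be read. So if a connection should keep reporting reads while it waits for writability, the mask passed to EPOLL_CTL_MOD has to include EPOLLIN again, for example EPOLLIN | EPOLLOUT | EPOLLRDHUP.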

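EDIT3: I updated the IOThread function to keep sending until all the data is sent or until EAGAIN. I also try to send even after all the data has been sent, but most of the time I get errno 88 (ENOTSOCK, "Socket operation on non-socket").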

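EDIT4: I fixed some bugs in my "sending code", so now no queued data is left unsent... However, I don't receive all the data I should :)) The amount of "lost" (not received) data is highest when the client calls recv immediately after its send completes, and it grows with the number of clients. When I put a 2-second delay between the send and recv calls on the client (both blocking calls), I lose almost no data on the server, depending on how many clients are running (the client test code is a simple for loop with 1 send followed by 1 recv call). Again, I tried with and without ET mode... Below is the updated WorkerThread function, which is responsible for receiving data. @Admins/Mods, maybe I should open a new topic now, since the problem is a bit different?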

void CNetServer::WorkerThread(void* param)
{
    CNetServer* pNetServer =(CNetServer*)param;
    struct epoll_event event;
    struct epoll_event *events;
    int s = 0;
//  events = (epoll_event*)calloc (MAXEVENTS, sizeof event);
    while (1)
    {
        int n, i;
//      printf ("BLOCKING NOW! epoll_wait thread %lu\n",(unsigned long)pthread_self());
        n = pNetServer->m_epollCtrl.Wait(-1);
//      printf ("epoll_wait thread %lu\n",(unsigned long)pthread_self());
        pthread_mutex_lock(&g_mtx_WorkerThd);
        for (i = 0; i < n; i++)
        {
            if ((pNetServer->m_epollCtrl.Event(i)->events & EPOLLERR))
            {
                // An error has occurred on this fd, or the socket is not ready for reading (why were we notified then?)
            //  g_SendBufferArray.RemoveAll( 0 );
                char szFileName[30] = {0};
                sprintf( (char*)szFileName,"fd_%d.txt",pNetServer->m_epollCtrl.Event(i)->data.fd );
                remove(szFileName);
            /*  printf( "\n\n\n");
                printf( "\tDATA LEFT COUNT:%zu\n",g_SendBufferArray.size());
                for (size_t k=0;k<g_SendBufferArray.size();k++)
                    printf( "\tSD: %d DATA LEFT:%d\n",g_SendBufferArray[k]->sd,g_SendBufferArray[k]->nBytesSent );
*/
            //  fprintf (stderr, "epoll error\n");
            //  fflush(stdout);
                close (pNetServer->m_epollCtrl.Event(i)->data.fd);
                continue;
            }
            else if (pNetServer->m_ListenSocket == pNetServer->m_epollCtrl.Event(i)->data.fd)
            {
                // We have a notification on the listening socket, which means one or more incoming connections.
                while (1)
                {
                    struct sockaddr in_addr;
                    socklen_t in_len;
                    int infd;
                    char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
                    in_len = sizeof in_addr;
                    infd = accept (pNetServer->m_ListenSocket, &in_addr, &in_len);
                    if (infd == -1)
                    {
                        if ((errno == EAGAIN) ||
                            (errno == EWOULDBLOCK))
                        {
                            // We have processed all incoming connections.
                            break;
                        }
                        else
                        {
                            perror ("accept");
                            break;
                        }
                    }
                    s = getnameinfo (&in_addr, in_len,
                        hbuf, sizeof hbuf,
                        sbuf, sizeof sbuf,
                        NI_NUMERICHOST | NI_NUMERICSERV);
                    if (s == 0)
                    {
                        printf("Accepted connection on descriptor %d "
                            "(host=%s, port=%s) thread %lu\n", infd, hbuf, sbuf, (unsigned long)pthread_self());
                    }
                    // Make the incoming socket non-blocking and add it to the list of fds to monitor.
                    CEpollCtrl::SetNonBlock(infd,true);
                    if ( !pNetServer->m_epollCtrl.Add( infd, EPOLLIN, NULL ))
                    {
                        perror ("epoll_ctl");
                        abort ();
                    }
                }
                continue;
            }
            if( (pNetServer->m_epollCtrl.Event(i)->events & EPOLLOUT) )
            {
                pNetServer->DoSend( pNetServer->m_epollCtrl.Event(i)->data.fd );
            }
            if( pNetServer->m_epollCtrl.Event(i)->events & EPOLLIN )
            {
                printf("EPOLLIN TRIGGERED FOR SD: %d\n",pNetServer->m_epollCtrl.Event(i)->data.fd);
                // We have data on the fd waiting to be read. 
                int done = 0;
                ssize_t count = 0;
                char buf[512];
                while (1)
                {
                    count = read (pNetServer->m_epollCtrl.Event(i)->data.fd, buf, sizeof buf);
                    printf("recv sd %d size %zd thread %lu\n",pNetServer->m_epollCtrl.Event(i)->data.fd,count,(unsigned long)pthread_self());
                    if (count == -1)
                    {
                        // If errno == EAGAIN, that means we have read all data. So go back to the main loop.
                        if ( errno != EAGAIN )
                        {
                            perror ("read");
                            done = 1;
                        }
                        break;
                    }
                    else if (count == 0)
                    {
                        //connection is closed by peer.. do a cleanup and close
                        done = 1;
                        break;
                    }
                    else if (count > 0)
                    {
                        static int nDataCounter = 0;
                        nDataCounter+=count;
                        printf("RECVDDDDD %d\n",nDataCounter);
                        CNetServer::s_pRecvContainer->OnData( pNetServer->m_epollCtrl.Event(i)->data.fd, buf, count );
                    }
                }
                if (done)
                {
                    printf ("Closed connection on descriptor %d\n",pNetServer->m_epollCtrl.Event(i)->data.fd);
                    // Closing the descriptor will make epoll remove it from the set of descriptors which are monitored. 
                    close (pNetServer->m_epollCtrl.Event(i)->data.fd);
                }
            }
        }
//      
        pNetServer->IOThread( (void*)pNetServer );
        pthread_mutex_unlock(&g_mtx_WorkerThd);
    }
}
void CNetServer::IOThread(void* param)
{
    BYTEARRAY* pbPacket = new BYTEARRAY;
    int fd;
    struct epoll_event event;
    CNetServer* pNetServer =(CNetServer*)param;
    printf("IOThread startin' !\n");
    for (;;)
    {
        bool bGotIt = CNetServer::s_pRecvContainer->GetPacket( pbPacket, &fd );
        if( bGotIt )
        {
            //process packet here
            printf("Got 'em packet yo !\n");
            BYTE* re = new BYTE[128000];
            memset((void*)re,0xCC,128000);
            buffer_t* responsebuff = new buffer_t( fd, re, 128000 ) ;
            pthread_mutex_lock(&g_mtx_WorkerThd);
            while( 1 )
            {
                    int s;
                    int nSent = send( responsebuff->sd, ( responsebuff->pbBuffer + responsebuff->nBytesSent ),responsebuff->nSize - responsebuff->nBytesSent,0 );
                    printf ("IOT: Trying to send nSent: %d buffsize: %d\n",nSent,responsebuff->nSize - responsebuff->nBytesSent);
                    if (nSent == -1)
                    {
                        if (errno == EAGAIN || errno == EWOULDBLOCK )
                        {
                                g_vSendBufferArray.push_back( *responsebuff );
                                printf ("IOT: now waiting for EPOLLOUT\n");
                                event.data.fd = fd;
                                event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
                                s = epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_MOD, fd, &event);
                                // check the result before leaving the loop (the check was unreachable after break)
                                if (s == -1)
                                {
                                    perror ("epoll_ctl");
                                    abort ();
                                }
                                break;
                        }
                        else
                        {
                            printf( "%d\n",errno );
                            perror ("send");
                            break;
                        }
                        printf ("IOT: WOOOOT\n");
                        break;
                    }
                    else if (nSent == responsebuff->nSize - responsebuff->nBytesSent)
                    {
                        printf ("IOT:all is sent! wOOhOO\n");
                        responsebuff->sd = 0;
                        responsebuff->nBytesSent += nSent;
                        delete responsebuff;
                        break;
                    }
                    else if (nSent < responsebuff->nSize - responsebuff->nBytesSent)
                    {
                        printf ("IOT: partial send!\n");
                        responsebuff->nBytesSent += nSent;
                    }
            }
            delete [] re;
            pthread_mutex_unlock(&g_mtx_WorkerThd);
        }
    }
}
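
  1. Stop using EPOLLET. It is almost impossible to get right.

  2. Don't ask for EPOLLOUT events if you have nothing to send.

  3. When you have data to send on a connection, follow this logic:

    A) If there is already data in that connection's send queue, just append the new data. You're done.

    B) Try to send the data immediately. If all of it goes out, you're done.

    C) Save the remaining data in this connection's send queue. Now ask for EPOLLOUT.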
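Steps A to C, together with the EPOLLOUT handler implied by point 2, can be sketched roughly as follows. This is a minimal, single-threaded sketch that assumes level-triggered epoll (per point 1) and a connection already registered with EPOLL_CTL_ADD. Connection, set_interest, flush_pending, queue_send and on_writable are hypothetical names, and std::string serves as the per-connection send queue. MSG_NOSIGNAL is Linux-specific and keeps send from raising SIGPIPE if the peer has gone away. In the poster's two-thread design, each connection's queue would still need to be protected by a lock.

```cpp
#include <cerrno>
#include <string>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>

// Hypothetical per-connection state (names are illustrative).
struct Connection {
    int fd;
    std::string pending;   // bytes queued but not yet accepted by send()
};

// Level-triggered interest: always watch reads, watch writes only on request.
static bool set_interest(int epfd, const Connection& c, bool want_write)
{
    epoll_event ev{};
    ev.data.fd = c.fd;
    ev.events = EPOLLIN | EPOLLRDHUP;
    if (want_write)
        ev.events |= EPOLLOUT;
    return epoll_ctl(epfd, EPOLL_CTL_MOD, c.fd, &ev) == 0;
}

// Send as much of c.pending as the socket accepts right now.
// Returns false only on a real send error.
static bool flush_pending(Connection& c)
{
    while (!c.pending.empty()) {
        ssize_t n = send(c.fd, c.pending.data(), c.pending.size(), MSG_NOSIGNAL);
        if (n > 0)
            c.pending.erase(0, static_cast<size_t>(n));
        else if (n == -1 && errno == EINTR)
            continue;
        else if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
            return true;   // socket buffer full: the rest stays queued
        else
            return false;
    }
    return true;
}

// Steps A-C: called whenever there is a new response for this connection.
bool queue_send(int epfd, Connection& c, const std::string& data)
{
    if (!c.pending.empty()) {   // A) queue not empty: EPOLLOUT is already on
        c.pending += data;
        return true;
    }
    c.pending = data;           // B) try to send everything immediately
    if (!flush_pending(c))
        return false;
    if (c.pending.empty())
        return true;
    return set_interest(epfd, c, true);   // C) leftover queued: ask for EPOLLOUT
}

// Called when epoll_wait reports EPOLLOUT for c.fd.
bool on_writable(int epfd, Connection& c)
{
    if (!flush_pending(c))
        return false;
    if (c.pending.empty())
        return set_interest(epfd, c, false);   // point 2: drop EPOLLOUT
    return true;
}
```

on_writable removes EPOLLOUT as soon as the queue is empty, so with level-triggered epoll the loop is not woken over and over for a socket that has nothing left to send.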