C++非阻塞套接字选择发送太慢

C++ non blocking socket select send too slow?

本文关键字:选择 套接字 C++      更新时间:2023-10-16

我有一个程序,它维护一个"流式"套接字列表。这些套接字被配置为非阻塞套接字。

目前,我已经使用了一个列表来存储这些流套接字。我有一些数据需要发送到所有这些流式套接字,因此我使用迭代器在流式套接字列表中循环,并调用下面的send_TCP_NB函数:

问题是,我自己的程序缓冲区在发送到此send_TCP_NB函数之前存储数据,其可用大小缓慢减小,这表明发送速度慢于将数据放入程序缓冲区的速度。程序缓冲区的速率大约是每秒1000个数据。每个数据都很小,大约100个字节。

因此,我不确定我的send_TCP_NB函数是否有效或正确?

int send_TCP_NB(int cs, char data[], int data_length) {
    bool sent = false;
    FD_ZERO(&write_flags);      // initialize the writer socket set
    FD_SET(cs, &write_flags);   // set the write notification for the socket based on the current state of the buffer
    int status;
    int err;
    struct timeval waitd;       // set the time limit for waiting
    waitd.tv_sec = 0;
    waitd.tv_usec = 1000;
    err = select(cs+1, NULL, &write_flags, NULL, &waitd);
    if(err==0)
    {
        // time limit expired
        printf("Time limit expired!n");
        return 0;   // send failed
    }
    else
    {
        while(!sent)
        {
                if(FD_ISSET(cs, &write_flags))
                {
                     FD_CLR(cs, &write_flags);
                     status = send(cs, data, data_length, 0);
                     sent = true;
                }
         }
         int nError = WSAGetLastError();
         if(nError != WSAEWOULDBLOCK && nError != 0)
         {      
              printf("Error sending non blocking datan");
              return 0;
         }
         else
         {
              if(nError == WSAEWOULDBLOCK)
              {
                    printf("%dn", nError);
              }
              return 1;
          }
       }
}

有一件事会有所帮助,那就是如果你仔细思考这个函数应该做什么。它实际做的可能不是你想要的,而且有一些糟糕的功能。

我注意到它的主要功能是:

  1. 修改某些全局状态
  2. 等待(最多1毫秒)写入缓冲区有一些空闲空间
  3. 如果缓冲区仍然满,则中止
  4. 在套接字上发送1个或多个字节(忽略发送的字节数)
  5. 如果出现错误(包括发送决定它将被阻止,尽管之前进行了检查),请获取其值。否则,获得一个随机误差值
  6. 根据获得的值,可能会将某些内容打印到屏幕上
  7. 根据错误值,返回0或1

对以下几点的评论:

  1. 为什么write_flags是全球性的
  2. 你真的打算阻止这个功能吗
  3. 这可能很好
  4. 你肯定关心发送了多少数据
  5. 我在文档中没有看到任何内容表明,如果send成功,这将为零

如果你明确了这个函数的实际意图,那么确保这个函数真正实现这个意图可能会容易得多。

上面说

我有一些数据需要发送到所有这些流式传输套接字

您到底需要什么

如果您的需要是在继续之前必须发送数据,那么使用非阻塞写入是不合适的*,因为无论如何您都必须等到可以写入数据为止。

如果您的需求是数据必须在未来某个时候发送,那么您的解决方案就缺少了一个非常关键的部分:您需要为每个套接字创建一个缓冲区,用于保存需要发送的数据,然后您需要定期调用一个检查套接字的函数,以尝试写入任何可能的内容。如果您为后一个目的生成一个新线程,这就是select非常有用的事情,因为您可以制作新线程块,直到它能够编写一些东西。然而,如果您不生成新线程,只是周期性地从主线程调用一个函数进行检查,那么您就不需要麻烦了。(只需将你能写的内容写入所有内容,即使是零字节)

*:至少,这是一个非常不成熟的优化。在某些边缘情况下,通过智能地使用非阻塞写入,您可以获得稍高的性能,但如果您不了解这些边缘情况是什么,以及非阻塞写入将如何提供帮助,那么猜测它不太可能获得好的结果。

编辑:正如另一个答案所暗示的那样,这是操作系统无论如何都擅长的事情。如果您发现套接字缓冲区已满,则不要尝试编写自己的代码来管理此问题,而是增大系统缓冲区。如果仍在填充,您应该认真考虑程序无论如何都需要阻止的想法,这样它停止发送数据的速度就会比另一端处理数据的速度更快。也就是说,只需对所有数据使用普通的阻止send

一些一般建议:

  • 请记住,您正在倍增数据。因此,如果输入1 MB/s,则输出N MB/s的N个客户端。你确定你的网卡可以接受它吗?数据包越小,情况就越糟,你会得到更多的一般开销。你可能想考虑广播。

  • 您正在使用非阻塞套接字,但在它们不空闲时进行阻塞。如果你想不阻塞,最好在套接字没有准备好的情况下立即丢弃数据包。

  • 最好一次"选择"多个套接字。除了所有可用的套接字之外,请执行您正在执行的所有操作。您将对每个"就绪"套接字进行写入,然后在存在未就绪的套接字时再次重复。通过这种方式,您将首先处理可用的套接字,然后在某些情况下,繁忙的套接字将变为可用的。

  • CCD_ 5循环是无用的并且可能有缺陷。由于您只检查了一个套接字,因此FD_ISSET将始终为真。在FD_CLR 之后再次检查FD_ISSET是错误的

  • 请记住,您的操作系统有一些用于套接字的内部缓冲区,并且有办法扩展它们(不过,在Linux上要获得大值并不容易,您需要以root身份进行一些配置)。

  • 有一些套接字库可能会比您在合理的时间内实现的库工作得更好(我知道的是boost::asiozmq)。

  • 如果您需要自己实现它(例如,因为zmq有自己的数据包格式),请考虑使用线程池库。

编辑:

  • 睡1毫秒可能是个坏主意。你的线程可能会被取消调度,在你再次获得一些CPU时间之前,它将花费更多的时间

这是一种可怕的做事方式。select没有任何作用,只是浪费时间。如果send是非阻塞的,它可能会在部分发送时损坏数据。如果它正在阻塞,您仍然会浪费任意多的时间等待一个接收器。

您需要选择一个合理的I/O策略。这里有一个:将所有套接字设置为非阻塞。当您需要将数据发送到套接字时,只需调用write即可。如果所有的数据都写了,那就太好了。如果没有,请保存未发送的数据部分,以便以后使用,并将套接字添加到写入集中。当您无事可做时,请致电select。如果您在写入集中的任何套接字上都命中了,请从保存的内容中写入尽可能多的字节。如果您写入了所有这些套接字,请从写入集中删除该套接字。

(如果您需要写入已在写入集中的数据,只需将数据添加到要发送的已保存数据中即可。如果缓冲了太多数据,则可能需要关闭连接。)

一个更好的想法可能是使用一个已经做了所有这些事情的库。Boost::asio是个不错的例子。

您在调用send()之前调用了select()。反过来做。仅当send()报告WSAEWOULDBLOCK时调用select(),例如:

int send_TCP_NB(int cs, char data[], int data_length)
{ 
    int status; 
    int err; 
    struct timeval waitd;
    char *data_ptr = data;
    while (data_length > 0)
    {
        status = send(cs, data_ptr, data_length, 0); 
        if (status > 0)
        {
            data_ptr += status;
            data_length -= status;
            continue;
        }
        err = WSAGetLastError();
        if (err != WSAEWOULDBLOCK)
        {
            printf("Error sending non blocking datan"); 
            return 0;   // send failed 
        }
        FD_ZERO(&write_flags);
        FD_SET(cs, &write_flags);   // set the write notification for the socket based on the current state of the buffer 
        waitd.tv_sec = 0; 
        waitd.tv_usec = 1000; 
        status = select(cs+1, NULL, &write_flags, NULL, &waitd); 
        if (status > 0) 
            continue;
        if (status == 0)
            printf("Time limit expired!n"); 
        else
            printf("Error waiting for time limit!n"); 
        return 0;   // send failed 
    }
    return 1; 
}