如何在C/C++中实现TCP连接池

How to do TCP connection pooling in C/C++

本文关键字:实现 TCP 连接 C++      更新时间:2023-10-16

我正在用C++设计一个分布式服务器/客户端系统,其中许多客户端通过TCP向许多服务器发送请求,服务器抛出一个线程来处理请求并发回其响应。在我的用例中,只有有限数量的客户端可以访问服务器,我需要非常高的性能。从客户端和服务器发送的数据都很小,但非常频繁。因此,创建一个连接并在使用后将其拆除是昂贵的。所以我想使用连接缓存来解决这个问题:一旦创建了连接,它就会存储在缓存中以备将来使用。(假设客户端的数量不会超过缓存的大小)。

我的问题是:

  1. 我看到有人说连接池是一种客户端技术。如果这个连接池只在客户端使用,那么它第一次连接到服务器并发送数据。这种建立连接的操作触发了服务器端的accept()函数,该函数返回一个套接字,用于从客户端接收。所以,当客户端想要使用(缓存中的)现有连接时,它不会建立新的连接,而只是发送数据。问题是,如果没有建立连接,谁会在服务器端触发accept()并抛出线程
  2. 如果连接池也需要在服务器端实现,我如何知道请求来自哪里?由于只有从accept()我才能获得客户端地址,但与此同时,accept(()已经为该请求创建了一个新的套接字,因此没有必要使用缓存连接

任何答复和建议都将不胜感激。或者任何人都可以给我一个连接池或连接缓存的例子?

我看到有人说连接池是一种客户端技术。。。如果没有建立连接,谁会在服务器端触发accept()并抛出线程?

首先,连接池不仅仅是一种客户端技术;这是一种连接模式技术。它适用于两种类型的对等体("服务器"answers"客户端")。

其次,不需要调用accept来启动线程。程序可以出于任何原因启动线程。。。在线程创建的大规模并行循环中,他们可以启动线程来启动更多的线程。(编辑:我们称之为"叉式炸弹"

最后,一个高效的线程池实现不会为每个客户端启动一个线程。每个线程通常占用512KB-4MB(计算堆栈空间和其他上下文信息),因此,如果有10000个客户端,每个客户端都占用了这么多,那么就会浪费大量内存。

我想这么做,但就是不知道如何在多线程的情况下做到这一点。

您不应该在此处使用多线程。。。至少,直到你有了一个使用单个线程的解决方案,并且你认为它不够快。目前你还没有这些信息;你只是在猜测,而猜测并不能保证优化。

在世纪之交,FTP服务器解决了C10K问题;他们能够在任何给定的时间处理10000个客户端,就像用户在FTP服务器上所做的那样浏览、下载或闲置他们不是通过使用线程,而是通过使用非阻塞和/或异步套接字和/或调用来解决这个问题。

为了澄清,这些web服务器在一个线程上处理了数千个连接!一种典型的方法是使用select,但我并不特别喜欢这种方法,因为它需要一系列相当丑陋的循环。我更喜欢对Windows使用ioctlsocket,对其他POSIX操作系统使用fcntl来将文件描述符设置为非阻塞模式,例如:

#ifdef WIN32
ioctlsocket(fd, FIONBIO, (u_long[]){1});
#else
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
#endif

此时,recvreadfd上操作时不会阻塞;如果没有可用的数据,他们会立即返回一个错误值,而不是等待数据到达。这意味着您可以在多个套接字上循环。

如果连接池也需要在服务器端实现,我如何知道请求来自哪里?

将客户端fd与其struct sockaddr_storage以及您需要存储的关于客户端的任何其他有状态信息一起存储在struct中,您可以根据自己的感受进行声明。如果最终达到4KB(这是一个相当大的struct,通常与它们需要的大小差不多),那么其中10000个将只占用大约40000KB(~40MB)。即使是今天的移动电话也应该不会有任何问题。考虑根据您的需要完成以下代码:

struct client {
    struct sockaddr_storage addr;
    socklen_t addr_len;
    int fd;
    /* Other stateful information */
};
#define BUFFER_SIZE 4096
#define CLIENT_COUNT 10000
int main(void) {
    int server;
    struct client client[CLIENT_COUNT] = { 0 };
    size_t client_count = 0;
    /* XXX: Perform usual bind/listen */
    #ifdef WIN32
    ioctlsocket(server, FIONBIO, (u_long[]){1});
    #else
    fcntl(server, F_SETFL, fcntl(server, F_GETFL, 0) | O_NONBLOCK);
    #endif
    for (;;) {
        /* Accept connection if possible */
        if (client_count < sizeof client / sizeof *client) {
            struct sockaddr_storage addr = { 0 };
            socklen_t addr_len = sizeof addr;
            int fd = accept(server, &addr, &addr_len);
            if (fd != -1) {
#               ifdef WIN32
                ioctlsocket(fd, FIONBIO, (u_long[]){1});
#               else
                fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
#               endif
                client[client_count++] = (struct client) { .addr = addr
                                                         , .addr_len = addr_len
                                                         , .fd = fd };
            }
        }
        /* Loop through clients */
        char buffer[BUFFER_SIZE];
        for (size_t index = 0; index < client_count; index++) {
            ssize_t bytes_recvd = recv(client[index].fd, buffer, sizeof buffer, 0);
#           ifdef WIN32
            int closed = bytes_recvd == 0
                      || (bytes_recvd < 0 && WSAGetLastError() == WSAEWOULDBLOCK);
#           else
            int closed = bytes_recvd == 0
                      || (bytes_recvd < 0 && errno == EAGAIN) || errno == EWOULDBLOCK;
#           endif
            if (closed) {
                close(client[index].fd);
                client_count--;
                memmove(client + index, client + index + 1, (client_count - index) * sizeof client);
                continue;
            }
            /* XXX: Process buffer[0..bytes_recvd-1] */
        }
        sleep(0); /* This is necessary to pass control back to the kernel,
                   * so it can queue more data for us to process
                   */
    }
}

假设您想在客户端上池连接,代码看起来会非常相似,只是显然不需要accept相关的代码。假设您有一个想要connectclient数组,您可以使用非阻塞连接调用一次执行所有连接,如下所示:

size_t index = 0, in_progress = 0;
for (;;) {
    if (client[index].fd == 0) {
        client[index].fd = socket(/* TODO */);
#       ifdef WIN32
        ioctlsocket(client[index].fd, FIONBIO, (u_long[]){1});
#       else
        fcntl(client[index].fd, F_SETFL, fcntl(client[index].fd, F_GETFL, 0) | O_NONBLOCK);
#       endif
    }
#   ifdef WIN32
    in_progress += connect(client[index].fd, (struct sockaddr *) &client[index].addr, client[index].addr_len) < 0
                && (WSAGetLastError() == WSAEALREADY
                ||  WSAGetLastError() == WSAEWOULDBLOCK
                ||  WSAGetLastError() == WSAEINVAL);
#   else
    in_progress += connect(client[index].fd, (struct sockaddr *) &client[index].addr, client[index].addr_len) < 0
                && (errno == EALREADY
                ||  errno == EINPROGRESS);
#   endif
    if (++index < sizeof client / sizeof *client) {
        continue;
    }
    index = 0;
    if (in_progress == 0) {
        break;
    }
    in_progress = 0;
}

至于优化,考虑到这应该能够处理10000个客户端,也许只需要一些小的调整,你不应该需要多个线程。

尽管如此,通过将mutex集合中的项与clients相关联,并在非阻塞套接字操作之前与无阻塞pthread_mutex_trylock相关联,上述循环可以适用于在多个线程中同时运行,同时处理同一组套接字。这为所有符合POSIX的平台提供了一个工作模型,无论是Windows、BSD还是Linux,但它并不是一个完美的最佳模型。为了实现最优性,我们必须进入异步世界,该世界因系统而异:

  • Windows使用带有回调的WSA*函数
  • BSD和Linux分别使用有点相似的kqueueepoll

编译前面提到的"非阻塞套接字操作"抽象可能是值得的,因为这两种异步机制在接口方面差异很大。与其他一切一样,不幸的是,我们必须编写抽象,以便在符合POSIX的系统上保持与Windows相关的代码的可读性。额外的好处是,这将允许我们将服务器处理(即accept和随后的任何内容)与客户端处理(即connect和随后的所有内容)混合在一起,因此我们的服务器循环可以变成客户端环圈(反之亦然)。