boost::asio -服务器之间的冲突,双向通信

boost::asio - collisions between servers, two-way communication

本文关键字:冲突 双向通信 服务器 asio boost 之间      更新时间:2023-10-16

我正在尝试使用ASIO编写一个允许服务器充当客户端的应用程序。例如:

我有3个服务器需要相互通信。在与网络中的其他服务器通信时,它们需要能够充当客户机。所有3台服务器都可以通过unix域套接字或带SSL的TCP/IP服务请求。

数据流如下:

1)一个独立的客户端连接到服务器A(通过unix域套接字)并发送一个请求。

2)服务器尝试响应请求,但如果它不能,它发起一个TCP/IP连接到服务器B(现在服务器a作为服务器B的客户端),并将请求转发给它。服务器还"污染"了数据包,告诉服务器B不要将消息转发给另一个服务器,这样就不会产生无限循环。

3)如果服务器B可以处理请求,则响应服务器A。

4)如果服务器B可以处理请求,服务器A将响应返回给独立客户端。

5)如果服务器B不能处理请求,服务器A尝试联系服务器C、服务器D、服务器E等

这工作…直到服务器B有自己的独立客户端,在服务器A试图联系服务器B的同时试图联系服务器A。这会产生冲突,两个服务器将无限期地等待从另一个服务器获得响应。使用截止日期计时器可以避免无限期的等待,但这并不能解决问题。

正确的方法是什么?

编辑:我将服务器分成2类(服务器和PeerProxy)在单独的线程中运行,但我仍然得到死锁。

以下是我所做的。我将Unix侦听器和TCP侦听器拆分为Server和PeerProxy类。服务器有自己的io_service, PeerProxy也有自己的io_service。当服务器启动时,它也启动PeerProxy在第二个线程中运行(所以它不会阻塞服务器的执行)。现在的数据流是这样的:

独立客户端->服务器A(无法应答)-> PeerProxy B ->服务器B(已应答)-> PeerProxy B ->服务器A ->独立客户端

同样的问题,当服务器B的独立客户端转到PeerProxy a的同时,服务器a转到PeerProxy B时,出现死锁

您应该在服务器中异步处理每个请求,即将处理分为单独的执行线程。这样服务器就可以保持响应,也就是说,当它们与其他客户端或服务器通信时,它们可以对新的请求做出反应。

因此,在您的情况下,当两个客户端1和2向服务器A和B发送请求时,只有另一个服务器可以响应(或不响应),这两个服务器可能看起来像这样:

Server A:                                   Server B:
Thread 0  | Thread 1     | Thread 2         Thread 0  | Thread 1     | Thread 2
listen...                                   listen...
-> req 1                                    -> req 2
listen... | handle req 1                    listen... | handle req 2
listen... | forward to B                    listen... | forward to A
-> req B  | wait...                         -> req A  | wait...
listen... | wait...        | handle req B   listen... | wait...      | reject req A  
listen... | -> B: rejected | answer req B   listen... | wait...       
listen... | forward to C                    listen... | -> A: answer       
listen... | -> C: answer                    listen... | req 2 done       
listen... | req 1 done                      listen...
listen...                                   listen...

这里,每个服务器的线程0除了侦听传入请求和处理这些请求的其他线程旋转之外没有其他目的。其他线程每个只处理一个请求,要么回答它,要么将它转发给所有服务器,如果它被"污染",则拒绝它。

注意:这些线程根本不必是真正的线程对象。它们可以是ASIO异步调用序列,也可以是一些线程框架(如TBB)中的轻量级任务。

更新:我将为您发布一些示例伪代码,我将如何使用Boost.Asio实现服务器。为此,我想介绍一个我认为对理解Asio的执行很有用的概念:您可以将其视为一个状态机,其中async_*调用是状态转换,而处理程序是状态。通常每个处理程序执行一个async_*调用,这意味着您从一个状态切换到另一个状态。如果在一个处理程序中有多个后续的async_*调用,这意味着该处理程序正在生成执行的辅助线程。如果一个处理程序没有调用任何async_*函数,相应的执行线程结束。

现在开始实现。

线程0像典型的Asio教程所展示的那样,创建一个套接字并侦听传入的连接。唯一的问题是,在每个新的客户端连接上都会产生一个新的执行线程(读取:处理程序序列):
accept_handler(client connection) {
async_read(client_request, request_handler) //spawn new thread of execution
async_accept(next client connection, accept_handler) //transition to accept_handler
}

线程N:以request_handler:

request_handler(client_request) {
if canProcess 
async_send_answer(client, done_handler) //transition to done_handler
else  //relay request to first server on list
async_connect(first server on list, connect_handler) //transition to connect_handler
}  

done_handler通常会记录成功的应答,而调用另一个async_*函数,这意味着与客户端的连接将被关闭,执行线程结束。

将请求发送到其他服务器的处理程序序列是典型的连接-发送-接收-断开连接序列:

connect_handler          -- async_send(request) ---------> send_handler
send_handler             -- async_read(answer) ----------> read_handler
read_handler (no answer) -- async_connect(next server) --> connect_handler

如果从其中一个服务器接收到应答,或者列表中没有更多的服务器,则循环结束:

read_handler (answer ok)       -- async_send_answer(client) --> done_handler
read_handler (no more servers) -- async_send_fail(client) ----> done_handler

这是一个简单的竞争条件。您可能希望实现某种原子锁变量、信号或标志,因此,如果一台服务器即将向另一台服务器发送请求,它将从那一刻起拒绝来自其他服务器的任何传入请求。我可能会使用std::atomic来实现它。