boost::asio::套筒螺纹安全性

boost::asio::socket thread safety

本文关键字:安全性 asio boost      更新时间:2023-10-16

(这是我最初问题的简化版本)

我有几个线程可以写入boost asio套接字。这似乎很有效,没有任何问题。

文档中说共享套接字不是线程安全的(在这里,从底部往下看),所以我想知道是否应该用互斥锁之类的东西来保护套接字。

这个问题坚持认为保护是必要的,但没有给出如何做到这一点的建议

我最初问题的所有答案都坚持认为我所做的事情很危险,大多数人都敦促我用async_writes或更复杂的东西来代替我的写作。然而,我不愿意这样做,因为这会使已经在工作的代码复杂化,而且没有一个回答者让我相信他们知道他们在说什么——他们似乎和我一样阅读了相同的文档,并在猜测,就像我一样。

因此,我编写了一个简单的程序来测试从两个线程向共享套接字的写操作。

这是服务器,它只需写出从客户端接收到的任何内容

int main()
{
boost::asio::io_service io_service;
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 3001));
tcp::socket socket(io_service);
acceptor.accept(socket);
for (;;)
{
char mybuffer[1256];
int len = socket.read_some(boost::asio::buffer(mybuffer,1256));
mybuffer[len] = '';
std::cout << mybuffer;
std::cout.flush();
}
return 0;
}

这是客户端,它创建两个线程,以最快的速度写入共享套接字

boost::asio::ip::tcp::socket * psocket;
void speaker1()
{
string msg("speaker1: hello, server, how are you running?n");
for( int k = 0; k < 1000; k++ ) {
boost::asio::write(
*psocket,boost::asio::buffer(msg,msg.length()));
}
}
void speaker2()
{
string msg("speaker2: hello, server, how are you running?n");
for( int k = 0; k < 1000; k++ ) {
boost::asio::write(
*psocket,boost::asio::buffer(msg,msg.length()));
}
}
int main(int argc, char* argv[])
{
boost::asio::io_service io_service;
// connect to server
tcp::resolver resolver(io_service);
tcp::resolver::query query("localhost", "3001");
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
tcp::resolver::iterator end;
psocket = new tcp::socket(io_service);
boost::system::error_code error = boost::asio::error::host_not_found;
while (error && endpoint_iterator != end)
{
psocket->close();
psocket->connect(*endpoint_iterator++, error);
}

boost::thread t1( speaker1 );
boost::thread t2( speaker2 );
Sleep(50000);
}

这很管用!据我所知,非常完美。客户端不会崩溃。消息到达服务器时没有出现乱码。它们通常交替到达,每条线一个。有时一个线程在另一个线程之前收到两到三条消息,但我不认为这是一个问题,只要没有乱码并且所有消息都到达。

我的结论是:从某种理论意义上讲,套接字可能不是线程安全的,但很难让它失败,我不会担心它

在重新计算async_write的代码后,我现在确信任何写操作都是线程安全的,当且仅当数据包大小小于时

default_max_transfer_size = 65536;

发生的情况是,一旦调用async_write,就会在同一线程中调用async_write_some。池中任何调用某种形式io_service::run的线程都将继续为该写操作调用async_write_some,直到它完成。

如果必须多次调用(数据包大于65536),这些async_write_some调用可以并且将交织。

ASIO不会像您所期望的那样对套接字的写入进行排队,一个接一个地完成。为了确保线程交错安全写入,请考虑以下代码:

void my_connection::async_serialized_write(
boost::shared_ptr<transmission> outpacket) {
m_tx_mutex.lock();
bool in_progress = !m_pending_transmissions.empty();
m_pending_transmissions.push(outpacket);
if (!in_progress) {
if (m_pending_transmissions.front()->scatter_buffers.size() > 0) {
boost::asio::async_write(m_socket,
m_pending_transmissions.front()->scatter_buffers,
boost::asio::transfer_all(),
boost::bind(&my_connection::handle_async_serialized_write,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
} else { // Send single buffer
boost::asio::async_write(m_socket,
boost::asio::buffer(
m_pending_transmissions.front()->buffer_references.front(),                          m_pending_transmissions.front()->num_bytes_left),
boost::asio::transfer_all(),
boost::bind(
&my_connection::handle_async_serialized_write,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
}
m_tx_mutex.unlock();
}
void my_connection::handle_async_serialized_write(
const boost::system::error_code& e, size_t bytes_transferred) {
if (!e) {
boost::shared_ptr<transmission> transmission;
m_tx_mutex.lock();
transmission = m_pending_transmissions.front();
m_pending_transmissions.pop();
if (!m_pending_transmissions.empty()) {
if (m_pending_transmissions.front()->scatter_buffers.size() > 0) {
boost::asio::async_write(m_socket,
m_pending_transmissions.front()->scatter_buffers,
boost::asio::transfer_exactly(
m_pending_transmissions.front()->num_bytes_left),
boost::bind(
&chreosis_connection::handle_async_serialized_write,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
} else { // Send single buffer
boost::asio::async_write(m_socket,
boost::asio::buffer(
m_pending_transmissions.front()->buffer_references.front(),
m_pending_transmissions.front()->num_bytes_left),
boost::asio::transfer_all(),
boost::bind(
&my_connection::handle_async_serialized_write,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
}
m_tx_mutex.unlock();
transmission->handler(e, bytes_transferred, transmission);
} else {
MYLOG_ERROR(
m_connection_oid.toString() << " " << "handle_async_serialized_write: " << e.message());
stop(connection_stop_reasons::stop_async_handler_error);
}
}

这基本上形成了一个队列,用于一次发送一个数据包。async_write仅在第一次写入成功后调用,然后调用第一次写入的原始处理程序。

如果asio让每个套接字/流的写队列都是自动的,这会更容易。

对线程不安全的异步处理程序使用boost::asio::io_service::strand

串被定义为事件的严格顺序调用处理程序(即没有并发调用)。线束的使用允许在不需要显式锁定(例如使用互斥锁)。

计时器教程可能是让你的头脑围绕线束的最简单方法。

听起来这个问题可以归结为:

async_write_some()在来自两个不同线程的单个套接字上同时调用时会发生什么

我相信这正是线程不安全的操作。这些缓冲区在线路上的输出顺序是未定义的,它们甚至可能是交错的。特别是如果您使用方便函数async_write(),因为它是通过对下面的async_write_some()的一系列调用来实现的,直到发送完整个缓冲区。在这种情况下,从两个线程发送的每个片段可以随机交织。

防止你遇到这种情况的唯一方法是建立你的程序来避免这种情况。

一种方法是编写一个应用层发送缓冲区,由单个线程负责将其推送到套接字上。这样,您就可以只保护发送缓冲区本身。不过,请记住,简单的std::vector是不起作用的,因为在末尾添加字节可能最终会重新分配它,可能是在有一个优秀的async_write_some()引用它的时候。相反,使用缓冲区的链表并利用asio的分散/聚集功能可能是个好主意。

理解ASIO的关键是要认识到,无论哪个线程调用异步方法,完成处理程序都只能在调用了io_service.run()的线程的上下文中运行。如果您只在一个线程中调用了io_service.run(),那么所有的完成处理程序都将在该线程的上下文中串行执行。如果您在多个线程中调用了io_service.run(),那么完成处理程序将在其中一个线程的上下文中执行。您可以将其视为一个线程池,其中池中的线程是在同一个io_service对象上调用io_service.run()的线程。

如果有多个线程调用io_service.run(),那么可以通过将完成处理程序放在strand中来强制序列化它们。

要回答问题的最后一部分,请致电boost::async_write()。这将把写操作分派到已调用io_service.run()的线程上,并在写操作完成时调用完成处理程序。如果您需要序列化此操作,那么它会稍微复杂一点,您应该阅读此处关于链的文档。

首先考虑套接字是一个流,并且没有针对并发读取和/或写入进行内部保护。有三个不同的考虑因素。

  1. 访问同一套接字的函数的并发执行
  2. 并发执行包含同一套接字的委托
  3. 写入同一套接字的委托的交错执行

聊天示例是异步的,但不是并发的。io_service是从单个线程运行,使所有聊天客户端操作都不并发。换句话说,它避免了所有这些问题。即使是async_write也必须在内部完成发送消息的所有部分,然后才能进行任何其他工作,从而避免交错问题。

只有当前正在为io_service调用run()、run_one()、poll()或poll_one()的任何重载的线程才会调用句柄。

通过将工作发布到单线程io_service,其他线程可以通过排队io_services中的工作来安全地避免并发和阻塞。然而,如果您的场景阻止您缓冲给定套接字的所有工作,事情就会变得更加复杂。您可能需要阻塞套接字通信(但不阻塞线程),而不是无休止地排队工作。此外,工作队列可能很难管理,因为它完全不透明。

如果io_service运行多个线程,您仍然可以轻松避免上述问题,但您只能从其他读或写的处理程序调用读或写(以及在启动时)。这将对套接字的所有访问进行排序,同时保持非阻塞状态。安全性源于这样一个事实,即模式在任何给定时间只使用一个线程。但是从一个独立的线程发布工作是有问题的——即使你不介意缓冲它

strand是一个asio类,它以确保非并发调用的方式将工作发布到io_service。然而,使用strand来调用async_read和/或async_write只能解决这三个问题中的第一个。这些函数在内部将工作发布到套接字的io_service。如果该服务正在运行多个线程,则可以同时执行工作。

那么,对于给定的套接字,如何安全地同时调用async_read和/或async_write呢?

  1. 对于并发调用程序,第一个问题可以用互斥或串来解决,如果你不想缓冲工作,可以使用前者,如果你想缓冲,可以使用后者。这在函数调用期间保护了套接字,但对其他问题没有任何作用。

  2. 第二个问题似乎是最困难的,因为很难看到从两个函数异步执行的代码内部发生了什么。async函数都将工作发布到套接字的io_service。

来自升压插座来源:

/**
* This constructor creates a stream socket without opening it. The socket
* needs to be opened and then connected or accepted before data can be sent
* or received on it.
*
* @param io_service The io_service object that the stream socket will use to
* dispatch handlers for any asynchronous operations performed on the socket.
*/
explicit basic_stream_socket(boost::asio::io_service& io_service)
: basic_socket<Protocol, StreamSocketService>(io_service)
{
}

从io_service::run()

/**
* The run() function blocks until all work has finished and there are no
* more handlers to be dispatched, or until the io_service has been stopped.
*
* Multiple threads may call the run() function to set up a pool of threads
* from which the io_service may execute handlers. All threads that are
* waiting in the pool are equivalent and the io_service may choose any one
* of them to invoke a handler.
*
* ...
*/
BOOST_ASIO_DECL std::size_t run();

因此,如果给套接字多个线程,它别无选择,只能使用多个线程——尽管这不是线程安全的。避免这个问题的唯一方法(除了替换套接字实现之外)是只给套接字一个线程来处理。对于一个单独的套接字,这就是你想要的(所以不要麻烦跑去写替换)。

  1. 第三个问题可以通过使用(不同的)互斥来解决,该互斥在async_write之前被锁定,传递到完成处理程序中,然后在该点解锁。这将阻止任何调用方开始写入,直到前面写入的所有部分都完成为止

请注意,async_write帖子在队列中工作——这就是它几乎可以立即返回的方式。如果你投入了太多的工作,你可能不得不处理一些后果。尽管使用单个io_service线程作为套接字,但可能有任意数量的线程通过对async_write的并发或非并发调用来发布工作。

另一方面,async_read非常简单。不存在交错问题,您只需从上一次调用的处理程序循环返回即可。您可能希望也可能不希望将生成的工作分派给另一个线程或队列,但如果在完成处理程序线程上执行,则只会阻塞单线程套接字上的所有读写操作。

更新

我对套接字流的底层实现(对于一个平台)进行了更多的挖掘。这种情况似乎是套接字在调用线程上一致地执行平台套接字调用,而不是发布到io_service的委托。换句话说,尽管async_read和async_write看起来会立即返回,但它们实际上在返回之前执行了所有套接字操作。只有处理程序被发布到io_service。我审查过的示例代码既没有记录也没有公开这一点,但假设这是有保证的行为,它会显著影响上面的第二个问题。

假设发布到io_service的工作没有包含套接字操作,那么就没有必要将io_seervice限制为单个线程。然而,它确实加强了防止异步函数并发执行的重要性。因此,例如,如果按照聊天示例,但却向io_service添加了另一个线程,就会出现问题。通过在函数处理程序中执行异步函数调用,可以同时执行函数。这将需要一个互斥对象,或者所有异步函数调用都被转发以在一个链上执行。

更新2

关于第三个问题(交织),如果数据大小超过65536字节,则工作将在async_write内部分解并分部分发送。但重要的是要理解,如果io_service中有多个线程,那么第一个线程之外的工作块将被发布到不同的线程。在调用完成处理程序之前,这一切都发生在async_write函数内部。该实现创建自己的中间完成处理程序,并使用它们执行除第一个套接字操作之外的所有操作。

这意味着,如果有多个io_service线程要发布的数据超过64kb(默认情况下,这可能会有所不同),那么围绕async_write调用(互斥或串)的任何保护都将保护套接字。因此,在这种情况下,交错保护不仅对于交错安全是必要的,而且对于插座的螺纹安全也是必要的。我在调试器中验证了所有这些。

MUTEX选项

async_read和async_write函数在内部使用io_service,以便获得要在其上发布完成处理程序的线程,阻塞直到线程可用。这使得使用互斥锁来保护它们是危险的。当使用互斥锁来保护这些函数时,当线程对锁进行备份时,将发生死锁,使io_service处于饥饿状态。考虑到在使用多线程io_service发送>64k时没有其他方法来保护async_write,在这种情况下,它有效地将我们锁定在一个线程中——这当然解决了并发问题。

根据2008年11月的boost 1.37 asio更新,包括写入在内的某些同步操作"现在是线程安全的",允许"在操作系统支持的情况下,在单个套接字上进行并发同步操作"boost 1.37.0历史。这似乎支持了您所看到的内容,但过于简单化的"共享对象:不安全"子句仍然存在于ip::tcp::socket的boost文档中。

对一篇旧帖子的另一条评论。。。

我认为asio::async_write()过载的asio文档中的关键句子如下:

此操作是根据对流的async_write_some函数的零个或多个调用来实现的,称为组合操作。程序必须确保流在该操作完成之前不执行其他写入操作(如async_write、流的async_wwrite_some函数或任何其他执行写入的组合操作)。

据我所知,本文记录了上述许多答案中的假设:如果多个线程执行io_context.run(),则来自对asio::async_write的调用的数据可能会交错。

也许这对某人有帮助;-)

这取决于您是否从多个线程访问同一个套接字对象。假设您有两个线程运行相同的io_service::run()函数。

例如,如果您同时进行阅读和写作,或者可能执行取消操作来自其他线程。那就不安全了。

但是,如果您的协议一次只执行一个操作。

  1. 如果只有一个线程运行io_service,那么就没有问题。如果你想从其他线程在套接字上执行一些东西,你可以用调用io_service::post()对套接字执行此操作的处理程序,以便在同一线程中执行
  2. 如果您有几个线程在执行io_service::run,并且您试图同时执行操作——比如说取消和读取操作,那么您应该使用链。Boost.Asio文档中有一个关于这方面的教程

我一直在运行大量的测试,但还没能破坏asio。即使没有锁定任何互斥。

尽管如此,我还是建议您使用async_readasync_write,并在每一个调用周围使用一个互斥对象。

我认为唯一的缺点是,如果有多个线程调用io_service::run,那么可以同时调用完成处理程序。

就我而言,这不是一个问题。这是我的测试代码:

#include <boost/thread.hpp>
#include <boost/date_time.hpp>
#include <boost/asio.hpp>
#include <vector>
using namespace std;
char databuffer[256];
vector<boost::asio::const_buffer> scatter_buffer;
boost::mutex my_test_mutex;
void my_test_func(boost::asio::ip::tcp::socket* socket, boost::asio::io_service *io) {
while(1) {
boost::this_thread::sleep(boost::posix_time::microsec(rand()%1000));
//my_test_mutex.lock(); // It would be safer 
socket->async_send(scatter_buffer, boost::bind(&mycallback));
//my_test_mutex.unlock(); // It would be safer
}
}
int main(int argc, char **argv) {
for(int i = 0; i < 256; ++i)
databuffer[i] = i;
for(int i = 0; i < 4*90; ++i)
scatter_buffer.push_back(boost::asio::buffer(databuffer));
boost::asio::io_service my_test_ioservice;
boost::asio::ip::tcp::socket my_test_socket(my_test_ioservice);
boost::asio::ip::tcp::resolver my_test_tcp_resolver(my_test_ioservice);
boost::asio::ip::tcp::resolver::query  my_test_tcp_query("192.168.1.10", "40000");
boost::asio::ip::tcp::resolver::iterator my_test_tcp_iterator = my_test_tcp_resolver.resolve(my_test_tcp_query);
boost::asio::connect(my_test_socket, my_test_tcp_iterator);
for (size_t i = 0; i < 8; ++i) {
boost::shared_ptr<boost::thread> thread(
new boost::thread(my_test_func, &my_test_socket, &my_test_ioservice));
}
while(1) {
my_test_ioservice.run_one();
boost::this_thread::sleep(boost::posix_time::microsec(rand()%1000));
}
return 0;

}

这是我在python中的临时服务器:

import socket
def main():
mysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
mysocket.bind((socket.gethostname(), 40000))
mysocket.listen(1)
while 1:
(clientsocket, address) = mysocket.accept()
print("Connection from: " + str(address))
i = 0
count = 0
while i == ord(clientsocket.recv(1)):
i += 1
i %= 256
count+=1
if count % 1000 == 0:
print(count/1000)
print("Error!")
return 0
if __name__ == '__main__':
main()

请注意,运行此代码可能会导致您的计算机崩溃。