TCP/IP IOCP接收到的数据有时已损坏-Windows上的Visual C++

TCP/IP IOCP received data sometimes corrupt - Visual C++ on Windows

本文关键字:已损坏 -Windows 上的 C++ Visual 数据 IP IOCP TCP      更新时间:2023-10-16

我正在编写一个简单的测试ICOP客户端和服务器,以确保我正确使用API,并确保服务器正确接收客户端发送的数据。我已经包含了这个问题的所有代码。

这就是我遇到一些问题的地方,接收缓冲区内的数据有时似乎已损坏(损坏的原因是缓冲区内有时数据块可能出现无序或丢失)。需要明确的是,这是单个接收缓冲区中的数据,我的意思不是因为线程调度问题而导致多个缓冲区之间的无序。我之前在这里发布了一个与此相关的问题。然而,我在获得一个合适的代码示例方面做了更多的工作,所以我发布了一个新的问题,并将链接到这个问题。我希望其他人能够运行这个代码,并经历同样的奇怪行为。

测试代码

测试应用程序可以在客户端和服务器两种模式下运行。运行服务器,它开始侦听,运行客户端并连接到服务器,一旦连接好,就会以允许的速度向服务器抛出数据。然后,服务器验证在调用WSARecv之后从GetQueuedCompleteStatus返回的每个缓冲区中的数据。每次WSASend完成时,我都会将结构的OVERLAPPED部分清零,并使用原始数据缓冲区再次调用WSASend。

客户端发送的每个数据缓冲区都是一个字节序列,这些字节一个接一个地递增,直到指定的最大值。我不发送完整的0..255范围,以防大小整齐地以倍数放入数据包,并以某种方式隐藏了问题,因此在我的示例中,代码字节的范围从0..250。对于构建的每个发送缓冲区,我重复该模式numberOfGroups次。

这种格式应该意味着我可以有多个WSARecv未完成,然后完全独立于任何其他缓冲区验证返回缓冲区中的数据,这意味着不需要同步或重建顺序。即,我可以从第一个字节开始,并验证它们一个接一个地递增到最大值,然后重置为0。一旦我的测试没有问题,我就可以进行更复杂的测试,对接收到的缓冲区进行排序,并验证更复杂的数据。

您可以在命令行上指定可以同时有多少个未完成的WSASend和WSARecv调用。当有两个或更多未完成的WSARecv呼叫时,这个问题似乎更频繁地发生。使用1,它可以运行相当长的一段时间,然后偶尔检测到问题。

我一直在Windows7上使用Visual Studio 2010 C++进行测试。

客户端和服务器中同时调用的次数似乎会产生影响。两者都使用2似乎比某些组合更容易产生损坏的数据。

套接字和IOCP似乎需要大量的样板代码才能启动并运行一个非常基本的客户端和服务器应用程序。实际接收缓冲区的代码只有几行,涉及到调用WSARecv和处理GetQueuedCompleteStatus中已完成的调用。

此代码调用WSARecv

void IOCPConnection::postRecv(PTestOverlapped overlapped)
{
DWORD numberOfBytesTransferred = 0;
DWORD flags = 0;
if (overlapped == nullptr)
{
overlapped = new TestOverlapped(receiveBufferSize);
overlapped->connection = this;
}
else
{
overlapped->reset();
}
overlapped->operation = soRecv;
auto returnCode = WSARecv(socket, &(overlapped->buffer), 1, &numberOfBytesTransferred, &flags, (LPWSAOVERLAPPED) overlapped, nullptr);
}

当WSARecv调用完成时,它们由工作线程处理——我已经删除了与从这个片段接收数据无关的行

void IOCPWorker::execute()
{
bool quit = false;
DWORD numberOfBytesTransferred = 0;
ULONG_PTR completionKey = NULL;
PTestOverlapped overlapped = nullptr;
while (!quit)
{
auto queueResult = GetQueuedCompletionStatus(manager->iocp, &numberOfBytesTransferred, &completionKey, (LPOVERLAPPED *)&overlapped, INFINITE);
if (queueResult)
{
switch (overlapped->operation)
{
case soRecv:
{
IOCPConnection *connection = overlapped->connection;
connection->onRecv(overlapped, numberOfBytesTransferred); // This method validates the received data
connection->postRecv(overlapped);
overlapped = nullptr;
break;
}
default:;
}
}
}
}

调用connection->onRecv是我验证数据的地方。这里有什么明显的问题吗?

我已经包含了完整的代码供参考,如果你有冒险精神,应该编译。


完整参考源

服务器示例侦听端口3000,最多有2个未完成的WSARecv调用

> IOCPTest.exe server 3000 2

客户端示例连接到端口3000上的127.0.0.1,最多有2个未完成的WSASend调用

> IOCPTest.exe client 127.0.0.1 3000 2

该程序由少量类组成

IOCPConnectionManager

这个类处理侦听连接,还启动工作线程。

IOCPConnection

只是跟踪SOCKET和一些处理异步调用的方法。当WSARecv返回并验证缓冲区内的数据时,将调用IOCPConnection::onRecv。它只是打印一条消息,并在发现数据不按顺序时返回。

IOCPWorker

工作线程。IOCPWorker::execute()是调用GetQueuedCompleteStatus的位置。

TestOverlapped

所需的重叠结构。

您还需要为链接器包含Ws2_32.lib和Mswsock.lib。

主cpp文件

/************************************************************************
*                                                                       *
*  Test IOCP Client and Server - David Shaw                             *
*                                                                       *
*  There is limited error handling here and it assumes ideal conditions *
*  Some allocated objects are not freed at the end, this is a test only *
*                                                                       *
************************************************************************/
#include "stdafx.h"
#include <iostream>
#include <string>
#include "IOCPTest.h"
#include <Windows.h>
void printUse()
{
std::cout << "Invalid arguments" << std::endl;
std::cout << "This test app has very limited error handling or memory management" << std::endl;
std::cout << "Run as client or server (run the server first) e.g." << std::endl << std::endl;
std::cout << "To run as server listening on port 3000 with 2 pending receives:" << std::endl;
std::cout << "> IOCPTester.exe server 3000 2" << std::endl << std::endl;
std::cout << "To run as client connected to 127.0.0.1 on port 3000 with 2 pending sends:" << std::endl;
std::cout << "> IOCPTester.exe client 127.0.0.1 3000 2" << std::endl << std::endl;
std::cout << "Hit enter to exit" << std::endl;
std::cin.ignore();
}
int main(int argc, char *argv[])
{
if (argc < 4)
{
printUse();
return 0;
}
std::string mode(argv[1]);
if ((mode.compare("client") != 0) && (mode.compare("server") != 0))
{
printUse();
return 0;
}
IOCPTest::IOCPConnectionManager *manager = new IOCPTest::IOCPConnectionManager();
bool server = mode.compare("server") == 0;
if (server)
{
std::string listenPort(argv[2]);
std::string postedReceiveCount(argv[3]);
manager->listenPort = atoi(listenPort.c_str());
manager->postedReceiveCount = atoi(postedReceiveCount.c_str());
manager->postedSendCount = 1; // Not really used in this mode
manager->startListening();
}
else
{
if (argc < 5)
{
printUse();
return 0;
}
std::string host(argv[2]);
std::string port(argv[3]);
std::string postedSendCount(argv[4]);
manager->postedReceiveCount = 1; // Not really used in this mode
manager->postedSendCount = atoi(postedSendCount.c_str());
IOCPTest::IOCPConnection *connection = manager->createConnection();
connection->host = host;
connection->port = atoi(port.c_str());
connection->connect();
}
std::cout << "Hit enter to exit" << std::endl;
std::cin.ignore();
}

IOCPTest.h

/************************************************************************
*                                                                       *
*  Test IOCP Client and Server - David Shaw                             *
*                                                                       *
*  There is limited error handling here and it assumes ideal conditions *
*  std::cout might not be the best approach in a multithreaded          *
*  environment but this is just a simple test app.                      *
*  Some allocated objects are not cleaned up at the end either, but     *
*  again this is just a test.                                           *
*                                                                       *
************************************************************************/
#ifndef IOCPTestH
#define IOCPTestH
#endif
#include <WinSock2.h> // Include before as otherwise Windows.h includes and causes issues
#include <Windows.h>
#include <string>
namespace IOCPTest
{
class IOCPConnection;
enum IOCPSocketOperation
{
soUnknown,
soAccept,
soConnect,
soDisconnect,
soSend,
soRecv,
soQuit
};
struct TestOverlapped
{
OVERLAPPED overlapped;
WSABUF buffer;
IOCPSocketOperation operation;
IOCPConnection *connection;
bool resend; // Set this to keep sending the same data over and over
TestOverlapped(int bufferSize);
~TestOverlapped();
void reset();
};
typedef TestOverlapped *PTestOverlapped;
class IOCPConnectionManager
{
public:
static const int NUMACCEPTS = 5;
WSADATA wsaData;
HANDLE iocp;
SOCKET listenSocket;
USHORT listenPort;
int postedReceiveCount;
int postedSendCount;
void startListening();
void postAcceptEx();
IOCPConnection *createConnection();
IOCPConnectionManager();
};
class IOCPConnection
{
public:
SOCKET socket;
IOCPConnectionManager *manager;
std::string host;
USHORT port;
void onAcceptEx(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
void postRecv(PTestOverlapped overlapped = nullptr);
void onRecv(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
void onConnect(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
void send(PTestOverlapped overlapped);
void onSent(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
void connect();
};
class IOCPWorker
{
public:
HANDLE threadHandle;
DWORD threadId;
IOCPConnectionManager *manager;
IOCPWorker(bool suspended);
void start();
void execute();
};
}

IOCPTest.cpp

#include "stdafx.h"
#include "IOCPTest.h"
#include <iostream>
#include <Mswsock.h>
#include <WS2tcpip.h>
#include <sstream>
namespace IOCPTest
{
LPFN_ACCEPTEX fnAcceptEx = nullptr;
LPFN_CONNECTEX fnConnectEx = nullptr;
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidConnectEx = WSAID_CONNECTEX;
const byte maxByteExpected = 250;
const int numberOfGroups = 4096;
const int receiveBufferSize = 0x100000;
BOOL AcceptEx
(
SOCKET sListenSocket,
SOCKET sAcceptSocket,
PVOID lpOutputBuffer,
DWORD dwReceiveDataLength,
DWORD dwLocalAddressLength,
DWORD dwRemoteAddressLength,
LPDWORD lpdwBytesReceived,
LPOVERLAPPED lpOverlapped
)
{
if (fnAcceptEx == nullptr)
{
DWORD dwBytes;
int result = WSAIoctl(sListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptEx, sizeof (GuidAcceptEx), &fnAcceptEx, sizeof(fnAcceptEx), &dwBytes, NULL, NULL);
if (result != 0)
{
std::cerr << "Error calling WSAIoctl for AcceptEx" << std::endl;
return false;
}
}
return fnAcceptEx(sListenSocket, sAcceptSocket, lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived, lpOverlapped);
}
BOOL ConnectEx(
SOCKET s,
const struct sockaddr FAR *name,
int namelen,
PVOID lpSendBuffer,
DWORD dwSendDataLength,
LPDWORD lpdwBytesSent,
LPOVERLAPPED lpOverlapped
)
{
if (fnConnectEx == nullptr)
{
DWORD dwBytes;
int result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidConnectEx, sizeof (GuidConnectEx), &fnConnectEx, sizeof(fnConnectEx), &dwBytes, NULL, NULL);
if (result != 0)
{
std::cerr << "Error calling WSAIoctl for ConnectEx" << std::endl;
return false;
}
}
return fnConnectEx(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent, lpOverlapped);
}
// TestOverlapped
TestOverlapped::TestOverlapped(int bufferSize):
overlapped(), 
operation(soUnknown),
connection(nullptr),
buffer(),
resend(false)
{
if (bufferSize > 0)
{
buffer.len = bufferSize;
buffer.buf = (CHAR*) malloc(bufferSize);
}
}
TestOverlapped::~TestOverlapped()
{
if (buffer.buf != nullptr)
{
free(buffer.buf);
}
}
void TestOverlapped::reset()
{
overlapped = OVERLAPPED();
}
// IOCPConnectionManager
IOCPConnectionManager::IOCPConnectionManager():
wsaData(),
listenSocket(0),
listenPort(0),
postedReceiveCount(1)
{
WSAStartup(WINSOCK_VERSION, &wsaData);
iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
SYSTEM_INFO systemInfo = SYSTEM_INFO();
GetSystemInfo(&systemInfo);
for (decltype(systemInfo.dwNumberOfProcessors) i = 0; i < systemInfo.dwNumberOfProcessors; i++)
{
IOCPWorker* worker = new IOCPWorker(true);
worker->manager = this;
worker->start();
}
}
void IOCPConnectionManager::startListening()
{
listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
CreateIoCompletionPort((HANDLE)listenSocket, iocp, ULONG_PTR(this), 0);
sockaddr_in localAddress = sockaddr_in();
localAddress.sin_family = AF_INET;
localAddress.sin_addr.s_addr = INADDR_ANY; // Listen on all addresses
localAddress.sin_port = htons(listenPort);
if (bind(listenSocket, (SOCKADDR*) &localAddress, sizeof(localAddress)) == SOCKET_ERROR)
{
std::cerr << "Error in binding listening socket" << std::endl;
}
if (listen(listenSocket, SOMAXCONN) == 0)
{
std::cout << "Listening on port " << listenPort << std::endl;
}
for (int i = 0; i < NUMACCEPTS; i++)
{
postAcceptEx();
}
}
void IOCPConnectionManager::postAcceptEx()
{
SOCKET acceptSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
IOCPConnection *connection = new IOCPConnection();
connection->manager = this;
connection->socket = acceptSocket;
CreateIoCompletionPort((HANDLE) acceptSocket, iocp, ULONG_PTR(connection), 0); // The thread count is ignored in this call when just associating the socket
PTestOverlapped overlapped = new TestOverlapped(2 * (sizeof(sockaddr_in) + 16)); // As specified in documentation
overlapped->operation = soAccept;
overlapped->connection = connection;
DWORD byesReceived = 0;
int result = IOCPTest::AcceptEx
(
listenSocket,
acceptSocket,
overlapped->buffer.buf,
0, // Size of initial receiving buffer, excluding the space at the end for the two addressed
sizeof(sockaddr_in) + 16, // Sizes as specified in the Winsock 2.2 API documentation
sizeof(sockaddr_in) + 16, // Sizes as specified in the Winsock 2.2 API documentation
&byesReceived,
(LPOVERLAPPED) overlapped
);
if (!result)
{
int errorCode = WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
std::cerr << "Error calling AcceptEx. Returned errorCode = " << errorCode << std::endl;
}
}
}
IOCPConnection *IOCPConnectionManager::createConnection()
{
IOCPConnection *connection = new IOCPConnection();
connection->manager = this;
return connection;
}
// IOCPConnection
void IOCPConnection::onAcceptEx(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
manager->postAcceptEx(); // Replace this accept
auto returnCode = setsockopt(socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (const char *)&manager->listenSocket, sizeof(manager->listenSocket));
if (returnCode == SOCKET_ERROR)
{
std::cerr << "SetSockOpt in OnAcceptEx returned SOCKET_ERROR" << std::endl;
}
std::cout << "Connection Accepted" << std::endl;
for (int i = 0; i < manager->postedReceiveCount; ++i)
{
postRecv();
}
}
void IOCPConnection::postRecv(PTestOverlapped overlapped)
{
DWORD numberOfBytesTransferred = 0;
DWORD flags = 0;
if (overlapped == nullptr)
{
overlapped = new TestOverlapped(receiveBufferSize);
overlapped->connection = this;
}
else
{
overlapped->reset();
}
overlapped->operation = soRecv;
auto returnCode = WSARecv(socket, &(overlapped->buffer), 1, &numberOfBytesTransferred, &flags, (LPWSAOVERLAPPED) overlapped, nullptr);
}
void IOCPConnection::onRecv(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
if (numberOfBytesTransferred > 0)
{
byte *data = (byte *)overlapped->buffer.buf;
if (data[0] > maxByteExpected)
{
std::cerr << "Byte greater than max expected found. Max Expected: " << maxByteExpected << "; Found: " << data[0] << std::endl;
return;
}
byte next = (data[0] == maxByteExpected)?0:data[0] + 1;
for (decltype(numberOfBytesTransferred) i = 1; i < numberOfBytesTransferred; ++i)
{
if (data[i] != next)
{
// Not really the best solution for writing data out from multiple threads. Test app only.
std::cerr << "Invalid data. Expected: " << (int)next << "; Got: " << (int)data[i] << std::endl;
return;
}
else if (next == maxByteExpected)
{
next = 0;
}
else
{
++next;
}
}
//std::cout << "Valid buffer processed" << std::endl;
}
}
void IOCPConnection::onConnect(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
for (int i = 0; i < manager->postedSendCount; ++i)
{
// Construct a sequence of incremented byte values 0..maxByteExpected repeated numberOfGroups
PTestOverlapped sendOverlapped = new TestOverlapped((maxByteExpected + 1) * numberOfGroups);
sendOverlapped->connection = this;
for (int j = 0; j < numberOfGroups; ++j)
{
for (byte k = 0; k <= maxByteExpected; ++k)
{
((byte *)sendOverlapped->buffer.buf)[(j * (maxByteExpected + 1)) + (int)k] = k;
}
}
sendOverlapped->resend = true; // Repeat sending this data
send(sendOverlapped);
}
}
void IOCPConnection::send(PTestOverlapped overlapped)
{
overlapped->reset();
overlapped->operation = soSend;
DWORD bytesSent = 0;
DWORD flags = 0;
if (WSASend(socket, &overlapped->buffer, 1, &bytesSent, flags, (LPWSAOVERLAPPED) overlapped, nullptr) == SOCKET_ERROR)
{
int errorCode = WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
std::cerr << "Error calling WSASend. Returned errorCode = " << errorCode << std::endl;
}
}
}
void IOCPConnection::onSent(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
}
void IOCPConnection::connect()
{
socket = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (socket == INVALID_SOCKET)
{
std::cerr << "Error calling socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) in IOCPConnection::connect()" << std::endl;
return;
}
CreateIoCompletionPort((HANDLE)socket, manager->iocp, ULONG_PTR(this), 0); // The thread count is ignored in this call when just associating the socket
sockaddr_in localAddress = sockaddr_in();
localAddress.sin_family = AF_INET;
localAddress.sin_addr.s_addr = INADDR_ANY;
localAddress.sin_port = 0;
if (bind(socket, (SOCKADDR *) &localAddress, sizeof(localAddress)) == SOCKET_ERROR)
{
std::cerr << "Error calling bind(socket, (SOCKADDR *) &localAddress, sizeof(localAddress) in IOCPConnection::connect()" << std::endl;
return;
}
addrinfo hints = addrinfo();
addrinfo *remoteAddress = nullptr;
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = AI_PASSIVE;
std::stringstream ss;
ss << port;
//std::cout << ss.str() << std::endl;
if (getaddrinfo(host.c_str(), ss.str().c_str(), &hints, &remoteAddress) != 0)
{
std::cerr << "Error calling getaddrinfo(host.c_str(), ss.str().c_str(), &hints, &remoteAddress) in IOCPConnection::connect()" << std::endl;
return;
}
TestOverlapped *overlapped = new TestOverlapped(0);
overlapped->connection = this;
overlapped->operation = soConnect;
BOOL result = IOCPTest::ConnectEx
(
socket,
remoteAddress->ai_addr,
remoteAddress->ai_addrlen,
nullptr,
0,
nullptr,
LPOVERLAPPED(overlapped)
);
if (result == FALSE)
{
int errorCode = WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
//std::cerr << "Error calling ConnectEx. You'll need to add some more code if you want to know why :)" << std::endl;
std::cerr << "Error calling ConnectEx. Returned errorCode = " << errorCode << std::endl;
}
}
freeaddrinfo(remoteAddress);
}
// IOCPWorker
DWORD WINAPI IOCPWorkerThreadProc(LPVOID lpParam)
{
((IOCPWorker*)lpParam)->execute();
return 0;
}
IOCPWorker::IOCPWorker(bool suspended)
{
threadHandle = CreateThread(NULL, 0, IOCPWorkerThreadProc, this, (suspended)?CREATE_SUSPENDED:0, &threadId);
}
void IOCPWorker::start()
{
ResumeThread(threadHandle);
}
void IOCPWorker::execute()
{
//std::cout << "TMVIOCPWorker::execute()" << std::endl;
bool quit = false;
DWORD numberOfBytesTransferred = 0;
ULONG_PTR completionKey = NULL;
PTestOverlapped overlapped = nullptr;
while (!quit)
{
auto queueResult = GetQueuedCompletionStatus(manager->iocp, &numberOfBytesTransferred, &completionKey, (LPOVERLAPPED *)&overlapped, INFINITE);
if (queueResult)
{
switch (overlapped->operation)
{
case soAccept:
{
IOCPConnection *connection = overlapped->connection;
connection->onAcceptEx(overlapped, numberOfBytesTransferred);
delete overlapped;
overlapped = nullptr;
break;
}
case soConnect:
{
std::cout << "ConnectEx returned" << std::endl;
IOCPConnection *connection = overlapped->connection;
connection->onConnect(overlapped, numberOfBytesTransferred); // This method validates the received data
delete overlapped;
overlapped = nullptr;
break;
}
case soRecv:
{
//std::cout << "Received Data: " << numberOfBytesTransferred << std::endl;
IOCPConnection *connection = overlapped->connection;
connection->onRecv(overlapped, numberOfBytesTransferred); // This method validates the received data
overlapped->reset();
connection->postRecv(overlapped);
overlapped = nullptr;
break;
}
case soSend:
{
IOCPConnection *connection = overlapped->connection;
connection->onSent(overlapped, numberOfBytesTransferred);
// Send the same data over and over
std::cout << "Resending buffer" << std::endl;
if (overlapped->resend)
{
connection->send(overlapped);
}
else
{
delete overlapped;
}
overlapped = nullptr;
break;
}
default:;
}
}
}
}
}

接收到的大多数缓冲区都是正确的,但当运行套接字的2个接收和2个发送缓冲区时,我仍然有很多这样的滚动:

Invalid data. Expected: 169; Got: 123
Invalid data. Expected: 114; Got: 89
Invalid data. Expected: 89; Got: 156
Invalid data. Expected: 206; Got: 227
Invalid data. Expected: 125; Got: 54
Invalid data. Expected: 25; Got: 0
Invalid data. Expected: 58; Got: 146
Invalid data. Expected: 33; Got: 167
Invalid data. Expected: 212; Got: 233
Invalid data. Expected: 111; Got: 86
Invalid data. Expected: 86; Got: 153
Invalid data. Expected: 190; Got: 165
Invalid data. Expected: 175; Got: 150
Invalid data. Expected: 150; Got: 217
Invalid data. Expected: 91; Got: 112
Invalid data. Expected: 95; Got: 162
Invalid data. Expected: 207; Got: 182
Invalid data. Expected: 222; Got: 243
Invalid data. Expected: 126; Got: 101
Invalid data. Expected: 157; Got: 132
Invalid data. Expected: 160; Got: 89
Invalid data. Expected: 205; Got: 180
Invalid data. Expected: 113; Got: 134
Invalid data. Expected: 45; Got: 20
Invalid data. Expected: 113; Got: 201
Invalid data. Expected: 64; Got: 198
Invalid data. Expected: 115; Got: 182
Invalid data. Expected: 140; Got: 115

我希望这只是我做错的一件简单的事情。我在发送之前和接收时对数据缓冲区进行了相同的验证,以确保我没有在那里做一些愚蠢的事情,但它通过了检查。我使用IOCP用另一种语言而不是编写了一个服务器,它似乎正确地接收了数据。我还用另一种语言编写了一个客户端,IOCP服务器似乎也能检测到这种情况下的损坏。但也就是说,客户端和服务器可能都存在问题。我很感激任何人愿意花时间在这件事上。

好吧,我可能发现了你的问题。如果你看一看你收到的数据,所有的字节都是有序的,但突然按顺序跳了起来,就好像被另一个调用打断了一样。现在,从关于WSASend和WSARecv:的MSDN文档

如果使用I/O完成端口,请注意,对WSASend的调用顺序也是填充缓冲区的顺序。不应该从不同的线程同时在同一个套接字上调用WSASend,因为这可能会导致不可预测的缓冲区顺序。

如果使用I/O完成端口,请注意,对WSARecv的调用顺序也是填充缓冲区的顺序。不应该从不同的线程同时在同一个套接字上调用WSARecv,因为这可能会导致不可预测的缓冲区顺序。

就是这样。我现在真的不是你想要的好方法,但你所做的可能不是它应该被使用的方式。

你在真实的网络上尝试过吗?环回接口是一个特殊的电路,可能会有不同的行为,但它仍然是未定义的行为,所以你不应该依赖它。

测试了有问题的代码后,在单个套接字上对WSARecv的多个并发调用似乎会导致传递给完成处理程序的缓冲区中的数据损坏。确保每个连接一次只发出一个WSARecv调用的锁将解决此问题。

这与WSARecv的当前MSDN文档一致。

如果使用I/O完成端口,请注意,对WSARecv的调用顺序也是填充缓冲区的顺序。不应该从不同的线程同时在同一个套接字上调用WSARecv,因为这可能会导致不可预测的缓冲区顺序。

尽管我个人认为文档可能会更清楚,因为它意味着"缓冲区的填充顺序"可能是一个问题——这是众所周知的,也有文档记录,但没有提到入站数据流实际上可能在缓冲区之间不可预测地传播这一事实。


这对我来说很有趣,因为我从未知道这是一个问题,15年来我也从未见过:)然而,我依赖于对多个WSARecv完成进行排序,以避免众所周知和有记录的线程调度问题影响您处理读完成的顺序,即使它们保证按照进入的顺序从IOCP中出来。我的排序需要在每个读取缓冲区中有一个序列号,因此我在序列号增量和对WSARecv的调用周围有一个锁。

考虑到从多个线程发出多个WSARecv并成功地重新创建入站数据流是不可能的,除非你能以某种方式从完成中确定发出WSARecv的顺序,否则我看不出这实际上是TCP套接字的一个现实问题。然而,它可能会给UDP带来问题,因为除了防止这个问题之外,不需要排序,也不需要锁定。虽然我认为我从未意识到我见过它,但我认为这可能是我参与的一个系统上的问题。。。

我需要对WSASend端进行更多的测试,但我没有理由认为这比WSARecv调用更可能是线程安全的。啊,好吧,你每天都学到一些新东西。。。

我在这里发过博客。

我认为不要多次发布接收在任何时候只为一个套接字制作一个接收到的缓冲区。

处理完所有数据后,再次调用WSARecv以获取更多数据。