如何为IOCP构建工作线程逻辑
How to structure worker thread logic for IOCP
我正在创建一个客户端程序,该程序通过局域网与连接到我电脑的设备进行通信。
我的程序和设备之间的典型通信如下:
Program -> Device 1616000D 08 02 00 00 00 21 11 A1 00 01 22 08 00 // Sender sends data (a specific command to the device) to Receiver
Program <- Device 16160002 80 00 // Receiver sends ACK to sender
Program <- Device 16160005 08 20 00 00 00 // Receiver sends command response to sender
Program -> Device 16160002 80 00 // Sender sends ACK to receiver
第一个字节序列的最后一个十六进制数表示后面的数据大小(D=13字节)。
我的发送例程看起来像:
bool TcpConnection::SendCommand(const Command& rCommand, const std::vector<BYTE>& rvecCommandOptions)
{
std::vector<BYTE> vecCommandData;
m_commandBuilder.BuildCommand(rCommand, rvecCommandOptions, vecCommandData);
if (vecCommandData.empty())
return false;
PerIoData *pPerIoData = new PerIoData;
if (!pPerIoData)
return false;
SecureZeroMemory(&(pPerIoData->m_overlapped), sizeof(WSAOVERLAPPED));
pPerIoData->m_socket = m_socket.Get();
pPerIoData->m_overlapped.hEvent = WSACreateEvent();
pPerIoData->m_vecBuffer.assign(vecCommandData.begin(), vecCommandData.end());
pPerIoData->m_wsaBuf.buf = (CHAR*)(&(pPerIoData->m_vecBuffer[0]));
pPerIoData->m_wsaBuf.len = pPerIoData->m_vecBuffer.size();
pPerIoData->m_dwFlags = 0;
pPerIoData->m_dwNumberOfBytesSent = 0;
pPerIoData->m_dwNumberOfBytesToSend = pPerIoData->m_wsaBuf.len;
pPerIoData->m_operationType = OP_TYPE_SEND;
if (!m_socket.Send(pPerIoData))
return false;
return true;
}
我的工作线程例程看起来像:
DWORD WINAPI TcpConnection::WorkerThread(LPVOID lpParameter)
{
HANDLE hCompletionPort = (HANDLE)lpParameter;
DWORD dwNumberOfBytesTransferred;
ULONG ulCompletionKey;
PerIoData *pPerIoData;
DWORD dwNumberOfBytesReceived;
DWORD dwNumberOfBytesSent;
DWORD dwFlags;
while (GetQueuedCompletionStatus(hCompletionPort, &dwNumberOfBytesTransferred, &ulCompletionKey, (LPOVERLAPPED*)&pPerIoData, INFINITE))
{
if (!pPerIoData)
continue;
if ((dwNumberOfBytesTransferred == 0) && ((pPerIoData->m_operationType == OP_TYPE_SEND) || (pPerIoData->m_operationType == OP_TYPE_RECEIVE)))
{
closesocket(pPerIoData->m_socket);
delete pPerIoData;
continue;
}
if (pPerIoData->m_operationType == OP_TYPE_SEND)
{
pPerIoData->m_dwNumberOfBytesSent += dwNumberOfBytesTransferred;
if (pPerIoData->m_dwNumberOfBytesSent < pPerIoData->m_dwNumberOfBytesToSend)
{
pPerIoData->m_wsaBuf.buf = (CHAR*)(&(pPerIoData->m_vecBuffer[pPerIoData->m_dwNumberOfBytesSent]));
pPerIoData->m_wsaBuf.len = (pPerIoData->m_dwNumberOfBytesToSend - pPerIoData->m_dwNumberOfBytesSent);
if (WSASend(pPerIoData->m_socket, &(pPerIoData->m_wsaBuf), 1, &dwNumberOfBytesTransferred, 0, &(pPerIoData->m_overlapped), NULL) == 0)
continue;
if (WSAGetLastError() == WSA_IO_PENDING)
continue;
}
else if (pPerIoData->m_dwNumberOfBytesSent == pPerIoData->m_dwNumberOfBytesToSend)
{
delete pPerIoData;
}
// Q1. Do I create a new instance of PerIoData here before calling WSARecv() or reuse pPerIoData?
// QA. If I did do "PerIoData pPerIoData = new PerIoData" here, how do I handle if this momory allocation request has failed? Should I simply "continue" or "return -1"?
// QB. Or is this a wrong place to do this memory allocation to achive the typical communication between my program and the device?
SecureZeroMemory(&(pPerIoData->m_overlapped), sizeof(WSAOVERLAPPED));
pPerIoData->m_overlapped.hEvent = WSACreateEvent();
pPerIoData->m_wsaBuf.buf = (CHAR*)(&(pPerIoData->m_vecBuffer[0]));
pPerIoData->m_wsaBuf.len = pPerIoData->m_vecBuffer.size();
pPerIoData->m_operationType = OP_TYPE_RECEIVE;
if (WSARecv(pPerIoData->m_socket, &(pPerIoData->m_wsaBuf), 1, &dwNumberOfBytesReceived, &(pPerIoData->m_dwFlags), &(pPerIoData->m_overlapped), NULL) == 0)
continue;
if (WSAGetLastError() == WSA_IO_PENDING)
continue;
}
else if (pPerIoData->m_operationType == OP_TYPE_RECEIVE)
{
if ((pPerIoData->m_vecBuffer[0] == 0x16) && (pPerIoData->m_vecBuffer[1] == 0x16))
{
// Q2. Do I need to do SecureZeroMemory(&(pPerIoData->m_overlapped), sizeof(WSAOVERLAPPED)); here?
// Q3. Or do I new PerIoData?
pPerIoData->m_wsaBuf.buf = (CHAR*)(&(pPerIoData->m_vecBuffer[0]));
pPerIoData->m_wsaBuf.len = pPerIoData->m_vecBuffer.size();
pPerIoData->m_operationType = OP_TYPE_RECEIVE;
// QC. At this point two syn bytes (0x16) are received. I now need to receive two more bytes of data (000D = 13 bytes) to find out the size of the actual command response data.
// If I clear my m_vecBuffer here and try to resize its size to two, I get this debug assertion: "vector iterators incompatible" at runtime. Do you know how I can fix this problem?
if (WSARecv(pPerIoData->m_socket, &(pPerIoData->m_wsaBuf), 1, &dwNumberOfBytesReceived, &(pPerIoData->m_dwFlags), &(pPerIoData->m_overlapped), NULL) == 0)
continue;
if (WSAGetLastError() == WSA_IO_PENDING)
continue;
}
// QD. I'm not sure how to structure this if clause for when m_operationType is OP_TYPE_RECEIVE. I mean how do I distinguish one receive operation for getting two syn bytes from another for getting data size?
// One way I can think of doing is to create more receive operation types such as OP_TYPE_RECEIVE_DATA_SIZE or OP_TYPE_RECEIVE_DATA? So you can have something like below.
// Is this how you would do it?
}
//else if (pPerIoData->m_operationType == OP_TYPE_RECEIVE_DATA_SIZE)
//{
// Call WSARecv() again to get command response data
//}
}
return 0;
}
请在上面的代码中查看我的问题。
非常感谢
- 正如
PerIoData
类型的名称所指,每个不完整的I/O请求需要一个数据结构。从使用WSASend
或WSARecv
启动异步I/O到使用GetQueuedCompletionStatus
从I/O完成端口检索该请求的完成数据包,PerIoData
结构应该一直存在 - 在准备启动新请求时,应该始终重新初始化
OVERLAPPED
结构 - 只要I/O请求完成,就可以重用
PerIoData
结构。假设您已经从I/O完成端口检索到pPerIoData
,那么您可以在后续请求中重用它。只需确保已重置该结构中的任何适用字段,使其处于适合新I/O请求的状态即可
编辑以回答后续问题:
A。我会选择continue
,因为即使无法启动额外的请求,您也希望继续处理I/O事件。如果您不continue
,那么您将无法再处理任何I/O完成。在continue
之前,您可能需要调用某种错误处理程序。
B。我不认为一定有一个"正确"或"错误"的地方可以分配,但请记住,当你在那里分配PerIoData
时,你实际上最终会重复分配和删除相同的数据结构。当我使用I/O完成端口编写代码时,我会预先分配一个PerIoData
等效池并重用它们。
C。我没有足够的上下文来知道答案。显示执行此操作的代码以及断言所在的行,我可能会提供帮助。
D。您可以按照建议将操作类型分解为更细粒度的组件,例如OP_TYPE_RECEIVE_DATA_SIZE
操作。作为警告,在每个WSARecv
调用上读取几个字节不会像您希望的那样执行。Winsock通话费用高昂;请求几个字节是一个很大的开销。我建议您在一个WSARecv
中将更大的数据块读取到PerIoData
缓冲区中。然后从缓冲区中提取大小信息,然后开始从缓冲区复制数据。如果到达的数据超过了缓冲区的容量,那么您可以进行额外的WSARecv
调用,直到您在.
相关文章:
- C++为线程工作动态地分割例程
- 自 Windows 10 20H1 以来,具有单独线程的多个窗口停止工作
- 餐饮哲学家问题 - 只有 2 个线程工作
- 工作线程在执行太快后永久休眠
- 唤醒多个线程以在每个条件下工作一次
- Qt::D irectConnection在多线程环境中使用时如何工作?
- ZeroMQ 在使用 std::thread 创建工作线程时崩溃
- 工作线程一直在等待,condition_variable甚至调用了notify_all
- 使用 std::atomic 标志和 std::condition_variable 在工作线程上等待
- SIGABRT 和线程相关的异常,但在调试期间工作正常
- c++线程的安全性和时间效率:为什么有互斥检查的线程有时比没有它的线程工作得更快
- 在多线程工作负载上解释Gperftools的结果
- 我的线程工作不好,它给出了所有结果,最后没有一个接一个,GUI 在线程运行期间挂起?
- BOOST::线程工作线程同步,C++和OpenCV
- 线程工作不正常
- 提升线程工作线程对象在线程完成后的重用
- 线程问题,其中一个线程工作,但导致调用方法不返回
- 线程工作目录
- MFC主UI线程工作和模态对话框
- 用类对象提升线程工作器