如何为IOCP构建工作线程逻辑

How to structure worker thread logic for IOCP

本文关键字:线程 工作 构建 IOCP      更新时间:2023-10-16

我正在创建一个客户端程序,该程序通过局域网与连接到我电脑的设备进行通信。

我的程序和设备之间的典型通信如下:

Program -> Device   1616000D  08 02 00 00 00 21 11 A1 00 01 22 08 00 // Sender sends data (a specific command to the device) to Receiver
Program <- Device   16160002  80 00 // Receiver sends ACK to sender
Program <- Device   16160005  08 20 00 00 00 // Receiver sends command response to sender
Program -> Device   16160002  80 00 // Sender sends ACK to receiver

第一个字节序列的最后一个十六进制数表示后面的数据大小(D=13字节)。

我的发送例程看起来像:

bool TcpConnection::SendCommand(const Command& rCommand, const std::vector<BYTE>& rvecCommandOptions)
{
    std::vector<BYTE> vecCommandData;
    m_commandBuilder.BuildCommand(rCommand, rvecCommandOptions, vecCommandData);
    if (vecCommandData.empty())
        return false;
    PerIoData *pPerIoData = new PerIoData;
    if (!pPerIoData)
        return false;
    SecureZeroMemory(&(pPerIoData->m_overlapped), sizeof(WSAOVERLAPPED));
    pPerIoData->m_socket = m_socket.Get();
    pPerIoData->m_overlapped.hEvent = WSACreateEvent();
    pPerIoData->m_vecBuffer.assign(vecCommandData.begin(), vecCommandData.end());
    pPerIoData->m_wsaBuf.buf = (CHAR*)(&(pPerIoData->m_vecBuffer[0]));
    pPerIoData->m_wsaBuf.len = pPerIoData->m_vecBuffer.size();
    pPerIoData->m_dwFlags = 0;
    pPerIoData->m_dwNumberOfBytesSent = 0;
    pPerIoData->m_dwNumberOfBytesToSend = pPerIoData->m_wsaBuf.len;
    pPerIoData->m_operationType = OP_TYPE_SEND;
    if (!m_socket.Send(pPerIoData))
        return false;
    return true;
}

我的工作线程例程看起来像:

DWORD WINAPI TcpConnection::WorkerThread(LPVOID lpParameter)
{
    HANDLE hCompletionPort = (HANDLE)lpParameter;
    DWORD dwNumberOfBytesTransferred;
    ULONG ulCompletionKey;
    PerIoData *pPerIoData;
    DWORD dwNumberOfBytesReceived;
    DWORD dwNumberOfBytesSent;
    DWORD dwFlags;
    while (GetQueuedCompletionStatus(hCompletionPort, &dwNumberOfBytesTransferred, &ulCompletionKey, (LPOVERLAPPED*)&pPerIoData, INFINITE)) 
    {
        if (!pPerIoData)
            continue;
        if ((dwNumberOfBytesTransferred == 0) && ((pPerIoData->m_operationType  == OP_TYPE_SEND) || (pPerIoData->m_operationType  == OP_TYPE_RECEIVE)))
        {
            closesocket(pPerIoData->m_socket);
            delete pPerIoData;
            continue;
        }
        if (pPerIoData->m_operationType == OP_TYPE_SEND)
        {
            pPerIoData->m_dwNumberOfBytesSent += dwNumberOfBytesTransferred;
            if (pPerIoData->m_dwNumberOfBytesSent < pPerIoData->m_dwNumberOfBytesToSend)
            {
                pPerIoData->m_wsaBuf.buf = (CHAR*)(&(pPerIoData->m_vecBuffer[pPerIoData->m_dwNumberOfBytesSent]));
                pPerIoData->m_wsaBuf.len = (pPerIoData->m_dwNumberOfBytesToSend - pPerIoData->m_dwNumberOfBytesSent);
                if (WSASend(pPerIoData->m_socket, &(pPerIoData->m_wsaBuf), 1, &dwNumberOfBytesTransferred, 0, &(pPerIoData->m_overlapped), NULL) == 0)
                    continue;
                if (WSAGetLastError() == WSA_IO_PENDING)
                    continue;
            }
            else if (pPerIoData->m_dwNumberOfBytesSent == pPerIoData->m_dwNumberOfBytesToSend)
            {
                delete pPerIoData;
            }
            // Q1. Do I create a new instance of PerIoData here before calling WSARecv() or reuse pPerIoData?
            // QA. If I did do "PerIoData pPerIoData = new PerIoData" here, how do I handle if this momory allocation request has failed?  Should I simply "continue" or "return -1"?
            // QB. Or is this a wrong place to do this memory allocation to achive the typical communication between my program and the device?
            SecureZeroMemory(&(pPerIoData->m_overlapped), sizeof(WSAOVERLAPPED));
            pPerIoData->m_overlapped.hEvent = WSACreateEvent();
            pPerIoData->m_wsaBuf.buf = (CHAR*)(&(pPerIoData->m_vecBuffer[0]));
            pPerIoData->m_wsaBuf.len = pPerIoData->m_vecBuffer.size();
            pPerIoData->m_operationType = OP_TYPE_RECEIVE;
            if (WSARecv(pPerIoData->m_socket, &(pPerIoData->m_wsaBuf), 1, &dwNumberOfBytesReceived, &(pPerIoData->m_dwFlags), &(pPerIoData->m_overlapped), NULL) == 0)
                continue;
            if (WSAGetLastError() == WSA_IO_PENDING)
                continue;
        }
        else if (pPerIoData->m_operationType == OP_TYPE_RECEIVE)
        {
            if ((pPerIoData->m_vecBuffer[0] == 0x16) && (pPerIoData->m_vecBuffer[1] == 0x16))
            {
                // Q2. Do I need to do SecureZeroMemory(&(pPerIoData->m_overlapped), sizeof(WSAOVERLAPPED)); here?
                // Q3. Or do I new PerIoData?
                pPerIoData->m_wsaBuf.buf = (CHAR*)(&(pPerIoData->m_vecBuffer[0]));
                pPerIoData->m_wsaBuf.len = pPerIoData->m_vecBuffer.size();
                pPerIoData->m_operationType = OP_TYPE_RECEIVE;
                // QC. At this point two syn bytes (0x16) are received.  I now need to receive two more bytes of data (000D = 13 bytes) to find out the size of the actual command response data.
                // If I clear my m_vecBuffer here and try to resize its size to two, I get this debug assertion: "vector iterators incompatible" at runtime.  Do you know how I can fix this problem?
                if (WSARecv(pPerIoData->m_socket, &(pPerIoData->m_wsaBuf), 1, &dwNumberOfBytesReceived, &(pPerIoData->m_dwFlags), &(pPerIoData->m_overlapped), NULL) == 0)
                    continue;
                if (WSAGetLastError() == WSA_IO_PENDING)
                    continue;
            }
            // QD. I'm not sure how to structure this if clause for when m_operationType is OP_TYPE_RECEIVE.  I mean how do I distinguish one receive operation for getting two syn bytes from another for getting data size?
            // One way I can think of doing is to create more receive operation types such as OP_TYPE_RECEIVE_DATA_SIZE or OP_TYPE_RECEIVE_DATA?  So you can have something like below.
            // Is this how you would do it?
        }
        //else if (pPerIoData->m_operationType == OP_TYPE_RECEIVE_DATA_SIZE)
        //{
            // Call WSARecv() again to get command response data
        //}
    }
    return 0;
}

请在上面的代码中查看我的问题。

非常感谢

  1. 正如PerIoData类型的名称所指,每个不完整的I/O请求需要一个数据结构。从使用WSASendWSARecv启动异步I/O到使用GetQueuedCompletionStatus从I/O完成端口检索该请求的完成数据包,PerIoData结构应该一直存在
  2. 在准备启动新请求时,应该始终重新初始化OVERLAPPED结构
  3. 只要I/O请求完成,就可以重用PerIoData结构。假设您已经从I/O完成端口检索到pPerIoData,那么您可以在后续请求中重用它。只需确保已重置该结构中的任何适用字段,使其处于适合新I/O请求的状态即可

编辑以回答后续问题:

A。我会选择continue,因为即使无法启动额外的请求,您也希望继续处理I/O事件。如果您不continue,那么您将无法再处理任何I/O完成。在continue之前,您可能需要调用某种错误处理程序。

B。我不认为一定有一个"正确"或"错误"的地方可以分配,但请记住,当你在那里分配PerIoData时,你实际上最终会重复分配和删除相同的数据结构。当我使用I/O完成端口编写代码时,我会预先分配一个PerIoData等效池并重用它们。

C。我没有足够的上下文来知道答案。显示执行此操作的代码以及断言所在的行,我可能会提供帮助。

D。您可以按照建议将操作类型分解为更细粒度的组件,例如OP_TYPE_RECEIVE_DATA_SIZE操作。作为警告,在每个WSARecv调用上读取几个字节不会像您希望的那样执行。Winsock通话费用高昂;请求几个字节是一个很大的开销。我建议您在一个WSARecv中将更大的数据块读取到PerIoData缓冲区中。然后从缓冲区中提取大小信息,然后开始从缓冲区复制数据。如果到达的数据超过了缓冲区的容量,那么您可以进行额外的WSARecv调用,直到您在.

中读取了其余数据