c++ Win32数据处理线程

C++ Win32 data processing thread

本文关键字:线程 数据处理 Win32 c++      更新时间:2023-10-16

我写了一个Win32异步客户端应用程序,我想创建一个线程来处理传入的数据。我有一个问题与以下代码。线程启动,获取一些数据,但不能正常运行(数据未完成)。我不明白问题从何而来。你能帮我一下吗?

void incData(void *arg) // This is the thread
{
    WaitForSingleObject(hIncDataMutex, INFINITE);
    char *sRead = _strdup(sReadBuffer.c_str());
    appendTextToEdit(hDebug, sRead); // Some data is displayed, but incompletely
    string var;
    char *pVar = nullptr;                   
    char *next_token = nullptr;
    istringstream iss(sReadBuffer); // sReadBuffer is the global variable I use to pass argument to my thread   
    while (getline(iss, var)) // Default delimiter 'n'
    {       
        pVar = _strdup(var.c_str()); // Cast string to char *
        appendTextToEdit(hDebug, pVar);
        appendTextToEdit(hDebug, "n");
        if(strstr(pVar, "id=") != NULL)
        {   
            char *label = strtok_s(pVar, "=", &next_token);                     
            char *pId = strtok_s(NULL, "n", &next_token);                          
            strcpy_s(id, pId);
        }
        if( strstr(pVar, "version") != NULL)
        {
            char *label = strtok_s(pVar, "=", &next_token);                     
            char *pVersion = strtok_s(NULL, "n", &next_token);                     
            strcpy_s(version, pVersion);                        
        }       
        if( strstr(pVar, "Qh57=") != NULL)
        {
            char *label = strtok_s(pVar, "=", &next_token);     
            char *pFoFd = strtok_s(NULL, "n", &next_token);                            
            strcpy_s(foFd, pFoFd);          
        }
    }
    ReleaseMutex(hIncDataMutex);
}
//.....
case FD_CONNECT: // I launch the thread here (I want it to run forever in the background)
{
    connectStatus = TRUE;
    statusText=TEXT("Connected");               
    hIncDataMutex = CreateMutex(NULL, false, NULL); // Create incoming data process thread mutex
    HANDLE hThread2 = (HANDLE)_beginthread(incData, 0, 0); // Launch incoming data process thread
}
//....
case FD_READ:
{   
    int bytes_recv = recv(Socket, readBuffer, sizeof(readBuffer), 0);               
    sReadBuffer = readBuffer; // Copy the buffer to global scope string (used to feed thread)               
    ReleaseMutex(hIncDataMutex);
}
break;

编辑

下面是我的代码和新的调试行,以及调试窗口的输出:

void incData(void *arg)
{
WaitForSingleObject(hIncDataMutex, INFINITE);
appendTextToEdit(hDebug, "Inside thread...n");

string var;
char *pVar = nullptr;                   
char *next_token = nullptr;
istringstream iss(sReadBuffer); // Put into a stream            
while (getline(iss, var)) // Default delimiter 'n'
{       
    pVar = _strdup(var.c_str()); // Cast string to char *       
    if(strstr(pVar, "id=") != NULL)
    {   
        char *label = strtok_s(pVar, "=", &next_token);                     
        char *pId = strtok_s(NULL, "n", &next_token);                          
        strcpy_s(id, pId);
    }
    if( strstr(pVar, "version") != NULL)
    {
        char *label = strtok_s(pVar, "=", &next_token);                     
        char *pVersion = strtok_s(NULL, "n", &next_token);                     
        strcpy_s(version, pVersion);                        
    }       
    if( strstr(pVar, "Qh57=") != NULL)
    {
        char *label = strtok_s(pVar, "=", &next_token);     
        char *pFoFd = strtok_s(NULL, "n", &next_token);                            
        strcpy_s(foFd, pFoFd);
        appendTextToEdit(hDebug, "Qh57=");
        appendTextToEdit(hDebug, foFd);
        appendTextToEdit(hDebug, "n"); 
    }
}
ReleaseMutex(hIncDataMutex);
appendTextToEdit(hDebug, "Mutex released by threadn");
}
//....
case FD_READ:
            {   
                appendTextToEdit(hDebug, "FD_READ eventn");            

                int bytes_recv = recv(Socket, readBuffer, sizeof(readBuffer), 0);                   
                appendTextToEdit(hDebug, "Bytes reveivedn");
                sReadBuffer = readBuffer; // Copy the buffer to string
                ReleaseMutex(hIncDataMutex);
                appendTextToEdit(hDebug, "End of FD_READn");
            }
            break;

从调试窗口:

FD_CONNECT, begining thread
FD_READ event
Bytes reveived
End of FD_READ
FD_READ event
Bytes reveived
End of FD_READ
Inside thread... // Thread is only called here !
FD_READ event
Bytes reveived
End of FD_READ
Mutex released by thread // Thread ends here
FD_READ event
Bytes reveived
End of FD_READ

等等……你知道吗?

我认为这是因为您在互斥锁上等待数据的方式:将其设置在一侧,而从不尝试将其设置在另一侧,这清楚地表明缺少了一些东西。

下面是可能发生的情况:

main thread                         processing thread
--------------------------------------------------
data A is received on FD_READ        waiting on mutex 
ReleaseMutex() --->                  start processing the data A
other data B is received                .. 
ReleaseMutex()                          .. end processing data A
still other C data overwrites B      ReleaseMutex()     
ReleaseMutex()                       waiting on mutex
                                     processing data C which has overwriten B
                                     ( ==> B is lost) 

避免接收数据,除非你能以一种不覆盖尚未处理的数据的方式缓冲它。这通常需要在更新(填充)共享数据时(即在尝试复制缓冲区之前)设置互斥文本。至少从设计原则来看,本教程可能会引起您的兴趣。

重要的编辑:

与多线程无关,但不要忘记recv()不保证以空结束的字符串。因此,在将其复制到字符串之前,请检查是否有错误,并在后面加上''。所以作为第一步,你可以尝试:

case FD_READ:
{   
    int bytes_recv = recv(Socket, readBuffer, sizeof(readBuffer),
0); 
    if (bytes_recv!=SOCKET_ERROR) {
        readBuffer[bytes_recv] = ''; //... trailing null 
        WaitForSingleObject(hIncDataMutex, INFINITE);
        sReadBuffer = readBuffer; // Copy the buffer to global scope string (used to feed thread)               
        ReleaseMutex(hIncDataMutex);
    }
    else cout << "Socket error"; 
    ...