来自不同线程的同步输出流

Synchronization output streams from different threads

本文关键字:同步 输出流 线程      更新时间:2023-10-16

我正在编写一个将写入XML文件的应用程序中的一些数据。我尝试使用事件核心对象对其进行同步,但是在文件中,我得了错误的数据。我得到下一个结果

<file path="somePath" />
<file path="somePath" <file path="somePath" /> />....

,但我希望能得到

<file path="somePath" />
<file path="somePath" />
<file path="somePath" />

请参见下面的伪代码。有什么问题?

unsigned int WINAPI MyThread(void *p)
{
    std::wofstream outstr;
    outstr.open("indexingtest.xml", std::ios::app);
    do
    {
        if(somePredicat1)
        {
            WaitForSingleObject(hEvent, INFINITE);
            outstr <<"<file path=""<< sFileName << ""n";
            outstr <<"tsize=""<< fileSize << "" />n";           
            ReleaseMutex(hMutex);
        }
         if(somePredicat3)
         {
             MyThread(sFileName);
         }
    }while(somePredicat2);
    outstr.close();
    FindClose( hSearch );
    return 0;
}
int _tmain(int argc, TCHAR *argv[])
{
    hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    //hMutex = CreateMutex(NULL, FALSE, 0);
    unsigned int ThreadID;
    HANDLE hThread1 = (HANDLE)_beginthreadex(NULL, 0, MyThread, L"D:\*", 0, &ThreadID);
    HANDLE hThread2 = (HANDLE)_beginthreadex(NULL, 0, MyThread, L"C:\*", 0, &ThreadID);
    SetEvent(hEvent);
    std::wcout << "a" << std::endl;
    WaitForSingleObject( hThread1, INFINITE );
    return 0;
}

更多具体代码

HANDLE hMutex = CreateMutex(NULL,FALSE, 0);
wchar_t** GetAllFilesImpl( wchar_t const* folder, wchar_t** res, size_t* pAllocated, size_t* pUsed )
{
    HANDLE hSearch;
    WIN32_FIND_DATAW fileinfo;
    size_t allocatedMemory = 0;

    hSearch = FindFirstFileW( folder, &fileinfo );
    if( hSearch != INVALID_HANDLE_VALUE ) {
        do {
            wchar_t* sFileName, ** tmp, sTmp[ 1024 ];
            long fileSize = 0;
            long creationDate;
            /* ignore ., .. */
            if( !wcscmp(fileinfo.cFileName, L".") ||
                !wcscmp(fileinfo.cFileName, L"..") )
                continue;
            sFileName = PathCreator( folder, fileinfo.cFileName );
            fileSize = fileinfo.nFileSizeLow;
            creationDate = fileinfo.ftCreationTime.dwHighDateTime;

            if(fileSize)
            {
                WaitForSingleObject(hMutex, INFINITE);
                std::wofstream outstr;
                            outstr.open("indexingtest.xml", std::ios::app);
                outstr.seekp(std::ios_base::end);
                outstr <<"<file path=""<< sFileName << ""n";
                outstr <<"tsize=""<< fileSize << "" />n";
                outstr.seekp(std::ios_base::end);
                outstr.close();
                wprintf( L"%sn", sFileName);
                ReleaseMutex(hMutex);
            }
            tmp = AddToArray( res, pAllocated, pUsed, sFileName );
            if( !tmp ) return FreeAllFilesMemory(res), NULL;
            res = tmp;
            if( fileinfo.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY ) {
                wcscpy_s( sTmp, sFileName );
                wcscat_s( sTmp, L"\*" );
                tmp = GetAllFilesImpl( sTmp, res, pAllocated, pUsed );
                if( !tmp ) return NULL;
                res = tmp;
            }
        } while( FindNextFileW(hSearch, &fileinfo) );
        FindClose( hSearch );
    }
    return res;
}
unsigned int WINAPI GetAllFiles( void* folder )
{
    size_t nAllocated = 0, nUsed = 0;
    wchar_t** res = GetAllFilesImpl( (wchar_t *)folder, NULL, &nAllocated, &nUsed );
    if( res ) {
        /* to indicate end of result add a NULL string */
        wchar_t** tmp = AddToArray( res, &nAllocated, &nUsed, NULL );
        if( !tmp ) return FreeAllFilesMemory(res), -1;
        res = tmp;
    }
    std::wcout << "a" << std::endl;
    return 0;
}
int _tmain(int argc, TCHAR *argv[])
{
    Sleep(1000);
    unsigned int ThreadID;
    HANDLE hThreads[3];
    hThreads[0] = (HANDLE)_beginthreadex(NULL, 0, GetAllFiles, L"D:\*", 0, &ThreadID);
    hThreads[1] = (HANDLE)_beginthreadex(NULL, 0, GetAllFiles, L"C:\Users\Andrew\Desktop\*", 0, &ThreadID);
    hThreads[2] = (HANDLE)_beginthreadex(NULL, 0, GetAllFiles, L"E:\*", 0, &ThreadID);
    unsigned int dw = WaitForMultipleObjects(3, hThreads, TRUE, INFINITE);
    CloseHandle(hFile);
    printf("finishedn");
    return 0;
}

您遇到的 big 问题是,每个线程都会分别打开文件。而是在创建线程之前打开文件,然后使用Mutex将写入与文件同步。

在伪代码中:

std::wofstream output_file;
void my_thread()
{
    do
    {
        if (some_condition)
        {
            lock_mutex();
            do_output();
            unlock_mutex();
        }
    } while (condition);
}
int main()
{
    output_file.open(...);
    create_thread();
    create_thread();
    output_file.close();
}
  1. 您应该等待所有线程,然后再结束代码

    HANDLE aThread[2];
    ...
    aThread[0] =  (HANDLE)_beginthreadex(...
    aThread[1] =  (HANDLE)_beginthreadex(...
    WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);
    
  2. 您正在等待事件,然后再进行输出。完成输出后您释放静音。这根本不符合逻辑。您应该等待互惠室和事件。设置事件时,这两个线程都在发布他们的等待。就这样Mutex无需做任何事情。当您将事件句柄和Mutex句柄放入一个数组时,您可以使用waitgorultipleObject也为此目的:

    HANDLE hVarious[2];
    hVarious[0] = CreateEvent(NULL, TRUE, FALSE, NULL);
    // Note: this is a manual reset event. 
    // Thus is stays set until explicitly reset
    hVarious[1] = CreateMutex(NULL, FALSE, 0);
    // and now start the threads:
    aThread[0] =  (HANDLE)_beginthreadex(...
    aThread[1] =  (HANDLE)_beginthreadex(...
    // and set the event:
    SetEvent(hEvent);
    
    WaitForMultipleObjects(2, aThread, TRUE, INFINITE);
    

    线程应该看起来像:

    unsigned int WINAPI MyThread(void *p)
    {
      do
      {
        if(somePredicat1)
        {
          // wait for the mutex AND the event
          WaitForMultipleObjects(2, hVarious, TRUE, INFINITE);
          // do the file stuff in the mutex protected part       
          std::wofstream outstr;
          outstr.open("indexingtest.xml", std::ios::app);
          outstr <<"<file path=""<< sFileName << ""n";
          outstr <<"tsize=""<< fileSize << "" />n";           
          outstr.close();
          FindClose( hSearch );
          ReleaseMutex(hVarious[1]);
        }
      }while(somePredicat2);
      return 0;
    }
    

记住:建立了互斥品以保护并发应用程序中的资源。

我不知道somePredicat1somePredicat1。当在不同的线程中使用时,这些参数也可能会遇到麻烦。但是,您观察到的故障输出是由错误的静音使用情况引起的。

评论后编辑:

    if(somePredicat3)
    {
         MyThread(sFileName);
    }

a。该线程本身被称为一个函数,而无需关闭文件。

b。您应该提供有关somePredicat3, somePredicat2, and somePredicat1的更多详细信息。

c。您必须使用某种排他性保护输出文件,因为它已使用 通过多个线程。您也可以使用关键部分对象进行。