在循环中异步使用 ReadDirectoryChangesW

Using ReadDirectoryChangesW asynchronously in a loop

本文关键字:ReadDirectoryChangesW 异步 循环      更新时间:2023-10-16

简介:

我正在尝试在循环中异步使用ReadDirectoryChangesW。

下面的片段说明了我正在努力实现的目标:

DWORD example()
{
    DWORD error = 0;
    OVERLAPPED ovl = { 0 };
    ovl.hEvent = ::CreateEvent(NULL, TRUE, FALSE, NULL);
    if (NULL == ovl.hEvent) return ::GetLastError();
    char buffer[1024];
    while(1)
    {
        process_list_of_existing_files();
        error = ::ReadDirectoryChangesW(
            m_hDirectory, // I have added FILE_FLAG_OVERLAPPED in CreateFile
            buffer, sizeof(buffer), FALSE,
            FILE_NOTIFY_CHANGE_FILE_NAME,
            NULL, &ovl, NULL);
        // we have new files, append them to the list
        if(error) append_new_files_to_the_list(buffer);
        // just continue with the loop
        else if(::GetLastError() == ERROR_IO_PENDING) continue;
        // RDCW error, this is critical -> exit
        else return ::GetLastError(); 
    }
}

问题:

我不知道如何处理ReadDirectoryChangesW返回FALSEERROR_IO_PENDING GetLastError()代码的情况。

在这种情况下,我应该继续循环并继续循环,直到ReadDirectoryChangesW返回buffer我可以处理。

我努力解决这个问题:

我尝试使用WaitForSingleObject(ovl.hEvent, 1000)但它崩溃并出现错误1450 ERROR_NO_SYSTEM_RESOURCES.下面是重现此行为的 MVCE:

#include <iostream>
#include <Windows.h>
DWORD processDirectoryChanges(const char *buffer)
{
    DWORD offset = 0;
    char fileName[MAX_PATH] = "";
    FILE_NOTIFY_INFORMATION *fni = NULL;
    do
    {
        fni = (FILE_NOTIFY_INFORMATION*)(&buffer[offset]);
        // since we do not use UNICODE, 
        // we must convert fni->FileName from UNICODE to multibyte
        int ret = ::WideCharToMultiByte(CP_ACP, 0, fni->FileName,
            fni->FileNameLength / sizeof(WCHAR),
            fileName, sizeof(fileName), NULL, NULL);
        switch (fni->Action)
        {
        case FILE_ACTION_ADDED:     
        {
            std::cout << fileName << std::endl;
        }
        break;
        default:
            break;
        }
        ::memset(fileName, '', sizeof(fileName));
        offset += fni->NextEntryOffset;
    } while (fni->NextEntryOffset != 0);
    return 0;
}
int main()
{
    HANDLE hDir = ::CreateFile("C:\Users\nenad.smiljkovic\Desktop\test", 
        FILE_LIST_DIRECTORY,
        FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
        NULL, OPEN_EXISTING, 
        FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, NULL);
    if (INVALID_HANDLE_VALUE == hDir) return ::GetLastError();
    OVERLAPPED ovl = { 0 };
    ovl.hEvent = ::CreateEvent(NULL, TRUE, FALSE, NULL);
    if (NULL == ovl.hEvent) return ::GetLastError();
    DWORD error = 0, br;
    char buffer[1024];
    while (1)
    {
        error = ::ReadDirectoryChangesW(hDir,
            buffer, sizeof(buffer), FALSE,
            FILE_NOTIFY_CHANGE_FILE_NAME,
            NULL, &ovl, NULL);
        if (0 == error)
        {
            error = ::GetLastError();
            if (ERROR_IO_PENDING != error)
            {
                ::CloseHandle(ovl.hEvent);
                ::CloseHandle(hDir);
                return error;
            }
        }
        error = ::WaitForSingleObject(ovl.hEvent, 0);
        switch (error)
        {
        case WAIT_TIMEOUT:
            break;
        case WAIT_OBJECT_0:
        {
            error = processDirectoryChanges(buffer);
            if (error > 0)
            {
                ::CloseHandle(ovl.hEvent);
                ::CloseHandle(hDir);
                return error;
            }
            if (0 == ::ResetEvent(ovl.hEvent))
            {
                error = ::GetLastError();
                ::CloseHandle(ovl.hEvent);
                ::CloseHandle(hDir);
                return error;
            }
        }
        break;
        default:
            error = ::GetLastError();
            ::CloseHandle(ovl.hEvent);
            ::CloseHandle(hDir);
            return error;
            break;
        }
    }
    return 0;
}

通读文档,似乎我需要 GetOverlappedResult 并将最后一个参数设置为 FALSE但我不知道如何正确使用此 API。

问题:

由于 MVCE 很好地说明了我正在尝试做什么(打印新添加文件的名称(,您能否告诉我必须在 while 循环中修复哪些内容才能使其工作?

同样,重点是在循环中异步使用ReadDirectoryChangesW,如引言中的代码片段所示。

程序的基本结构看起来或多或少没问题,只是错误地使用了异步 I/O 调用。 每当没有新文件时,事件句柄上的等待会立即超时,这很好,但随后会发出全新的 I/O 请求,但事实并非如此。

这就是系统资源不足的原因;您正在完全倾斜地发出 I/O 请求,而无需等待任何请求完成。 只有在现有请求完成后,才应发出新请求。

(此外,您应该调用 GetOverlappedResult 来检查 I/O 是否成功。

所以你的循环应该看起来更像这样:

    ::ReadDirectoryChangesW(hDir,
        buffer, sizeof(buffer), FALSE,
        FILE_NOTIFY_CHANGE_FILE_NAME,
        NULL, &ovl, NULL);
    while (1)
    {
        DWORD dw;
        DWORD result = ::WaitForSingleObject(ovl.hEvent, 0);
        switch (result)
        {
        case WAIT_TIMEOUT:
            processBackgroundTasks();
            break;
        case WAIT_OBJECT_0:
            ::GetOverlappedResult(hDir, &ovl, &dw, FALSE);
            processDirectoryChanges(buffer);
            ::ResetEvent(ovl.hEvent);
            ::ReadDirectoryChangesW(hDir,
                buffer, sizeof(buffer), FALSE,
                FILE_NOTIFY_CHANGE_FILE_NAME,
                NULL, &ovl, NULL);
            break;
        }
    }

笔记:

  • 为简单起见,已省略错误处理;我没有做任何测试或检查您的代码是否存在任何其他问题。

  • 如果可能没有任何后台任务要执行,则应测试这种情况,并在发生超时时将超时设置为 INFINITE 而不是 0,否则您将旋转。

  • 我只想显示使其工作所需的最小更改,但是调用 WaitForSingleObject 后跟 GetOverlappedResult 是多余的;对 GetOverlappedResult 的单个调用既可以检查 I/O 是否完成,也可以检索结果(如果已完成(。


根据要求,修改后的版本仅使用 GetOverlappedResult 并具有最少的错误检查。 我还添加了一个示例,说明如何处理无工作可做的情况;如果您对文件所做的任何处理确实永远运行,则不需要该位。

    ::ResetEvent(ovl.hEvent);
    if (!::ReadDirectoryChangesW(hDir,
        buffer, sizeof(buffer), FALSE,
        FILE_NOTIFY_CHANGE_FILE_NAME,
        NULL, &ovl, NULL))
    {
       error = GetLastError();
       if (error != ERROR_IO_PENDING) fail();
    }
    while (1)
    {
        BOOL wait;
        result = process_list_of_existing_files();
        if (result == MORE_WORK_PENDING)
        {
           wait = FALSE;
        } 
        else if (result == NO_MORE_WORK_PENDING)
        {
           wait = TRUE;
        } 
        if (!::GetOverlappedResult(hDir, &ovl, &dw, wait))
        {
           error = GetLastError();
           if (error == ERROR_IO_INCOMPLETE) continue;
           fail();
        }
        processDirectoryChanges(buffer);
        ::ResetEvent(ovl.hEvent);
        if (!::ReadDirectoryChangesW(hDir,
            buffer, sizeof(buffer), FALSE,
            FILE_NOTIFY_CHANGE_FILE_NAME,
            NULL, &ovl, NULL))
        {
           error = GetLastError();
           if (error != ERROR_IO_PENDING) fail();
        } 
    }

使用 IOCP 的间接变体

  1. 创建继承(包含(OVERLAPPED(或 IO_STATUS_BLOCK (,一个引用计数器、目录句柄和数据你需要
  2. 拨打BindIoCompletionCallback ( RtlSetIoCompletionCallback (目录句柄,用于设置回调
  3. 有一个DoRead()例程,我们将首先从主线程调用它,然后从回调调用它
  4. DoRead(),每次调用之前ReadDirectoryChangesW呼叫 AddRef();因为我们将引用(跨OVERLAPPED(传递给我们的结构到内核
  5. 主线程(比如GUI线程(可以在初始调用后继续执行自己的任务DoRead() ,与 APC 变体不同,我们不需要在可警报状态下等待
  6. 在回调中,我们从继承(包含(中获得了指向结构的指针 OVERLAPPED .如果需要,执行任何任务(processDirectoryChanges继续间谍 - 呼叫DoRead(),最后呼叫Release()
  7. 如果来自DoRead()ReadDirectoryChangesW失败(结果将是无回调( - 我们需要直接调用回调带有错误代码
  8. 对于停止,我们可以简单地关闭目录句柄 - 结果,我们得到了 回调中的STATUS_NOTIFY_CLEANUP

====

==================================
//#define _USE_NT_VERSION_
class SPYDATA : 
#ifdef _USE_NT_VERSION_
    IO_STATUS_BLOCK
#else
    OVERLAPPED 
#endif
{
    HANDLE _hFile;
    LONG _dwRef;
    union {
        FILE_NOTIFY_INFORMATION _fni;
        UCHAR _buf[PAGE_SIZE];
    };
    void DumpDirectoryChanges()
    {
        union {
            PVOID buf;
            PBYTE pb;
            PFILE_NOTIFY_INFORMATION pfni;
        };
        buf = _buf;
        for (;;)
        {
            DbgPrint("%x <%.*S>n", pfni->Action, pfni->FileNameLength >> 1, pfni->FileName);
            ULONG NextEntryOffset = pfni->NextEntryOffset;
            if (!NextEntryOffset)
            {
                break;
            }
            pb += NextEntryOffset;
        }
    }
#ifdef _USE_NT_VERSION_
    static VOID WINAPI _OvCompRoutine(
        _In_    NTSTATUS dwErrorCode,
        _In_    ULONG_PTR dwNumberOfBytesTransfered,
        _Inout_ PIO_STATUS_BLOCK Iosb
        )
    {
        static_cast<SPYDATA*>(Iosb)->OvCompRoutine(dwErrorCode, (ULONG)dwNumberOfBytesTransfered);
    }
#else
    static VOID WINAPI _OvCompRoutine(
        _In_    DWORD dwErrorCode, // really this is NTSTATUS
        _In_    DWORD dwNumberOfBytesTransfered,
        _Inout_ LPOVERLAPPED lpOverlapped
        )
    {
        static_cast<SPYDATA*>(lpOverlapped)->OvCompRoutine(dwErrorCode, dwNumberOfBytesTransfered);
    }
#endif
    VOID OvCompRoutine(NTSTATUS status, DWORD dwNumberOfBytesTransfered)
    {
        DbgPrint("[%x,%x]n", status, dwNumberOfBytesTransfered);
        if (0 <= status) 
        {
            if (status != STATUS_NOTIFY_CLEANUP)
            {
                if (dwNumberOfBytesTransfered) DumpDirectoryChanges();
                process_list_of_existing_files();// so hard do this here ?!?
                DoRead();
            }
            else
            {
                DbgPrint("n---- NOTIFY_CLEANUP -----n");
            }
        }
        Release();
        MyReleaseRundownProtection();
    }
    ~SPYDATA()
    {
        Cancel();
    }
public:
    void DoRead()
    {
        if (MyAcquireRundownProtection())
        {
            AddRef();
#ifdef _USE_NT_VERSION_
            NTSTATUS status = ZwNotifyChangeDirectoryFile(_hFile, 0, 0, this, this, &_fni, sizeof(_buf), FILE_NOTIFY_VALID_MASK, TRUE);
            if (NT_ERROR(status))
            {
                OvCompRoutine(status, 0);
            }
#else
            if (!ReadDirectoryChangesW(_hFile, _buf, sizeof(_buf), TRUE, FILE_NOTIFY_VALID_MASK, (PDWORD)&InternalHigh, this, 0))
            {
                OvCompRoutine(RtlGetLastNtStatus(), 0);
            }
#endif
        }
    }
    SPYDATA()
    {
        _hFile = 0;// ! not INVALID_HANDLE_VALUE because use ntapi for open file
        _dwRef = 1;
#ifndef _USE_NT_VERSION_
        RtlZeroMemory(static_cast<OVERLAPPED*>(this), sizeof(OVERLAPPED));
#endif
    }
    void AddRef()
    {
        InterlockedIncrement(&_dwRef);
    }
    void Release()
    {
        if (!InterlockedDecrement(&_dwRef))
        {
            delete this;
        }
    }
    BOOL Create(POBJECT_ATTRIBUTES poa)
    {
        IO_STATUS_BLOCK iosb;
        NTSTATUS status = ZwOpenFile(&_hFile, FILE_GENERIC_READ, poa, &iosb, FILE_SHARE_VALID_FLAGS, FILE_DIRECTORY_FILE);
        if (0 <= status)
        {
            return
#ifdef _USE_NT_VERSION_
                0 <= RtlSetIoCompletionCallback(_hFile, _OvCompRoutine, 0);
#else
                BindIoCompletionCallback(_hFile, _OvCompRoutine, 0);
#endif
        }
        return FALSE;
    }
    void Cancel()
    {
        if (HANDLE hFile = InterlockedExchangePointer(&_hFile, 0))
        {
            NtClose(hFile);
        }
    }
};
void DemoF()
{
    if (MyInitializeRundownProtection())
    {
        STATIC_OBJECT_ATTRIBUTES(oa, "<SOME_DIRECTORY>");
        if (SPYDATA* p = new SPYDATA)
        {
            if (p->Create(&oa))
            {
                p->DoRead();
            }
            //++ GUI thread run
            MessageBoxW(0, L"wait close program...", L"", MB_OK);
            //-- GUI thread end
            p->Cancel();
            p->Release();
        }
        MyWaitForRundownProtectionRelease();
    }
}
相关文章:
  • 没有找到相关文章