在循环中异步使用 ReadDirectoryChangesW
Using ReadDirectoryChangesW asynchronously in a loop
简介:
我正在尝试在循环中异步使用ReadDirectoryChangesW。
下面的片段说明了我正在努力实现的目标:
DWORD example()
{
DWORD error = 0;
OVERLAPPED ovl = { 0 };
ovl.hEvent = ::CreateEvent(NULL, TRUE, FALSE, NULL);
if (NULL == ovl.hEvent) return ::GetLastError();
char buffer[1024];
while(1)
{
process_list_of_existing_files();
error = ::ReadDirectoryChangesW(
m_hDirectory, // I have added FILE_FLAG_OVERLAPPED in CreateFile
buffer, sizeof(buffer), FALSE,
FILE_NOTIFY_CHANGE_FILE_NAME,
NULL, &ovl, NULL);
// we have new files, append them to the list
if(error) append_new_files_to_the_list(buffer);
// just continue with the loop
else if(::GetLastError() == ERROR_IO_PENDING) continue;
// RDCW error, this is critical -> exit
else return ::GetLastError();
}
}
问题:
我不知道如何处理ReadDirectoryChangesW
返回FALSE
并ERROR_IO_PENDING
GetLastError()
代码的情况。
在这种情况下,我应该继续循环并继续循环,直到ReadDirectoryChangesW
返回buffer
我可以处理。
我努力解决这个问题:
我尝试使用WaitForSingleObject(ovl.hEvent, 1000)
但它崩溃并出现错误1450 ERROR_NO_SYSTEM_RESOURCES
.下面是重现此行为的 MVCE:
#include <iostream>
#include <Windows.h>
DWORD processDirectoryChanges(const char *buffer)
{
DWORD offset = 0;
char fileName[MAX_PATH] = "";
FILE_NOTIFY_INFORMATION *fni = NULL;
do
{
fni = (FILE_NOTIFY_INFORMATION*)(&buffer[offset]);
// since we do not use UNICODE,
// we must convert fni->FileName from UNICODE to multibyte
int ret = ::WideCharToMultiByte(CP_ACP, 0, fni->FileName,
fni->FileNameLength / sizeof(WCHAR),
fileName, sizeof(fileName), NULL, NULL);
switch (fni->Action)
{
case FILE_ACTION_ADDED:
{
std::cout << fileName << std::endl;
}
break;
default:
break;
}
::memset(fileName, ' ', sizeof(fileName));
offset += fni->NextEntryOffset;
} while (fni->NextEntryOffset != 0);
return 0;
}
int main()
{
HANDLE hDir = ::CreateFile("C:\Users\nenad.smiljkovic\Desktop\test",
FILE_LIST_DIRECTORY,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
NULL, OPEN_EXISTING,
FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, NULL);
if (INVALID_HANDLE_VALUE == hDir) return ::GetLastError();
OVERLAPPED ovl = { 0 };
ovl.hEvent = ::CreateEvent(NULL, TRUE, FALSE, NULL);
if (NULL == ovl.hEvent) return ::GetLastError();
DWORD error = 0, br;
char buffer[1024];
while (1)
{
error = ::ReadDirectoryChangesW(hDir,
buffer, sizeof(buffer), FALSE,
FILE_NOTIFY_CHANGE_FILE_NAME,
NULL, &ovl, NULL);
if (0 == error)
{
error = ::GetLastError();
if (ERROR_IO_PENDING != error)
{
::CloseHandle(ovl.hEvent);
::CloseHandle(hDir);
return error;
}
}
error = ::WaitForSingleObject(ovl.hEvent, 0);
switch (error)
{
case WAIT_TIMEOUT:
break;
case WAIT_OBJECT_0:
{
error = processDirectoryChanges(buffer);
if (error > 0)
{
::CloseHandle(ovl.hEvent);
::CloseHandle(hDir);
return error;
}
if (0 == ::ResetEvent(ovl.hEvent))
{
error = ::GetLastError();
::CloseHandle(ovl.hEvent);
::CloseHandle(hDir);
return error;
}
}
break;
default:
error = ::GetLastError();
::CloseHandle(ovl.hEvent);
::CloseHandle(hDir);
return error;
break;
}
}
return 0;
}
通读文档,似乎我需要 GetOverlappedResult 并将最后一个参数设置为 FALSE
但我不知道如何正确使用此 API。
问题:
由于 MVCE 很好地说明了我正在尝试做什么(打印新添加文件的名称(,您能否告诉我必须在 while
循环中修复哪些内容才能使其工作?
同样,重点是在循环中异步使用ReadDirectoryChangesW
,如引言中的代码片段所示。
程序的基本结构看起来或多或少没问题,只是错误地使用了异步 I/O 调用。 每当没有新文件时,事件句柄上的等待会立即超时,这很好,但随后会发出全新的 I/O 请求,但事实并非如此。
这就是系统资源不足的原因;您正在完全倾斜地发出 I/O 请求,而无需等待任何请求完成。 只有在现有请求完成后,才应发出新请求。
(此外,您应该调用 GetOverlappedResult 来检查 I/O 是否成功。
所以你的循环应该看起来更像这样:
::ReadDirectoryChangesW(hDir,
buffer, sizeof(buffer), FALSE,
FILE_NOTIFY_CHANGE_FILE_NAME,
NULL, &ovl, NULL);
while (1)
{
DWORD dw;
DWORD result = ::WaitForSingleObject(ovl.hEvent, 0);
switch (result)
{
case WAIT_TIMEOUT:
processBackgroundTasks();
break;
case WAIT_OBJECT_0:
::GetOverlappedResult(hDir, &ovl, &dw, FALSE);
processDirectoryChanges(buffer);
::ResetEvent(ovl.hEvent);
::ReadDirectoryChangesW(hDir,
buffer, sizeof(buffer), FALSE,
FILE_NOTIFY_CHANGE_FILE_NAME,
NULL, &ovl, NULL);
break;
}
}
笔记:
为简单起见,已省略错误处理;我没有做任何测试或检查您的代码是否存在任何其他问题。
如果可能没有任何后台任务要执行,则应测试这种情况,并在发生超时时将超时设置为 INFINITE 而不是 0,否则您将旋转。
我只想显示使其工作所需的最小更改,但是调用 WaitForSingleObject 后跟 GetOverlappedResult 是多余的;对 GetOverlappedResult 的单个调用既可以检查 I/O 是否完成,也可以检索结果(如果已完成(。
根据要求,修改后的版本仅使用 GetOverlappedResult 并具有最少的错误检查。 我还添加了一个示例,说明如何处理无工作可做的情况;如果您对文件所做的任何处理确实永远运行,则不需要该位。
::ResetEvent(ovl.hEvent);
if (!::ReadDirectoryChangesW(hDir,
buffer, sizeof(buffer), FALSE,
FILE_NOTIFY_CHANGE_FILE_NAME,
NULL, &ovl, NULL))
{
error = GetLastError();
if (error != ERROR_IO_PENDING) fail();
}
while (1)
{
BOOL wait;
result = process_list_of_existing_files();
if (result == MORE_WORK_PENDING)
{
wait = FALSE;
}
else if (result == NO_MORE_WORK_PENDING)
{
wait = TRUE;
}
if (!::GetOverlappedResult(hDir, &ovl, &dw, wait))
{
error = GetLastError();
if (error == ERROR_IO_INCOMPLETE) continue;
fail();
}
processDirectoryChanges(buffer);
::ResetEvent(ovl.hEvent);
if (!::ReadDirectoryChangesW(hDir,
buffer, sizeof(buffer), FALSE,
FILE_NOTIFY_CHANGE_FILE_NAME,
NULL, &ovl, NULL))
{
error = GetLastError();
if (error != ERROR_IO_PENDING) fail();
}
}
使用 IOCP 的间接变体
- 创建继承(包含(
OVERLAPPED
(或IO_STATUS_BLOCK
(,一个引用计数器、目录句柄和数据你需要 - 拨打
BindIoCompletionCallback
(RtlSetIoCompletionCallback
(目录句柄,用于设置回调 - 有一个
DoRead()
例程,我们将首先从主线程调用它,然后从回调调用它 - 在
DoRead()
,每次调用之前ReadDirectoryChangesW
呼叫AddRef();
因为我们将引用(跨OVERLAPPED
(传递给我们的结构到内核 - 主线程(比如GUI线程(可以在初始调用后继续执行自己的任务
DoRead()
,与 APC 变体不同,我们不需要在可警报状态下等待 - 在回调中,我们从继承(包含(中获得了指向结构的指针
OVERLAPPED
.如果需要,执行任何任务(processDirectoryChanges
继续间谍 - 呼叫DoRead()
,最后呼叫Release()
- 如果来自
DoRead()
的ReadDirectoryChangesW
失败(结果将是无回调( - 我们需要直接调用回调带有错误代码 - 对于停止,我们可以简单地关闭目录句柄 - 结果,我们得到了 回调中的
STATUS_NOTIFY_CLEANUP
====
==================================//#define _USE_NT_VERSION_
class SPYDATA :
#ifdef _USE_NT_VERSION_
IO_STATUS_BLOCK
#else
OVERLAPPED
#endif
{
HANDLE _hFile;
LONG _dwRef;
union {
FILE_NOTIFY_INFORMATION _fni;
UCHAR _buf[PAGE_SIZE];
};
void DumpDirectoryChanges()
{
union {
PVOID buf;
PBYTE pb;
PFILE_NOTIFY_INFORMATION pfni;
};
buf = _buf;
for (;;)
{
DbgPrint("%x <%.*S>n", pfni->Action, pfni->FileNameLength >> 1, pfni->FileName);
ULONG NextEntryOffset = pfni->NextEntryOffset;
if (!NextEntryOffset)
{
break;
}
pb += NextEntryOffset;
}
}
#ifdef _USE_NT_VERSION_
static VOID WINAPI _OvCompRoutine(
_In_ NTSTATUS dwErrorCode,
_In_ ULONG_PTR dwNumberOfBytesTransfered,
_Inout_ PIO_STATUS_BLOCK Iosb
)
{
static_cast<SPYDATA*>(Iosb)->OvCompRoutine(dwErrorCode, (ULONG)dwNumberOfBytesTransfered);
}
#else
static VOID WINAPI _OvCompRoutine(
_In_ DWORD dwErrorCode, // really this is NTSTATUS
_In_ DWORD dwNumberOfBytesTransfered,
_Inout_ LPOVERLAPPED lpOverlapped
)
{
static_cast<SPYDATA*>(lpOverlapped)->OvCompRoutine(dwErrorCode, dwNumberOfBytesTransfered);
}
#endif
VOID OvCompRoutine(NTSTATUS status, DWORD dwNumberOfBytesTransfered)
{
DbgPrint("[%x,%x]n", status, dwNumberOfBytesTransfered);
if (0 <= status)
{
if (status != STATUS_NOTIFY_CLEANUP)
{
if (dwNumberOfBytesTransfered) DumpDirectoryChanges();
process_list_of_existing_files();// so hard do this here ?!?
DoRead();
}
else
{
DbgPrint("n---- NOTIFY_CLEANUP -----n");
}
}
Release();
MyReleaseRundownProtection();
}
~SPYDATA()
{
Cancel();
}
public:
void DoRead()
{
if (MyAcquireRundownProtection())
{
AddRef();
#ifdef _USE_NT_VERSION_
NTSTATUS status = ZwNotifyChangeDirectoryFile(_hFile, 0, 0, this, this, &_fni, sizeof(_buf), FILE_NOTIFY_VALID_MASK, TRUE);
if (NT_ERROR(status))
{
OvCompRoutine(status, 0);
}
#else
if (!ReadDirectoryChangesW(_hFile, _buf, sizeof(_buf), TRUE, FILE_NOTIFY_VALID_MASK, (PDWORD)&InternalHigh, this, 0))
{
OvCompRoutine(RtlGetLastNtStatus(), 0);
}
#endif
}
}
SPYDATA()
{
_hFile = 0;// ! not INVALID_HANDLE_VALUE because use ntapi for open file
_dwRef = 1;
#ifndef _USE_NT_VERSION_
RtlZeroMemory(static_cast<OVERLAPPED*>(this), sizeof(OVERLAPPED));
#endif
}
void AddRef()
{
InterlockedIncrement(&_dwRef);
}
void Release()
{
if (!InterlockedDecrement(&_dwRef))
{
delete this;
}
}
BOOL Create(POBJECT_ATTRIBUTES poa)
{
IO_STATUS_BLOCK iosb;
NTSTATUS status = ZwOpenFile(&_hFile, FILE_GENERIC_READ, poa, &iosb, FILE_SHARE_VALID_FLAGS, FILE_DIRECTORY_FILE);
if (0 <= status)
{
return
#ifdef _USE_NT_VERSION_
0 <= RtlSetIoCompletionCallback(_hFile, _OvCompRoutine, 0);
#else
BindIoCompletionCallback(_hFile, _OvCompRoutine, 0);
#endif
}
return FALSE;
}
void Cancel()
{
if (HANDLE hFile = InterlockedExchangePointer(&_hFile, 0))
{
NtClose(hFile);
}
}
};
void DemoF()
{
if (MyInitializeRundownProtection())
{
STATIC_OBJECT_ATTRIBUTES(oa, "<SOME_DIRECTORY>");
if (SPYDATA* p = new SPYDATA)
{
if (p->Create(&oa))
{
p->DoRead();
}
//++ GUI thread run
MessageBoxW(0, L"wait close program...", L"", MB_OK);
//-- GUI thread end
p->Cancel();
p->Release();
}
MyWaitForRundownProtectionRelease();
}
}
- 没有找到相关文章