从线程池工作线程使用 GetQueuedCompletionStatus 的奇怪行为

Strange behaviour of GetQueuedCompletionStatus when used from thread pool worker threads

本文关键字:线程 工作 GetQueuedCompletionStatus      更新时间:2023-10-16

我一直在测试将 IO 完成端口与线程池中的工作线程相结合,并偶然发现了我无法解释的行为。特别是,虽然以下代码:

int data;
for (int i = 0; i < NUM; ++i)
PostQueuedCompletionStatus(cp, 1, NULL, reinterpret_cast<LPOVERLAPPED>(&data));
{
std::thread t([&] ()
{
LPOVERLAPPED aux;
DWORD        cmd;
ULONG_PTR    key;
for (int i = 0; i < NUM; ++i)
{
if (!GetQueuedCompletionStatus(cp, &cmd, &key, &aux, 0))
break;
++count;
}
});
t.join();
}

工作正常并接收 NUM 状态通知(NUM 是大数字,100000 或更多(,使用线程池工作对象的类似代码读取每个工作项一个状态通知并在读取后重新发布工作项,在读取几百个状态通知后失败。具有以下全局变量(请不要介意名称(:

HANDLE cport;
PTP_POOL pool;
TP_CALLBACK_ENVIRON env;
PTP_WORK work;
std::size_t num_calls;
std::mutex mutex;
std::condition_variable cv; 
bool job_done;

和回调函数:

static VOID CALLBACK callback(PTP_CALLBACK_INSTANCE instance_, PVOID pv_, PTP_WORK work_)
{
LPOVERLAPPED aux;
DWORD        cmd;
ULONG_PTR    key;
if (GetQueuedCompletionStatus(cport, &cmd, &key, &aux, 0))
{
++num_calls;
SubmitThreadpoolWork(work);
}
else
{
std::unique_lock<std::mutex> l(mutex);
std::cout << "No work after " << num_calls << " calls.n";
job_done = true;
cv.notify_one();
}
}

以下代码:

{
job_done = false;
std::unique_lock<std::mutex> l(mutex);
num_calls = 0;
cport = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1);
pool = CreateThreadpool(nullptr);
InitializeThreadpoolEnvironment(&env);
SetThreadpoolCallbackPool(&env, pool);
work = CreateThreadpoolWork(callback, nullptr, &env);
for (int i = 0; i < NUM; ++i)
PostQueuedCompletionStatus(cport, 1, NULL, reinterpret_cast<LPOVERLAPPED>(&data));
SubmitThreadpoolWork(work);
cv.wait_for(l, std::chrono::milliseconds(10000), [] { return job_done; } );
}

会报告"之后没有更多的工作......"在对 GetQueuedCompletionStatus 进行了 250 次左右的调用后,尽管 NUM 设置为 1000000。更奇怪的是,将等待时间从 0 设置为 10 毫秒会将成功调用的数量增加到几十万次,并且偶尔会读取所有 1000000 条通知。我不太明白,因为所有状态通知都是在第一次提交工作对象之前发布的。

组合完成端口和线程池是否真的存在问题,或者我的代码中是否有问题?请不要进入我为什么要这样做 - 我正在调查可能性并偶然发现这一点。在我看来,它应该有效,并且无法弄清楚出了什么问题。谢谢。

我尝试运行此代码,问题似乎是提供给CreateIoCompletionPortNumberOfConcurrentThreads参数。传递 1 意味着执行callback的第一个池线程与 io 完成端口相关联,但由于线程池可能使用不同的线程执行callbackGetQueuedCompletionStatus发生这种情况时将失败。从文档:

要仔细考虑的 I/O 完成端口的最重要属性是并发值。完成端口的并发值是在通过NumberOfConcurrentThreads参数使用CreateIoCompletionPort创建时指定的。此值限制与完成端口关联的可运行线程数。当与完成端口关联的可运行线程总数达到并发值时,系统会阻止执行与该完成端口关联的任何后续线程,直到可运行线程数降至并发值以下。

尽管任意数量的线程都可以为指定的 I/O 完成端口调用GetQueuedCompletionStatus,但当指定的线程首次调用GetQueuedCompletionStatus时,它将与指定的 I/O 完成端口相关联,直到发生以下三种情况之一:线程退出、指定不同的 I/O 完成端口或关闭 I/O 完成端口。换句话说,单个线程最多可以与一个 I/O 完成端口相关联。

因此,要将 io 补全与线程池一起使用,您需要将并发线程数设置为线程池的大小(您可以使用SetThreadpoolThreadMaximum设置(。

::DWORD const threads_count{1};
cport = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, threads_count);
...
pool = ::CreateThreadpool(nullptr);
::SetThreadpoolThreadMaximum(pool, threads_count);