如何使用"priority"进行多线程处理?

How to go about multithreading with "priority"?

本文关键字:多线程处理 priority 何使用      更新时间:2023-10-16

当程序空闲时,我有多个线程在后台处理多个文件
为了提高磁盘吞吐量,我使用关键部分来确保没有两个线程同时使用同一磁盘

(伪)代码看起来像这样:

void RunThread(HANDLE fileHandle)
{
// Acquire CRITICAL_SECTION for disk
CritSecLock diskLock(GetDiskLock(fileHandle));
for (...)
{
// Do some processing on file
}
}

一旦用户请求处理一个文件,我需要停止所有线程--,处理请求文件的线程除外。处理完文件后,我想再次恢复所有线程。

考虑到SuspendThread是个坏主意,我该如何停止除处理相关输入的线程之外的所有线程?

我需要什么样的线程对象/功能——互斥、信号量、事件或其他什么?我该如何使用它们?(我希望能与Windows XP兼容。)

我建议您以完全不同的方式进行。如果你真的希望每个磁盘只有一个线程(我不相信这是个好主意),那么你应该为每个磁盘创建一个线程,并在排队处理文件时分发文件。

为了实现特定文件的优先级请求,我会让线程在正常处理过程中的几个点检查"优先级槽"(当然还有在其主队列等待循环中)。

这里的困难不在于优先级,而是你希望一个线程退出它所持有的锁,让另一个线程来执行它。

所以,你想要实现(正如你所说的):

if (ThisThreadNeedsToSuspend()) { ReleaseDiskLock(); WaitForResume(); ReacquireDiskLock(); }

既然你(明智地)使用了一个作用域锁,我想颠倒一下逻辑:

while (file_is_not_finished) {
WaitUntilThisThreadCanContinue();
CritSecLock diskLock(blah);
process_part_of_the_file();
}
ReleasePriority();
...
void WaitUntilThisThreadCanContinue() {
MutexLock lock(thread_priority_mutex);
while (thread_with_priority != NOTHREAD and thread_with_priority != thisthread) {
condition_variable_wait(thread_priority_condvar);
}
}
void GiveAThreadThePriority(threadid) {
MutexLock lock(thread_priority_mutex);
thread_with_priority = threadid;
condition_variable_broadcast(thread_priority_condvar);
}
void ReleasePriority() {
MutexLock lock(thread_priority_mutex);
if (thread_with_priority == thisthread) {
thread_with_priority = NOTHREAD;
condition_variable_broadcast(thread_priority_condvar);
}
}

阅读条件变量——所有最近的操作系统都有它们,具有类似的基本操作。它们也在Boost和C++11中。

如果你不可能写一个函数process_part_of_the_file,那么你就不能用这种方式构造它。相反,您需要一个作用域锁,它可以释放并重新获得磁盘锁。最简单的方法是将其设为互斥对象,然后可以使用相同的互斥对象等待condvar。您仍然可以以大致相同的方式使用互斥/condvar对和thread_with_priority对象。

您可以根据系统对优先级更改的响应程度来选择"部分文件"的大小。如果你需要非常响应,那么这个方案并不能真正起作用——这是协作多任务处理。

我对这个答案并不完全满意,如果有很多其他线程已经在同一个磁盘锁上等待,那么具有优先级的线程可能会长时间处于饥饿状态。我会花更多的心思来避免这种情况。可能不应该有每个磁盘的锁,而是整个事情应该在条件变量及其关联的互斥体下处理。不过,我希望这能让你开始。

您可以要求线程正常停止。只需在线程内部的循环中检查一些变量,然后根据其值继续或终止工作。

关于它的一些想法:

  • 该值的设置和检查应在关键部分内完成
  • 因为关键部分会减慢线程的速度,所以应该经常进行检查,以便在需要时快速停止线程,而很少进行检查,这样线程就不会因获取和释放关键部分而停滞

每个工作线程处理一个文件后,检查与该线程关联的条件变量。条件变量可以简单地实现为bool+关键部分。或者具有InterlockedExchange*功能。老实说,我通常只是在线程之间使用一个无保护的bool来表示"需要退出"——有时如果工作线程可能正在休眠,我会使用一个事件句柄。

为每个线程设置条件变量后,主线程等待每个线程通过WaitForSingleObject退出。

DWORD __stdcall WorkerThread(void* pThreadData)
{
ThreadData* pData = (ThreadData*) pTheradData;
while (pData->GetNeedToExit() == false)
{
ProcessNextFile();
}
return 0;
}
void StopWokerThread(HANDLE hThread, ThreadData* pData)
{
pData->SetNeedToExit = true;
WaitForSingleObject(hThread);
CloseHandle(hThread);
}
struct ThreadData()
{
CRITICAL_SECITON _cs;
ThreadData()
{
InitializeCriticalSection(&_cs);
}
~ThreadData()
{
DeleteCriticalSection(&_cs);
}
ThreadData::SetNeedToExit()
{
EnterCriticalSection(&_cs);
_NeedToExit = true;
LeaveCriticalSeciton(&_cs);
}
bool ThreadData::GetNeedToExit()
{
bool returnvalue;
EnterCriticalSection(&_cs);
returnvalue = _NeedToExit = true;
LeaveCriticalSeciton(&_cs);
return returnvalue;
}
};

您还可以使用线程池,并通过使用I/O完成端口来调节它们的工作。

通常,池中的线程将休眠,等待I/O完成端口事件/活动。当您有一个请求时,I/O完成端口会释放线程并开始执行作业。

好吧,这个怎么样:

每个磁盘有两个线程,分别用于高优先级和低优先级请求,每个线程都有自己的输入队列。

高优先级磁盘任务在最初提交时,将与正在运行的任何低优先级任务并行发出其磁盘请求。它可以重置低优先级线程等待的ManualResetEvent(WaitForSingleObject),因此如果高优先级线程正在执行磁盘操作,它将被阻塞。高优先级线程应该在完成任务后设置事件。

这应该将磁盘抖动限制在提交高优先级任务和低优先级线程可以在MRE上等待之间的间隔(如果有的话)。提高为高优先级队列提供服务的线程的CPU优先级可以有助于提高该间隔中的高优先级工作的性能。

编辑:所谓的"队列",我指的是线程安全的、阻塞的、生产者-消费者队列(需要澄清的是:)。

更多编辑-如果发出线程需要通知作业完成,则发出到队列的任务可能包含一个"OnCompletion"事件,以任务对象作为参数进行调用。例如,事件处理程序可以向发起线程正在等待的AutoResetEvent发出信号,从而提供同步通知。