Windows API线程池的简单例子

Windows API Thread Pool simple example

本文关键字:简单 API 线程 Windows      更新时间:2023-10-16

[编辑:感谢MSalters的回答和Raymond Chen对InterlockedIncrement vs EnterCriticalSection/counter++/leaveccriticalsection的回答,问题得到了解决,下面的代码正常工作。]这应该提供了一个有趣的简单示例,在Windows中使用线程池]

我找不到下面这个任务的简单例子。例如,我的程序需要将一个巨大的std::vector中的值加1,所以我想并行地做这件事。它需要在程序的整个生命周期中多次这样做。我知道如何在每次调用例程时使用CreateThread,但我无法用ThreadPool摆脱CreateThread。

我是这样做的:

class Thread {
public:
    Thread(){}
    virtual void run() = 0 ; // I can inherit an "IncrementVectorThread"
};
class IncrementVectorThread: public Thread {
public:
   IncrementVectorThread(int threadID, int nbThreads, std::vector<int> &vec) : id(threadID), nb(nbThreads), myvec(vec) { };
   virtual void run() {
        for (int i=(myvec.size()*id)/nb; i<(myvec.size()*(id+1))/nb; i++)
          myvec[i]++; //and let's assume myvec is properly sized
    }
   int id, nb;
   std::vector<int> &myvec;
};
class ThreadGroup : public std::vector<Thread*> {
public:
    ThreadGroup() { 
         pool = CreateThreadpool(NULL);
         InitializeThreadpoolEnvironment(&cbe);
         cleanupGroup = CreateThreadpoolCleanupGroup();
         SetThreadpoolCallbackPool(&cbe, pool);
         SetThreadpoolCallbackCleanupGroup(&cbe, cleanupGroup, NULL);
         threadCount = 0;
    }
    ~ThreadGroup() {
         CloseThreadpool(pool);
}
    PTP_POOL pool;
    TP_CALLBACK_ENVIRON cbe;
    PTP_CLEANUP_GROUP cleanupGroup;
    volatile long threadCount;
} ;

static VOID CALLBACK runFunc(
                PTP_CALLBACK_INSTANCE Instance,
                PVOID Context,
                PTP_WORK Work) {
   ThreadGroup &thread = *((ThreadGroup*) Context);
   long id = InterlockedIncrement(&(thread.threadCount));
   DWORD tid = (id-1)%thread.size();
   thread[tid]->run();
}
void run_threads(ThreadGroup* thread_group) {
    SetThreadpoolThreadMaximum(thread_group->pool, thread_group->size());
    SetThreadpoolThreadMinimum(thread_group->pool, thread_group->size());
    TP_WORK *worker = CreateThreadpoolWork(runFunc, (void*) thread_group, &thread_group->cbe);
    thread_group->threadCount = 0;
    for (int i=0; i<thread_group->size(); i++) {
        SubmitThreadpoolWork(worker);
     }  
     WaitForThreadpoolWorkCallbacks(worker,FALSE);  
     CloseThreadpoolWork(worker);   
}       
void main() {
   ThreadGroup group;
   std::vector<int> vec(10000, 0);
   for (int i=0; i<10; i++)
      group.push_back(new IncrementVectorThread(i, 10, vec));
   run_threads(&group);
   run_threads(&group);
   run_threads(&group);
   // now, vec should be == std::vector<int>(10000, 3);       
}

所以,如果我没理解错的话:
命令CreateThreadpool创建了一堆线程(因此,调用CreateThreadpoolWork是便宜的,因为它不调用CreateThread)
-我可以有尽可能多的线程池,因为我想(如果我想做一个线程池为"IncrementVector"和一个为我的"递减向量"线程,我可以)。-如果我需要将我的"增量向量"任务划分为10个线程,而不是调用10次CreateThread,我创建一个单独的"worker",并将其提交给ThreadPool 10次,使用相同的参数(因此,我需要回调中的线程ID来知道我的std::vector的哪一部分要增量)。这里我找不到线程ID,因为函数GetCurrentThreadId()返回线程的真实ID(即。比如1528,而不是介于0..nb_launched_threads之间。

最后,我不确定我是否很好地理解了这个概念:如果我将std::vector拆分为10个线程,我真的需要一个worker而不是10个worker吗?

谢谢!

你差不多到了最后一点了。

关于线程池的全部思想是你不关心它有多少线程。您只需将大量工作丢到线程池中,并让操作系统决定如何执行每个块。因此,如果您创建并提交10个块,操作系统可能会使用池中的1到10个线程。

你不应该关心那些线程标识。不要担心线程ID,最小或最大线程数,或类似的东西。

如果你不关心线程标识,那么你如何管理要改变向量的哪一部分?简单。在创建线程池之前,将计数器初始化为零。在回调函数中,调用InterlockedIncrement来检索和增加计数器。对于每个提交的工作项,您将得到一个连续的整数。