线程等待父线程
Thread Wait For Parent
我正在为我的ubuntu服务器(用于我的多客户端匿名聊天程序)实现一个简单的线程池机制,并且我需要使我的工作线程休眠,直到需要执行作业(以函数指针和参数的形式)。
我现在的系统要被淘汰了。我(工人线程)问经理是否有工作可用,如果没有睡眠5ms。如果有,将作业添加到工作队列并运行该函数。真是浪费时间。
我想做的是创建一个简单的类事件系统。我正在考虑有一个互斥锁向量(每个工作者一个),并在创建时将互斥锁的句柄作为参数传递进来。然后在我的manager类(保存和分发作业)中,每当创建线程时,锁定互斥锁。当需要执行一个作业时,解锁下一个互斥锁,等待它被锁定和解锁,然后重新锁定它。然而,我想知道是否有更好的方法来达到这个目的。
tldr;我的问题是。让线程等待来自管理类的作业的最有效、最有效和最安全的方法是什么?轮询是一种我甚至应该考虑的技术(一次超过1000个客户端),互斥锁是否合适?还是有其他的技巧?
您需要的是条件变量。
所有的工作线程都调用wait()方法来挂起它们。
试题:
#include <pthread.h>
#include <memory>
#include <list>
// Use RAII to do the lock/unlock
struct MutexLock
{
MutexLock(pthread_mutex_t& m) : mutex(m) { pthread_mutex_lock(&mutex); }
~MutexLock() { pthread_mutex_unlock(&mutex); }
private:
pthread_mutex_t& mutex;
};
// The base class of all work we want to do.
struct Job
{
virtual void doWork() = 0;
};
// pthreads is a C library the call back must be a C function.
extern "C" void* threadPoolThreadStart(void*);
// The very basre minimal part of a thread pool
// It does not create the workers. You need to create the work threads
// then make them call workerStart(). I leave that as an exercise for you.
class ThreadPool
{
public:
ThreadPool(unsigned int threadCount=1);
~ThreadPool();
void addWork(std::auto_ptr<Job> job);
private:
friend void* threadPoolThreadStart(void*);
void workerStart();
std::auto_ptr<Job> getJob();
bool finished; // Threads will re-wait while this is true.
pthread_mutex_t mutex; // A lock so that we can sequence accesses.
pthread_cond_t cond; // The condition variable that is used to hold worker threads.
std::list<Job*> workQueue; // A queue of jobs.
std::vector<pthread_t>threads;
};
// Create the thread pool
ThreadPool::ThreadPool(int unsigned threadCount)
: finished(false)
, threads(threadCount)
{
// If we fail creating either pthread object than throw a fit.
if (pthread_mutex_init(&mutex, NULL) != 0)
{ throw int(1);
}
if (pthread_cond_init(&cond, NULL) != 0)
{
pthread_mutex_destroy(&mutex);
throw int(2);
}
for(unsigned int loop=0; loop < threadCount;++loop)
{
if (pthread_create(threads[loop], NULL, threadPoolThreadStart, this) != 0)
{
// One thread failed: clean up
for(unsigned int kill = loop -1; kill < loop /*unsigned will wrap*/;--kill)
{
pthread_kill(threads[kill], 9);
}
throw int(3);
}
}
}
// Cleanup any left overs.
// Note. This does not deal with worker threads.
// You need to add a method to flush all worker threads
// out of this pobject before you let the destructor destroy it.
ThreadPool::~ThreadPool()
{
finished = true;
for(std::vector<pthread_t>::iterator loop = threads.begin();loop != threads.end(); ++loop)
{
// Send enough signals to free all threads.
pthread_cond_signal(&cond);
}
for(std::vector<pthread_t>::iterator loop = threads.begin();loop != threads.end(); ++loop)
{
// Wait for all threads to exit (they will as finished is true and
// we sent enough signals to make sure
// they are running).
void* result;
pthread_join(*loop, &result);
}
// Destroy the pthread objects.
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);
// Delete all re-maining jobs.
// Notice how we took ownership of the jobs.
for(std::list<Job*>::const_iterator loop = workQueue.begin(); loop != workQueue.end();++loop)
{
delete *loop;
}
}
// Add a new job to the queue
// Signal the condition variable. This will flush a waiting worker
// otherwise the job will wait for a worker to finish processing its current job.
void ThreadPool::addWork(std::auto_ptr<Job> job)
{
MutexLock lock(mutex);
workQueue.push_back(job.release());
pthread_cond_signal(&cond);
}
// Start a thread.
// Make sure no exceptions escape as that is bad.
void* threadPoolThreadStart(void* data)
{
ThreadPool* pool = reinterpret_cast<ThreadPool*>(workerStart);
try
{
pool->workerStart();
}
catch(...){}
return NULL;
}
// This is the main worker loop.
void ThreadPool::workerStart()
{
while(!finished)
{
std::auto_ptr<Job> job = getJob();
if (job.get() != NULL)
{
job->doWork();
}
}
}
// The workers come here to get a job.
// If there are non in the queue they are suspended waiting on cond
// until a new job is added above.
std::auto_ptr<Job> ThreadPool::getJob()
{
MutexLock lock(mutex);
while((workQueue.empty()) && (!finished))
{
pthread_cond_wait(&cond, &mutex);
// The wait releases the mutex lock and suspends the thread (until a signal).
// When a thread wakes up it is help until it can acquire the mutex so when we
// get here the mutex is again locked.
//
// Note: You must use while() here. This is because of the situation.
// Two workers: Worker A processing job A.
// Worker B suspended on condition variable.
// Parent adds a new job and calls signal.
// This wakes up thread B. But it is possible for Worker A to finish its
// work and lock the mutex before the Worker B is released from the above call.
//
// If that happens then Worker A will see that the queue is not empty
// and grab the work item in the queue and start processing. Worker B will
// then lock the mutext and proceed here. If the above is not a while then
// it would try and remove an item from an empty queue. With a while it sees
// that the queue is empty and re-suspends on the condition variable above.
}
std::auto_ptr<Job> result;
if (!finished)
{ result.reset(workQueue.front());
workQueue.pop_front();
}
return result;
}
通常的实现方法是有一个未完成工作的队列queue
,一个保护队列的互斥锁mutex
和一个等待条件queue_not_empty
。然后,每个工作线程执行以下操作(使用伪api):
while (true) {
Work * work = 0;
mutex.lock();
while ( queue.empty() )
if ( !queue_not_empty.wait( &mutex, timeout ) )
return; // timeout - exit the worker thread
work = queue.front();
queue.pop_front();
mutex.unlock();
work->perform();
}
wait( &mutex, timeout )
调用阻塞,直到满足等待条件或呼叫超时。传递的mutex
在wait()
内部自动解锁,并在从调用返回之前再次锁定,以便为所有参与者提供一致的队列视图。timeout
将被选择为相当大(秒),并将导致线程退出(如果有更多的工作进入,线程池将启动新线程)。
同时,线程池的工作插入函数这样做:
Work * work = ...;
mutex.lock();
queue.push_back( work );
if ( worker.empty() )
start_a_new_worker();
queue_not_empty.wake_one();
mutex.unlock();
与多个消费者(工作线程消费工作请求)的经典生产者-消费者同步。众所周知的技术是有一个信号量,每个工作线程执行down()
,每次有工作请求时执行up()
。然后从互斥锁工作队列中选择请求。因为一个up()
只会唤醒一个down()
,所以互斥锁上的争用实际上是最小的。
或者你可以用条件变量做同样的事情,在每个线程中做等待,当你有工作时唤醒一个。队列本身仍然被互斥锁(无论如何condvar需要一个互斥锁)。
最后我不完全确定,但我实际上认为你可以使用管道作为队列,包括所有同步(工作线程只是试图"读取(sizeof(request))"))。有点粗糙,但导致更少的上下文切换。
由于网络聊天程序可能是I/o绑定而不是cpu绑定,因此实际上并不需要线程。您可以使用Boost之类的工具在单个线程中处理所有I/O。或GLib主循环。这些是基于平台特定函数的可移植抽象,允许程序阻塞等待任何(可能很大)打开的文件或套接字集的活动,然后在活动发生时唤醒并迅速响应。
最简单的方法是semaphores
。信号量是这样工作的:
信号量基本上是一个接受null/正值的变量。进程可以通过两种方式与它交互:增加或减少信号量。
增加信号量给这个神奇的变量加1,仅此而已。在减少计数时,事情变得有趣了:如果计数达到零,并且进程试图再次降低它,因为它不能接受负值,它将阻塞,直到变量上升。
如果多个进程阻塞等待减少信号量值,则每增加一个单位只唤醒一个。
这使得创建一个工作/任务系统非常容易:你的管理进程队列任务,并增加信号量的值来匹配剩余的项目,你的工作进程试图减少计数并不断获取任务。当没有任务可用时,它们将阻塞,并且不消耗cpu时间。当一个进程出现时,只有一个休眠进程会被唤醒。Insta-sync魔法。
不幸的是,至少在Unix世界中,信号量API不是很友好,因为出于某种原因,它处理的是信号量数组而不是单个信号量。但是,您只是一个简单的包装器,而不是一个漂亮的接口!
干杯!
- 为什么我的C#代码在调用回C++COM直到Task时会暂停.等待/线程.加入
- 提升 ASIO - io_service 不要等待连接到线程
- 虚假唤醒是否会解锁所有等待线程,甚至是不相关的线程?
- 一个线程等待多个线程事件
- 如何让线程等待对象完全破坏?(对象也有一个线程)?
- 停止C++ 11 个 std::线程等待 std::condition_variable
- C++线程:等待condition_variable后无法解锁阵列中的互斥锁
- Qt 线程等待来自 GUI 的输入
- C++11线程等待
- 如何让线程等待对象的销毁
- Boost:等待工作线程等待条件变量
- C++11线程等待行为:std::this_Thread::yield()与std::this _Thread::sle
- 如何使线程等待而不轮询
- 使其中一个线程等待的时间尽可能少(几乎为零)
- 当线程等待互斥锁时捕获的信号
- Qt:如何让一个线程等待临时包障,并暂时增加另一个线程的优先级以删除 roadbock?
- C++ Windows MFC 并发:让线程等待,直到达到特定状态
- 线程等待父线程
- 使用异步方法vs线程等待
- c++线程等待时间