如何在互斥锁中赋予特权线程优先级
How to give priority to privileged thread in mutex locking?
首先:我完全是互斥/多线程编程的新手,所以很抱歉提前出现任何错误。。。
我有一个运行多个线程的程序。螺纹(通常每个cpu核心)做了很多计算和";思考;然后有时他们决定打电话给更新某些统计信息的特定(共享)方法。统计信息更新的并发性是通过使用互斥对象来管理的:
stats_mutex.lock();
common_area->update_thread_stats( ... );
stats_mutex.unlock();
现在来谈谈问题。在所有这些线程中,有一个特定的线程几乎需要
实时优先级,因为它是唯一实际操作的线程。
用";几乎实时优先级";我的意思是:
让我们假设线程t0是";享有特权的一个";以及t1…t15是正常的一个。现在发生的是:
- 线程t1获取锁
- 线程t2、t3、t0调用lock()方法并等待它成功
- 线程t1调用unlock()
- 线程t2、t3、t0中的一个(据我所知,是随机的)成功获取锁,其他的继续等待
我需要的是:
- 线程t1获取锁
- 线程t2、t3、t0调用lock()方法并等待它成功
- 线程t1调用unlock()
- 线程t0获取锁,因为它具有特权
那么,做这件事最好(可能最简单)的方法是什么?
我想的是有一个名为"privileged_needs_lock";。
但我想我需要另一个互斥来管理对这个变量的访问。。。我没有知道这是不是正确的方式。。。
附加信息:
- 我的线程使用C++11(从gcc 4.6.3开始)
- 代码需要在Linux和Windows上运行(但目前仅在Linux上测试)
- 锁定机制的性能不是问题(我的性能问题是在内部线程计算中,线程数量总是很低,每个cpu核心最多一两个)
任何想法都值得赞赏。感谢
以下解决方案有效(三种互斥方式):
#include <thread>
#include <iostream>
#include <mutex>
#include "unistd.h"
std::mutex M;
std::mutex N;
std::mutex L;
void lowpriolock(){
L.lock();
N.lock();
M.lock();
N.unlock();
}
void lowpriounlock(){
M.unlock();
L.unlock();
}
void highpriolock(){
N.lock();
M.lock();
N.unlock();
}
void highpriounlock(){
M.unlock();
}
void hpt(const char* s){
using namespace std;
//cout << "hpt trying to get lock here" << endl;
highpriolock();
cout << s << endl;
sleep(2);
highpriounlock();
}
void lpt(const char* s){
using namespace std;
//cout << "lpt trying to get lock here" << endl;
lowpriolock();
cout << s << endl;
sleep(2);
lowpriounlock();
}
int main(){
std::thread t0(lpt,"low prio t0 working here");
std::thread t1(lpt,"low prio t1 working here");
std::thread t2(hpt,"high prio t2 working here");
std::thread t3(lpt,"low prio t3 working here");
std::thread t4(lpt,"low prio t4 working here");
std::thread t5(lpt,"low prio t5 working here");
std::thread t6(lpt,"low prio t6 working here");
std::thread t7(lpt,"low prio t7 working here");
//std::cout << "All threads created" << std::endl;
t0.join();
t1.join();
t2.join();
t3.join();
t4.join();
t5.join();
t6.join();
t7.join();
return 0;
}
按照建议尝试了以下解决方案,但它不起作用(使用"g++-std=c++0x-o测试.cpp-lpthread"编译):
#include <thread>
#include <mutex>
#include "time.h"
#include "pthread.h"
std::mutex l;
void waiter(){
l.lock();
printf("Here i am, waiter startsn");
sleep(2);
printf("Here i am, waiter endsn");
l.unlock();
}
void privileged(int id){
usleep(200000);
l.lock();
usleep(200000);
printf("Here i am, privileged (%d)n",id);
l.unlock();
}
void normal(int id){
usleep(200000);
l.lock();
usleep(200000);
printf("Here i am, normal (%d)n",id);
l.unlock();
}
int main(){
std::thread tw(waiter);
std::thread t1(normal,1);
std::thread t0(privileged,0);
std::thread t2(normal,2);
sched_param sch;
int policy;
pthread_getschedparam(t0.native_handle(), &policy, &sch);
sch.sched_priority = -19;
pthread_setschedparam(t0.native_handle(), SCHED_FIFO, &sch);
pthread_getschedparam(t1.native_handle(), &policy, &sch);
sch.sched_priority = 18;
pthread_setschedparam(t1.native_handle(), SCHED_FIFO, &sch);
pthread_getschedparam(t2.native_handle(), &policy, &sch);
sch.sched_priority = 18;
pthread_setschedparam(t2.native_handle(), SCHED_FIFO, &sch);
tw.join();
t1.join();
t0.join();
t2.join();
return 0;
}
我可以想到三种只使用线程原语的方法:
三重互斥
这里有三个互斥:
- 数据互斥('M')
- 访问互斥('N')旁边,以及
- 低优先级访问互斥
访问模式为:
- 低优先级线程:锁定L,锁定N,锁定M,解锁N,{做事情},解锁M,解锁L
- 高优先级线程:锁定N,锁定M,解锁N,{做事情},解锁M
这样就可以保护对数据的访问,并且高优先级线程可以在访问数据时领先于低优先级线程
互斥,条件变量,原子标志
实现这一点的基本方法是使用条件变量和原子:
- Mutex M
- Condvar C
- 原子布尔hpt_waiting
数据访问模式:
- 低优先级线程:锁定M,同时(hpt_waiting)在M上等待C,{做事情},广播C,解锁M
- 高优先级线程:hpt_waiting:=true,锁定M,hpt_waiting:=false,{做事情},广播C,解锁M
互斥,条件变量,两个非原子标志
或者,您可以使用两个非原子布尔和一个condvar;在这种技术中,互斥/condvar保护标志,数据不是由互斥保护的,而是由一个标志保护的:
-
Mutex M;
-
Condvar C;
-
布尔数据字段,hpt_waiting;
-
低优先级线程:锁定M,而(hpt_waiting或data_held)在M上等待C,data_held:=true,解锁M,{do-stuff},锁定M,data_hield:=false,广播C,解锁M
-
高优先级线程:锁定M,hpt_waiting:=true,而(data_held)在M上等待C,data_held:=true,解锁M,{do-stuff},锁定M,data_hield:=false,hpt_waiting:=false,广播C,解锁M
将请求线程放在"优先级队列"上。特权线程可以在数据空闲时首先访问数据。
一种方法是使用ConcurrentQueues数组[privilegeLevel]、一个锁和一些事件。
任何想要获取数据的线程都会进入锁。如果数据是空闲的(boolean),它将获取数据对象并退出锁。如果数据正由另一个线程使用,则请求线程根据其权限级别将事件推送到其中一个并发队列中,退出锁并等待该事件。
当线程想要释放其对数据对象的所有权时,它会获得锁并从最高权限端向下迭代ConcurrentQueues数组,以查找事件(即队列计数>0)。如果它找到一个,它会向它发出信号并退出锁,如果没有,它会设置"dataFree"布尔值并退出锁。
当等待访问数据的事件的线程准备就绪时,它可以访问数据对象。
我认为这应该行得通。请其他开发者检查一下这个设计,看看你是否能想到任何种族等?去捷克旅行后,我仍然有点"热情好客"。
编辑-可能甚至不需要并发队列,因为所有队列之间都有显式锁定。任何旧的队列都可以。
#include <thread>
#include <mutex>
#include <condition_variable>
#include <cassert>
class priority_mutex {
std::condition_variable cv_;
std::mutex gate_;
bool locked_;
std::thread::id pr_tid_; // priority thread
public:
priority_mutex() : locked_(false) {}
~priority_mutex() { assert(!locked_); }
priority_mutex(priority_mutex&) = delete;
priority_mutex operator=(priority_mutex&) = delete;
void lock(bool privileged = false) {
const std::thread::id tid = std::this_thread::get_id();
std::unique_lock<decltype(gate_)> lk(gate_);
if (privileged)
pr_tid_ = tid;
cv_.wait(lk, [&]{
return !locked_ && (pr_tid_ == std::thread::id() || pr_tid_ == tid);
});
locked_ = true;
}
void unlock() {
std::lock_guard<decltype(gate_)> lk(gate_);
if (pr_tid_ == std::this_thread::get_id())
pr_tid_ = std::thread::id();
locked_ = false;
cv_.notify_all();
}
};
注意:此priority_mutex
提供了不公平的线程调度。如果特权线程频繁获取锁,那么其他非特权线程可能几乎不会被调度。
用法示例:
#include <mutex>
priority_mutex mtx;
void privileged_thread()
{
//...
{
mtx.lock(true); // acquire 'priority lock'
std::unique_lock<decltype(mtx)> lk(mtx, std::adopt_lock);
// update shared state, etc.
}
//...
}
void normal_thread()
{
//...
{
std::unique_lock<decltype(mtx)> lk(mtx); // acquire 'normal lock'
// do something
}
//...
}
在linux上,您可以检查此man:phread_setschedparam以及man-shed_setscheduler
pthread_setschedparam(pthread_t线程,const struct sched_param*param);
对于c++2011,也请检查此项:http://msdn.microsoft.com/en-us/library/system.threading.thread.priority.aspx#Y78
pthreads具有线程优先级:
pthread_setschedprio( (pthread_t*)(&mThreadId), wpri );
如果多个线程在锁中等待睡眠,则调度程序将首先唤醒优先级最高的线程。
尝试以下操作。您可以使类成为线程安全的单例,甚至可以使它成为函子。
#include <pthread.h>
#include <semaphore.h>
#include <map>
class ThreadPrioFun
{
typedef std::multimap<int, sem_t*> priomap_t;
public:
ThreadPrioFun()
{
pthread_mutex_init(&mtx, NULL);
}
~ThreadPrioFun()
{
pthread_mutex_destroy(&mtx);
}
void fun(int prio, sem_t* pSem)
{
pthread_mutex_lock(&mtx);
bool bWait = !(pm.empty());
priomap_t::iterator it = pm.insert(std::pair<int, sem_t*>(prio, pSem) );
pthread_mutex_unlock(&mtx);
if( bWait ) sem_wait(pSem);
// do the actual job
// ....
//
pthread_mutex_lock(&mtx);
// done, remove yourself
pm.erase(it);
if( ! pm.empty() )
{
// let next guy run:
sem_post((pm.begin()->second));
}
pthread_mutex_unlock(&mtx);
}
private:
pthread_mutex_t mtx;
priomap_t pm;
};
由于线程优先级不适合您:
创建2个互斥锁,一个常规锁和一个优先级锁。
常规线程必须首先锁定普通锁,然后锁定优先级锁。优先级线程只需要锁定优先级锁:
Mutex mLock;
Mutex mPriLock;
doNormal()
{
mLock.lock();
pthread_yield();
doPriority();
mLock.unlock();
}
doPriority()
{
mPriLock.lock();
doStuff();
mPriLock.unlock();
}
稍微修改了ecatmur答案,添加了第四个互斥体来同时处理多个高优先级线程(注意,在我最初的问题中,这不是必需的):
#include <thread>
#include <iostream>
#include "unistd.h"
std::mutex M; //data access mutex
std::mutex N; // 'next to access' mutex
std::mutex L; //low priority access mutex
std::mutex H; //hptwaiting int access mutex
int hptwaiting=0;
void lowpriolock(){
L.lock();
while(hptwaiting>0){
N.lock();
N.unlock();
}
N.lock();
M.lock();
N.unlock();
}
void lowpriounlock(){
M.unlock();
L.unlock();
}
void highpriolock(){
H.lock();
hptwaiting++;
H.unlock();
N.lock();
M.lock();
N.unlock();
}
void highpriounlock(){
M.unlock();
H.lock();
hptwaiting--;
H.unlock();
}
void hpt(const char* s){
using namespace std;
//cout << "hpt trying to get lock here" << endl;
highpriolock();
cout << s << endl;
usleep(30000);
highpriounlock();
}
void lpt(const char* s){
using namespace std;
//cout << "lpt trying to get lock here" << endl;
lowpriolock();
cout << s << endl;
usleep(30000);
lowpriounlock();
}
int main(){
std::thread t0(lpt,"low prio t0 working here");
std::thread t1(lpt,"low prio t1 working here");
std::thread t2(hpt,"high prio t2 working here");
std::thread t3(lpt,"low prio t3 working here");
std::thread t4(lpt,"low prio t4 working here");
std::thread t5(lpt,"low prio t5 working here");
std::thread t6(hpt,"high prio t6 working here");
std::thread t7(lpt,"low prio t7 working here");
std::thread t8(hpt,"high prio t8 working here");
std::thread t9(lpt,"low prio t9 working here");
std::thread t10(lpt,"low prio t10 working here");
std::thread t11(lpt,"low prio t11 working here");
std::thread t12(hpt,"high prio t12 working here");
std::thread t13(lpt,"low prio t13 working here");
//std::cout << "All threads created" << std::endl;
t0.join();
t1.join();
t2.join();
t3.join();
t4.join();
t5.join();
t6.join();
t7.join();
t8.join();
t9.join();
t10.join();
t11.join();
t12.join();
t13.join();
return 0;
}
你觉得怎么样?可以吗?信号量确实可以更好地处理这类事情,但互斥对我来说更容易管理
- 从不同线程使用int64的不同字节安全吗
- 删除一个线程上有数百万个字符串的大型哈希映射会影响另一个线程的性能
- 在C++中使用cURL和多线程
- 为什么我的C#代码在调用回C++COM直到Task时会暂停.等待/线程.加入
- 在cuda线程之间共享大量常量数据
- 如何将元素添加到数组的线程安全函数?
- 线程,如果else语句,都是错误的上下文切换后,会发生什么
- C++Boost Asio Pool线程,带有lambda函数和传递引用变量
- Qt C++静态thread_local QNetworkAccessManager是线程应用程序的好选择吗
- 异常属于C++中的线程还是进程
- C++中的线程安全删除
- C++使用params创建线程函数会导致转换错误
- 类与私有变量的其他类之间的线程安全性
- CoInitialize()在单独的线程上崩溃而不返回
- c++中的线程池
- 线程之间的布尔停止信号
- 为什么std::async使用同一个线程运行函数
- 用于矢量处理的多个线程
- 根据线程优先级/特权授予对资源的访问权限
- 如何在互斥锁中赋予特权线程优先级