如何使用具有串行执行顺序的C++线程池
how to use a C++ thread pool with serial execution order
我正在尝试使用一个使用基于优先级的任务的C++线程池。根据优先级(在我的情况下,它是一个比较器对象,而不仅仅是一个值),它需要串行执行,而不是仅仅调度到线程池中的下一个可用线程。
我当前的实现基于以下代码https://github.com/en4bz/ThreadPool我让它像一个普通的线程池一样正常工作(但我没有使用该池的优先级变体,因为我不知道如何指定自定义谓词对象而不是int-如果有人能让ke知道如何传入下面的PriorityLevel,那将是一个真正的优势),所以相反,项目是从std::priority_queue<T>其中T是一个使用下一个PriorityLevel按优先级顺序排列项目的对象。
任务优先级的类型按下面的行进行描述-这些优先级由信道号组成,优先级字符"A-Z"和一个可选的序列号(指定该序列号时,表示任务必须等待所有优先级较高的任务完成,然后才能计划在可用的线程池上执行该任务。我知道如何使用运算符<()严格的弱排序谓词在线程池中排序这些东西,但我不知道如何将其中一些元素放回以支持执行队列。
所以在这个的例子中
(1) channel[1] priority[A]
(2) channel[1] priority[A] sequenceNum[1]
(3) channel[1] priority[A] sequenceNum[2]
(4) channel[1] priority[A] sequenceNum[3]
(5) channel[2] priority[B]
(6) channel[2] priority[B] sequenceNum[1]
(7) channel[2] priority[B] sequenceNum[2]
项目1&5将具有最高优先级,并且由于它们没有先决条件-它们将并发运行(如果有可用的线程),但是其他元素将不得不等待,直到它们的先决条件通道/优先级任务完成。
以下是我如何使用线程池(请注意,SLDBJob包含PriorityLevel来处理运算符<()优先级排序。
std::priority_queue<SLDBJob> priorityJobQueue;
//... insert a bunch of Jobs
// enqueue closure objects in highest to lowest priority so that the
// highest ones get started ahead of the lower or equal priority jobs.
// these tasks will be executed in priority order using rPoolSize threads
UtlThreadPool<> threadPool(rPoolSize);
while (!priorityJobQueue.empty()) {
const auto& nextJob = priorityJobQueue.top();
threadPool.enqueue(std::bind(
&SLDBProtocol::moduleCheckingThreadFn,
nextJob, std::ref(gActiveJobs)));
gActiveJobs.insert(nextJob);
priorityJobQueue.pop();
}
这是优先级
class PriorityLevel {
public:
// default constructor
explicit PriorityLevel(
const int32_t& rChannel = -1,
const char priority = 'Z',
const boost::optional<int32_t>& rSequenceNum =
boost::optional<int32_t>())
: mChannel(rChannel)
, mPriority(priority)
, mSequenceNum(rSequenceNum)
{}
// copy constructor
PriorityLevel(const PriorityLevel& rhs)
: mChannel(rhs.mChannel)
, mPriority(rhs.mPriority)
, mSequenceNum(rhs.mSequenceNum)
{}
// move constructor
PriorityLevel(PriorityLevel&& rhs)
: mChannel(std::move(rhs.mChannel))
, mPriority(std::move(rhs.mPriority))
, mSequenceNum(std::move(rhs.mSequenceNum))
{}
// non-throwing-swap idiom
inline void swap(PriorityLevel& rhs) {
// enable ADL (not necessary in our case, but good practice)
using std::swap;
// no need to swap base members - as we are topmost class
swap(mChannel, rhs.mChannel);
swap(mPriority, rhs.mPriority);
swap(mSequenceNum, rhs.mSequenceNum);
}
// non-throwing copy-and-swap idiom unified assignment
PriorityLevel& operator=(PriorityLevel rhs) {
rhs.swap(*this);
return *this;
}
// equality operator
inline bool operator==(const PriorityLevel& rhs) const {
return std::tie(mChannel, mPriority, mSequenceNum) ==
std::tie(rhs.mChannel, rhs.mPriority, rhs.mSequenceNum);
}
// inequality operator
inline bool operator!=(const PriorityLevel& rhs) const {
return !(operator==(rhs));
}
/**
* comparator that orders the elements in the priority_queue<p>
*
* This is implemented via a lexicographical comparison using a
* std::tuple<T...> as a helper. Tuple compares work as follows:
* compares the first elements, if they are equivalent, compares
* the second elements, if those are equivalent, compares the
* third elements, and so on. All comparison operators are short
* - circuited; they do not access tuple elements beyond what is
* necessary to determine the result of the comparison. note
* that the presence of the sequence number assigns a lower
* priority (bigger value 1) contribution to the lexicographical
* nature of the comparison
*
* @param rhs PriorityLevel to compare against
*
* @return true if this is lower priority than rhs
*/
inline bool operator<(const PriorityLevel& rhs) const {
auto prtyLen = getPriorityStr().length();
auto rhsPrtyLen = rhs.getPriorityStr().length();
auto sequencePrtyVal = mSequenceNum ? mSequenceNum.get() : 0;
auto rhsSequencePrtyVal = rhs.mSequenceNum ? rhs.mSequenceNum.get() : 0;
return std::tie(prtyLen, mPriority, mChannel, sequencePrtyVal) >
std::tie(rhsPrtyLen, rhs.mPriority, rhs.mChannel, rhsSequencePrtyVal);
}
// stream friendly struct
inline friend std::ostream& operator << (std::ostream& os, const PriorityLevel& rValue) {
std::string sequenceInfo;
if (rValue.mSequenceNum) {
sequenceInfo = std::string(", sequence[") +
std::to_string(rValue.mSequenceNum.get()) + "]";
}
os << "channel[" << rValue.mChannel
<< "], priority[" << rValue.mPriority
<< "]" << sequenceInfo;
return os;
}
// channel getter
inline int32_t getChannel() const {
return mChannel;
}
// string representation of the priority string
inline std::string getPriorityStr() const {
std::stringstream ss;
ss << mChannel << mPriority;
if (mSequenceNum) {
ss << mSequenceNum.get();
}
return ss.str();
}
private:
// the 3 fields from the ModuleNameTable::szPriorityLevel
int32_t mChannel;
// single upper case character A=>'highest priority'
char mPriority;
// optional field - when present indicates start order
boost::optional<int32_t> mSequenceNum;
};
我不会把它们都放在优先级队列中,因为优先级队列在更改优先级方面效率极低。相反,我会将1和5添加到优先级队列中,并将所有剩余的添加到后续任务列表的通道"后续映射"中。当通道1结束时,它会检查通道1在后续映射中是否有任何内容,如果有,则从该列表中弹出第一个项目,并将其添加到priority_queue中。
using ChannelID = int32_t;
using PriorityLevel = char;
struct dispatcher {
std::priority_queue<SLDBJob> Todo; //starts with items 1 and 5
std::unordered_map<ChannelID, std::vector<SLDBJob>> FollowupMap;
//starts with {1, {2,3,4}}, {2, {6, 7, 8}}
//note the code is actually faster if you store the followups in reverse
void OnTaskComplete(ChannelID id) {
auto it = FollowupMap.find(id);
if (it != FollowupMap.end())
if (it->empty() == false) {
Todo.push_back(std::move(it->front()));
it->erase(it->begin());
}
if (it->empty() == true)
FollowupMap.erase(it);
}
}
};
用法大致如下:
struct reportfinished {
ChannelID id;
~reportfinished() {dispatcher.OnTaskComplete(id);} //check for exceptions? Your call.
};
UtlThreadPool<> threadPool(rPoolSize);
while (!priorityJobQueue.empty()) {
const auto& nextJob = priorityJobQueue.top();
auto wrapper = [&gActiveJobs, =]()
-> decltype(SLDBProtocol::moduleCheckingThreadFn(nextJob, gActiveJobs))
{
reportfinished queue_next{nextJob.channel};
return SLDBProtocol::moduleCheckingThreadFn(nextJob, gActiveJobs);
};
threadPool.enqueue(std::move(wrapper));
gActiveJobs.insert(nextJob);
priorityJobQueue.pop();
}
- CMake-按正确顺序将项目与C运行时对象文件链接
- 函数调用中参数的顺序重要吗
- 为什么不;名字在地图上是按顺序排列的吗
- 将Integer转换为4字节的unsined字符矢量(按大端字节顺序)
- 数到第n个楼梯的路(顺序无关紧要)
- 优先顺序:智能指针和类析构函数
- 在循环中按顺序遍历成员变量
- 独立读取-修改-写入顺序
- QML按钮点击功能执行顺序
- C++中数据类型修饰符的顺序
- 当比特(而不是字节)的顺序至关重要时的持久性
- C++从其他 constexpr 创建 lambda 不能按顺序执行 Constexpr
- 通过选项卡的文本设置QTabWidget顺序
- c++11评估顺序(未定义的行为)
- 如何在C++中递归地按相反顺序打印集合
- 给定顺序中的事件处理
- 具有包含其他对象的类的对象创建顺序
- 如何通过替换顺序代码的while循环来添加OpenMP for循环
- 遍历顺序由 std::文件系统directory_iterator给出
- 检查 2 棵树是否具有相同的顺序