如何使用具有串行执行顺序的C++线程池

how to use a C++ thread pool with serial execution order

本文关键字:顺序 C++ 线程 执行 何使用      更新时间:2023-10-16

我正在尝试使用一个使用基于优先级的任务的C++线程池。根据优先级(在我的情况下,它是一个比较器对象,而不仅仅是一个值),它需要串行执行,而不是仅仅调度到线程池中的下一个可用线程。

我当前的实现基于以下代码https://github.com/en4bz/ThreadPool我让它像一个普通的线程池一样正常工作(但我没有使用该池的优先级变体,因为我不知道如何指定自定义谓词对象而不是int-如果有人能让ke知道如何传入下面的PriorityLevel,那将是一个真正的优势),所以相反,项目是从std::priority_queue<T>其中T是一个使用下一个PriorityLevel按优先级顺序排列项目的对象。

任务优先级的类型按下面的行进行描述-这些优先级由信道号组成,优先级字符"A-Z"和一个可选的序列号(指定该序列号时,表示任务必须等待所有优先级较高的任务完成,然后才能计划在可用的线程池上执行该任务。我知道如何使用运算符<()严格的弱排序谓词在线程池中排序这些东西,但我不知道如何将其中一些元素放回以支持执行队列。

所以在这个的例子中

(1) channel[1] priority[A] 
(2) channel[1] priority[A] sequenceNum[1]
(3) channel[1] priority[A] sequenceNum[2] 
(4) channel[1] priority[A] sequenceNum[3] 
(5) channel[2] priority[B] 
(6) channel[2] priority[B] sequenceNum[1] 
(7) channel[2] priority[B] sequenceNum[2]

项目1&5将具有最高优先级,并且由于它们没有先决条件-它们将并发运行(如果有可用的线程),但是其他元素将不得不等待,直到它们的先决条件通道/优先级任务完成。

以下是我如何使用线程池(请注意,SLDBJob包含PriorityLevel来处理运算符<()优先级排序。

    std::priority_queue<SLDBJob> priorityJobQueue;
    //... insert a bunch of Jobs 
    // enqueue closure objects in highest to lowest priority so that the 
    // highest ones get started ahead of the lower or equal priority jobs.  
    // these tasks will be executed in priority order using rPoolSize threads
    UtlThreadPool<> threadPool(rPoolSize);
    while (!priorityJobQueue.empty()) {
        const auto& nextJob = priorityJobQueue.top();
        threadPool.enqueue(std::bind(
            &SLDBProtocol::moduleCheckingThreadFn, 
            nextJob, std::ref(gActiveJobs)));
        gActiveJobs.insert(nextJob);
        priorityJobQueue.pop();
    }

这是优先级

class PriorityLevel {
public:
    // default constructor
    explicit PriorityLevel(
        const int32_t& rChannel = -1,
        const char priority = 'Z',
        const boost::optional<int32_t>& rSequenceNum =
            boost::optional<int32_t>())
        : mChannel(rChannel)
        , mPriority(priority)
        , mSequenceNum(rSequenceNum)
    {}
    // copy constructor
    PriorityLevel(const PriorityLevel& rhs)
        : mChannel(rhs.mChannel)
        , mPriority(rhs.mPriority)
        , mSequenceNum(rhs.mSequenceNum)
    {}
    // move constructor
    PriorityLevel(PriorityLevel&& rhs)
        : mChannel(std::move(rhs.mChannel))
        , mPriority(std::move(rhs.mPriority))
        , mSequenceNum(std::move(rhs.mSequenceNum))
    {}
    // non-throwing-swap idiom
    inline void swap(PriorityLevel& rhs) {
        // enable ADL (not necessary in our case, but good practice)
        using std::swap;
        // no need to swap base members - as we are topmost class
        swap(mChannel, rhs.mChannel);
        swap(mPriority, rhs.mPriority);
        swap(mSequenceNum, rhs.mSequenceNum);
    }
    // non-throwing copy-and-swap idiom unified assignment
    PriorityLevel& operator=(PriorityLevel rhs) {
        rhs.swap(*this);
        return *this;
    }
    // equality operator
    inline bool operator==(const PriorityLevel& rhs) const {
        return std::tie(mChannel, mPriority, mSequenceNum) ==
            std::tie(rhs.mChannel, rhs.mPriority, rhs.mSequenceNum);
    }
    // inequality operator
    inline bool operator!=(const PriorityLevel& rhs) const {
        return !(operator==(rhs));
    }
    /**
     * comparator that orders the elements in the priority_queue<p>
     *
     * This is implemented via a lexicographical comparison using a
     * std::tuple<T...> as a helper. Tuple compares work as follows:
     * compares the first elements, if they are equivalent, compares
     * the second elements, if those are equivalent, compares the
     * third elements, and so on. All comparison operators are short
     * - circuited; they do not access tuple elements beyond what is
     * necessary to determine the result of the comparison. note
     * that the presence of the sequence number assigns a lower
     * priority (bigger value 1) contribution to the lexicographical
     * nature of the comparison
     *
     * @param rhs    PriorityLevel to compare against
     *
     * @return true if this is lower priority than rhs
     */
    inline bool operator<(const PriorityLevel& rhs) const {
        auto prtyLen = getPriorityStr().length();
        auto rhsPrtyLen = rhs.getPriorityStr().length();
        auto sequencePrtyVal = mSequenceNum ? mSequenceNum.get() : 0;
        auto rhsSequencePrtyVal = rhs.mSequenceNum ? rhs.mSequenceNum.get() : 0;
        return std::tie(prtyLen, mPriority, mChannel, sequencePrtyVal) >
            std::tie(rhsPrtyLen, rhs.mPriority, rhs.mChannel, rhsSequencePrtyVal);
    }
    // stream friendly struct
    inline friend std::ostream& operator << (std::ostream& os, const PriorityLevel& rValue) {
        std::string sequenceInfo;
        if (rValue.mSequenceNum) {
            sequenceInfo = std::string(", sequence[") +
                std::to_string(rValue.mSequenceNum.get()) + "]";
        }
        os  << "channel[" << rValue.mChannel
            << "], priority[" << rValue.mPriority
            << "]" << sequenceInfo;
        return os;
    }
    // channel getter
    inline int32_t getChannel() const {
        return mChannel;
    }
    // string representation of the priority string
    inline std::string getPriorityStr() const {
        std::stringstream ss;
        ss << mChannel << mPriority;
        if (mSequenceNum) {
            ss << mSequenceNum.get();
        }
        return ss.str();
    }
private:
    // the 3 fields from the ModuleNameTable::szPriorityLevel
    int32_t mChannel;
    // single upper case character A=>'highest priority'
    char mPriority;
    // optional field - when present indicates start order
    boost::optional<int32_t> mSequenceNum;
};

我不会把它们都放在优先级队列中,因为优先级队列在更改优先级方面效率极低。相反,我会将1和5添加到优先级队列中,并将所有剩余的添加到后续任务列表的通道"后续映射"中。当通道1结束时,它会检查通道1在后续映射中是否有任何内容,如果有,则从该列表中弹出第一个项目,并将其添加到priority_queue中。

 using ChannelID = int32_t;
 using PriorityLevel = char;
 struct dispatcher {
      std::priority_queue<SLDBJob> Todo; //starts with items 1 and 5
      std::unordered_map<ChannelID, std::vector<SLDBJob>> FollowupMap;
          //starts with {1, {2,3,4}}, {2, {6, 7, 8}}
          //note the code is actually faster if you store the followups in reverse
      void OnTaskComplete(ChannelID id) {
          auto it = FollowupMap.find(id);
          if (it != FollowupMap.end())
              if (it->empty() == false) {
                  Todo.push_back(std::move(it->front()));
                  it->erase(it->begin());
              }
              if (it->empty() == true)
                  FollowupMap.erase(it);
           }
      }
 };

用法大致如下:

struct reportfinished {
    ChannelID id;
    ~reportfinished() {dispatcher.OnTaskComplete(id);} //check for exceptions? Your call.
};
UtlThreadPool<> threadPool(rPoolSize);
while (!priorityJobQueue.empty()) {
    const auto& nextJob = priorityJobQueue.top();
    auto wrapper = [&gActiveJobs, =]() 
        -> decltype(SLDBProtocol::moduleCheckingThreadFn(nextJob, gActiveJobs))
        {
            reportfinished queue_next{nextJob.channel};
            return SLDBProtocol::moduleCheckingThreadFn(nextJob, gActiveJobs);
        };
    threadPool.enqueue(std::move(wrapper));
    gActiveJobs.insert(nextJob);
    priorityJobQueue.pop();
}