如何在两组线程之间正确来回传输控制权
How to properly transfer control between two sets of threads back and forth
我想只用C++ 11的标准库实现一个线程池。我想公开的接口是允许我的主线程一次提交许多作业,并等到所有线程完成才能继续。这是我第一次显式处理线程,因此不可避免地会遇到一些死锁问题。这是我的代码:
class CrashQueue {
private:
std::vector<std::thread> workers;
std::queue<void*> payloads;
std::function<void(void*)> function;
std::mutex taskFetchingMutex;
long aliveWorkers;
std::condition_variable alarmClock;
std::condition_variable sleepClock;
std::mutex sleepClockMutex;
bool running = true;
public:
CrashQueue(std::size_t threadCount = std::thread::hardware_concurrency()) {
for (std::size_t i = 0; i < threadCount; i ++) {
workers.emplace_back([this]() -> void {
while (running) {
void* payload;
{
std::unique_lock<std::mutex> lock(taskFetchingMutex);
if (payloads.empty()) {
aliveWorkers --;
if (aliveWorkers == 0)
sleepClock.notify_one();
alarmClock.wait(lock);
continue;
}
payload = payloads.front();
payloads.pop();
}
function(payload);
}
});
}
}
~CrashQueue() {
running = false;
alarmClock.notify_all();
for (auto& worker : workers)
worker.join();
}
void run() {
this->aliveWorkers = workers.size();
alarmClock.notify_all();
std::unique_lock<std::mutex> lock(sleepClockMutex);
sleepClock.wait(lock);
}
void commit(std::function<void(void*)>&& function, std::queue<void*>&& payloads) {
this->function = std::move(function);
this->payloads = std::move(payloads);
}
};
我怀疑问题出在工作线程中执行的构造函数的 lambda 表达式中:
if (payloads.empty()) {
aliveWorkers --;
if (aliveWorkers == 0)
sleepClock.notify_one();
alarmClock.wait(lock);
continue;
}
可能是最后一个工作线程唤醒主线程并在主线程唤醒所有其他线程后睡眠的情况。尽管如此,这似乎不太可能,但是每次我不处于调试模式时都会发生死锁。有什么提示吗?
这个问题在某种程度上与我在这里使用两个互斥体的事实有关。按如下方式重写run
使其正常工作:
void run() {
this->aliveWorkers = workers.size();
alarmClock.notify_all();
while (true) {
std::unique_lock<std::mutex> lock(taskFetchingMutex);
if (aliveWorkers.load() == 0)
break;
sleepClock.wait(lock);
}
}
但是,我无法可视化图片以了解原始代码可能失败的原因。任何解释的答案仍然是需要的。
编辑:我看似正确的代码的完整来源:
#include <iostream>
#include <atomic>
#include <vector>
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>
#include <random>
#include <algorithm>
#include <tuple>
#include <queue>
class CrashQueue {
private:
std::vector<std::thread> workers;
std::queue<void*> payloads;
std::function<void(void*)> function;
std::mutex taskFetchingMutex;
std::atomic<long> aliveWorkers;
std::condition_variable alarmClock;
std::condition_variable sleepClock;
bool running = true;
public:
CrashQueue(std::size_t threadCount = std::thread::hardware_concurrency())
: aliveWorkers(threadCount) {
for (std::size_t i = 0; i < threadCount; i ++) {
workers.emplace_back([this]() -> void {
while (running) {
void* payload;
{
std::unique_lock<std::mutex> lock(taskFetchingMutex);
if (payloads.empty()) {
aliveWorkers.fetch_sub(1);
sleepClock.notify_one();
alarmClock.wait(lock);
continue;
}
payload = payloads.front();
payloads.pop();
}
function(payload);
}
});
}
// Make sure all workers finished running.
while (aliveWorkers.load() > 0);
std::unique_lock<std::mutex> lock(taskFetchingMutex);
}
~CrashQueue() {
running = false;
alarmClock.notify_all();
for (auto& worker : workers)
worker.join();
}
void run() {
this->aliveWorkers = workers.size();
alarmClock.notify_all();
while (true) {
std::unique_lock<std::mutex> lock(taskFetchingMutex);
if (aliveWorkers.load() == 0)
break;
sleepClock.wait(lock);
}
}
void commit(std::function<void(void*)>&& function, std::queue<void*>&& payloads) {
this->function = std::move(function);
this->payloads = std::move(payloads);
}
};
相关文章:
- C++中std::resize(n)和std::shrink_to_fit之间的区别
- int(c) 和 c-'0' 之间的区别。C++
- 在cuda线程之间共享大量常量数据
- 在c代码之间共享数据的最佳方式
- Mix_Init和Mix_OpenAudio SDL之间的区别是什么
- C++ 使用 assign 函数的字符串与直接使用 '=' 更改值的字符串之间的区别
- VSOMEIP-2个设备之间的通信(TCP/UDP)不工作
- std::atomic和std::condition_variable wait,notify_*方法之间的区别
- 大小相等但成员数量不同的结构之间的性能差异
- 类与私有变量的其他类之间的线程安全性
- 如何在cpp文件之间切换窗口?在Qt中
- 线程之间的布尔停止信号
- 我是C++编程的新手,这些代码之间有什么区别,我应该使用哪一个
- 在 const 函数中通过引用和指针返回之间的区别
- 我想知道长双倍和双倍之间的区别
- 如何防止clang格式在流运算符调用之间添加换行符<<
- 在两台机器之间进行时间戳的最佳c++chrono函数是什么
- 使用.find函数在c++中查找字符和另一个字符之间的大小
- 如何在两组线程之间正确来回传输控制权
- 在C++和 Python 代码之间转移控制权