如何在两组线程之间正确来回传输控制权

How to properly transfer control between two sets of threads back and forth

本文关键字:之间 控制权 传输 线程 两组      更新时间:2023-10-16

我想只用C++ 11的标准库实现一个线程池。我想公开的接口是允许我的主线程一次提交许多作业,并等到所有线程完成才能继续。这是我第一次显式处理线程,因此不可避免地会遇到一些死锁问题。这是我的代码:

class CrashQueue {
private:
std::vector<std::thread> workers;
std::queue<void*> payloads;
std::function<void(void*)> function;
std::mutex taskFetchingMutex;
long aliveWorkers;
std::condition_variable alarmClock;
std::condition_variable sleepClock;
std::mutex sleepClockMutex;
bool running = true;
public:
CrashQueue(std::size_t threadCount = std::thread::hardware_concurrency()) {
for (std::size_t i = 0; i < threadCount; i ++) {
workers.emplace_back([this]() -> void {
while (running) {
void* payload;
{
std::unique_lock<std::mutex> lock(taskFetchingMutex);
if (payloads.empty()) {
aliveWorkers --;
if (aliveWorkers == 0)
sleepClock.notify_one();
alarmClock.wait(lock);
continue;
}
payload = payloads.front();
payloads.pop();
}
function(payload);
}
});
}
}
~CrashQueue() {
running = false;
alarmClock.notify_all();
for (auto& worker : workers)
worker.join();
}
void run() {
this->aliveWorkers = workers.size();
alarmClock.notify_all();
std::unique_lock<std::mutex> lock(sleepClockMutex);
sleepClock.wait(lock);
}
void commit(std::function<void(void*)>&& function, std::queue<void*>&& payloads) {
this->function = std::move(function);
this->payloads = std::move(payloads);
}
};

我怀疑问题出在工作线程中执行的构造函数的 lambda 表达式中:

if (payloads.empty()) {
aliveWorkers --;
if (aliveWorkers == 0)
sleepClock.notify_one();
alarmClock.wait(lock);
continue;
}

可能是最后一个工作线程唤醒主线程并在主线程唤醒所有其他线程睡眠的情况。尽管如此,这似乎不太可能,但是每次我不处于调试模式时都会发生死锁。有什么提示吗?

这个问题在某种程度上与我在这里使用两个互斥体的事实有关。按如下方式重写run使其正常工作:

void run() {
this->aliveWorkers = workers.size();
alarmClock.notify_all();
while (true) {
std::unique_lock<std::mutex> lock(taskFetchingMutex);
if (aliveWorkers.load() == 0)
break;
sleepClock.wait(lock);
}
}

但是,我无法可视化图片以了解原始代码可能失败的原因。任何解释的答案仍然是需要的。

编辑:我看似正确的代码的完整来源:

#include <iostream>
#include <atomic>
#include <vector>
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>
#include <random>
#include <algorithm>
#include <tuple>
#include <queue>
class CrashQueue {
private:
std::vector<std::thread> workers;
std::queue<void*> payloads;
std::function<void(void*)> function;
std::mutex taskFetchingMutex;
std::atomic<long> aliveWorkers;
std::condition_variable alarmClock;
std::condition_variable sleepClock;
bool running = true;
public:
CrashQueue(std::size_t threadCount = std::thread::hardware_concurrency())
: aliveWorkers(threadCount) {
for (std::size_t i = 0; i < threadCount; i ++) {
workers.emplace_back([this]() -> void {
while (running) {
void* payload;
{
std::unique_lock<std::mutex> lock(taskFetchingMutex);
if (payloads.empty()) {
aliveWorkers.fetch_sub(1);
sleepClock.notify_one();
alarmClock.wait(lock);
continue;
}
payload = payloads.front();
payloads.pop();
}
function(payload);
}
});
}
// Make sure all workers finished running.
while (aliveWorkers.load() > 0);
std::unique_lock<std::mutex> lock(taskFetchingMutex);
}
~CrashQueue() {
running = false;
alarmClock.notify_all();
for (auto& worker : workers)
worker.join();
}
void run() {
this->aliveWorkers = workers.size();
alarmClock.notify_all();
while (true) {
std::unique_lock<std::mutex> lock(taskFetchingMutex);
if (aliveWorkers.load() == 0)
break;
sleepClock.wait(lock);
}
}
void commit(std::function<void(void*)>&& function, std::queue<void*>&& payloads) {
this->function = std::move(function);
this->payloads = std::move(payloads);
}
};