c++ 线程和承诺

c++ Threads and promises

本文关键字:承诺 线程 c++      更新时间:2023-10-16

我有一个线程安全队列,它预填充了主线程的工作;当我启动工作线程时,它们会从工作队列中弹出一个任务,但也可以将新任务推送回工作队列。代码简要介绍:

auto work_queue = safe_queue{};
static void handle_task(T task) {
// process the task
// might push a new task to a work queue using work_queue.push()
}
int main() {
//some work is done to prepopulate work_queue
auto handle_work = [](){
while (!work_queue.empty) {
T task = work_queue.pop();
handle_task(task);
}
};
std::vector<std::thread> threads;
for (int i = 0; i < NUM_OF_THREADS; i++) {
threads.push_back(std::thread(processing));
}
std::for_each(threads.begin(), threads.end(), [](std::thread &t) {
t.join();
}
}

我知道这段代码无法正常工作,因为在某些情况下,队列可能是空的,而某些工作线程正在处理工作,而另一个线程进入,找不到工作并退出(尽管线程处理任务可能会将新工作推回队列(。我的问题是,如何防止线程像这样过早退出?使用std::promise<void>允许线程与它们可能仍在工作的其他线程通信,这是否有效?如果是这样,这将如何与多个线程一起工作(我是 c++ 的新手,只使用单线程的承诺(?

我不认为std::promise<void>可以在这里使用,因为它更像是一次性的东西。结果在未来设定后,它不能取消设置,因此我们不能为同一个承诺等待两次。

可以执行以下操作(计数器必须使线程安全,但我现在懒得这样做(:

int finished_threads = 0;
auto handle_work = [&finished_threads](){
bool this_finished = false;
while (finished_threads < NUM_OF_THREADS) {
while (!work_queue.empty) {
if(this_finished) {
this_finished = false;
--finished_threads;  // evil
}
T task = work_queue.pop();
handle_task(task);
}
if(!this_finished) {
this_finished = true;
++finished_threads; // evil
}
}
};

这应该仅在所有线程完成后退出(如:它们不再处理任务并且队列中没有(。然后,队列中将不再放置任何新任务。局部变量最大限度地减少了对共享内存的访问。

请注意,我在多线程编程方面的经验非常有限。

我用std::condition_variable尝试了一些完全不同的东西:

#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <optional>
#include <algorithm>
constexpr int NUM_OF_THREADS = 5;
std::condition_variable input_cv;
std::condition_variable callback_cv;
std::optional<int> data {};
std::mutex m;
std::mutex callback_mutex;
int finished_threads = 0;
static void handle_task(int i)
{
}
struct worker 
{
const int num;
worker () = delete;
void operator()() {
while(true){
std::unique_lock<std::mutex> lk(m);
++finished_threads; // protected by m
callback_cv.notify_one(); // wake up main thread if it sleeps
input_cv.wait(lk);
if(!data)
return;
--finished_threads; // protected by m
int local_data = *data;
lk.unlock();
handle_task(local_data);
}
}
};

struct safe_queue // dummy
{
int pop () const { return 0; }
bool empty() const {return true;}
};
void main_thread ()
{
std::vector<std::thread> workers;
safe_queue work_queue;
for(int i = 0; i < NUM_OF_THREADS; ++i)
{
workers.emplace_back(worker{i});
}
do
{
do
{
{
std::lock_guard<std::mutex> guard(m);
data.emplace(work_queue.pop());
}
input_cv.notify_one();
} while(!work_queue.empty() && finished_threads > 0)
// If no thread has finished, we can wait for the next one to finish.
std::unique_lock<std::mutex> lk(callback_mutex);
callback_cv.wait(lk); // We wait on some thread to have finished
}while(finished_threads < NUM_OF_THREADS && !work_queue.empty()); // In either case, there remains something to do.
data = {};
input_cv.notify_all();
std::for_each(begin(workers), end(workers), [](std::thread &t) { t.join();});
}

int main()
{
std::thread t(main_thread);
t.join();
}

不确定它是否更好,但它肯定更复杂^^。