c++ 线程和承诺
c++ Threads and promises
我有一个线程安全队列,它预填充了主线程的工作;当我启动工作线程时,它们会从工作队列中弹出一个任务,但也可以将新任务推送回工作队列。代码简要介绍:
auto work_queue = safe_queue{};
static void handle_task(T task) {
// process the task
// might push a new task to a work queue using work_queue.push()
}
int main() {
//some work is done to prepopulate work_queue
auto handle_work = [](){
while (!work_queue.empty) {
T task = work_queue.pop();
handle_task(task);
}
};
std::vector<std::thread> threads;
for (int i = 0; i < NUM_OF_THREADS; i++) {
threads.push_back(std::thread(processing));
}
std::for_each(threads.begin(), threads.end(), [](std::thread &t) {
t.join();
}
}
我知道这段代码无法正常工作,因为在某些情况下,队列可能是空的,而某些工作线程正在处理工作,而另一个线程进入,找不到工作并退出(尽管线程处理任务可能会将新工作推回队列(。我的问题是,如何防止线程像这样过早退出?使用std::promise<void>
允许线程与它们可能仍在工作的其他线程通信,这是否有效?如果是这样,这将如何与多个线程一起工作(我是 c++ 的新手,只使用单线程的承诺(?
我不认为std::promise<void>
可以在这里使用,因为它更像是一次性的东西。结果在未来设定后,它不能取消设置,因此我们不能为同一个承诺等待两次。
可以执行以下操作(计数器必须使线程安全,但我现在懒得这样做(:
int finished_threads = 0;
auto handle_work = [&finished_threads](){
bool this_finished = false;
while (finished_threads < NUM_OF_THREADS) {
while (!work_queue.empty) {
if(this_finished) {
this_finished = false;
--finished_threads; // evil
}
T task = work_queue.pop();
handle_task(task);
}
if(!this_finished) {
this_finished = true;
++finished_threads; // evil
}
}
};
这应该仅在所有线程完成后退出(如:它们不再处理任务并且队列中没有(。然后,队列中将不再放置任何新任务。局部变量最大限度地减少了对共享内存的访问。
请注意,我在多线程编程方面的经验非常有限。
我用std::condition_variable
尝试了一些完全不同的东西:
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <optional>
#include <algorithm>
constexpr int NUM_OF_THREADS = 5;
std::condition_variable input_cv;
std::condition_variable callback_cv;
std::optional<int> data {};
std::mutex m;
std::mutex callback_mutex;
int finished_threads = 0;
static void handle_task(int i)
{
}
struct worker
{
const int num;
worker () = delete;
void operator()() {
while(true){
std::unique_lock<std::mutex> lk(m);
++finished_threads; // protected by m
callback_cv.notify_one(); // wake up main thread if it sleeps
input_cv.wait(lk);
if(!data)
return;
--finished_threads; // protected by m
int local_data = *data;
lk.unlock();
handle_task(local_data);
}
}
};
struct safe_queue // dummy
{
int pop () const { return 0; }
bool empty() const {return true;}
};
void main_thread ()
{
std::vector<std::thread> workers;
safe_queue work_queue;
for(int i = 0; i < NUM_OF_THREADS; ++i)
{
workers.emplace_back(worker{i});
}
do
{
do
{
{
std::lock_guard<std::mutex> guard(m);
data.emplace(work_queue.pop());
}
input_cv.notify_one();
} while(!work_queue.empty() && finished_threads > 0)
// If no thread has finished, we can wait for the next one to finish.
std::unique_lock<std::mutex> lk(callback_mutex);
callback_cv.wait(lk); // We wait on some thread to have finished
}while(finished_threads < NUM_OF_THREADS && !work_queue.empty()); // In either case, there remains something to do.
data = {};
input_cv.notify_all();
std::for_each(begin(workers), end(workers), [](std::thread &t) { t.join();});
}
int main()
{
std::thread t(main_thread);
t.join();
}
不确定它是否更好,但它肯定更复杂^^。
相关文章:
- 从不同线程使用int64的不同字节安全吗
- 删除一个线程上有数百万个字符串的大型哈希映射会影响另一个线程的性能
- 在C++中使用cURL和多线程
- 为什么我的C#代码在调用回C++COM直到Task时会暂停.等待/线程.加入
- 在cuda线程之间共享大量常量数据
- 如何将元素添加到数组的线程安全函数?
- 线程,如果else语句,都是错误的上下文切换后,会发生什么
- C++Boost Asio Pool线程,带有lambda函数和传递引用变量
- Qt C++静态thread_local QNetworkAccessManager是线程应用程序的好选择吗
- 异常属于C++中的线程还是进程
- C++中的线程安全删除
- C++使用params创建线程函数会导致转换错误
- 类与私有变量的其他类之间的线程安全性
- CoInitialize()在单独的线程上崩溃而不返回
- c++中的线程池
- 线程之间的布尔停止信号
- 为什么std::async使用同一个线程运行函数
- c++ 线程和承诺
- 存储/传递 v8 承诺解析器供以后使用的最佳实践?(结合C++线程)
- 如何通过std ::承诺进入线程?由STD :: Move或STD :: Shared_ptr