使用 C++ packaged_task构建生产者-消费者模式
using c++ packaged_task to construct producer-consumer pattern
我正在尝试packaged_task创建一个生产者-消费者模式代码如下: test_thread9_producer1
并将test_thread9_producer2
任务推送到队列中并test_thread9_consumer1
从队列中检索要执行的任务
但是,当运行test_thread9
时,它正确执行任务,但完成调试错误:已调用中止。我不知道为什么?谁能帮我更了解packaged_task
?
第二个问题:消费者运行while(1)
循环,我想不出优雅的方式当两个生成者完成将所有任务推送到队列中并且test_thread9_consumer1
完成队列中所有任务的执行时,test_thread9_consumer1
退出。 谁能给我一些建议?
void test_thread9()
{
std::thread t1(test_thread9_producer1);
std::thread t2(test_thread9_producer2);
std::thread t3(test_thread9_consumer1);
t1.join();
t2.join();
t3.join();
}
std::deque<std::packaged_task<int()>>task_q;
std::mutex lock9;
int factial_calc2(int in)
{
int ret = 1;
for (int i = in; i > 1; i--)
{
ret = ret*i;
}
std::lock_guard<std::mutex> locker(lock9);
std::cout << "input is " << in << "result is " << ret << std::endl;
return ret;
}
void test_thread9_producer1()
{
for (int i = 0; i < 10; i = i + 2)
{
std::lock_guard<std::mutex> locker(lock9);
std::packaged_task<int()> t1(std::bind(factial_calc2, i));
task_q.push_back(std::move(t1));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void test_thread9_producer2()
{
for (int i = 1; i < 10; i = i + 2)
{
std::lock_guard<std::mutex> locker(lock9);
std::packaged_task<int()> t1(std::bind(factial_calc2, i));
task_q.push_back(std::move(t1));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void test_thread9_consumer1()
{
std::packaged_task<int()>t;
while (1)
{
{
std::lock_guard<std::mutex> locker(lock9);
if (!task_q.empty())
{
t = std::move(task_q.front());
task_q.pop_front();
}
}
t();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
为什么会崩溃?
如果您的使用者线程找到一个空队列,它仍然会尝试执行打包的任务,尽管它没有移动。这是UB,因此运行时错误!
您可以通过检查packaged_task是否有效来改进此问题:
while (1)
{
std::packaged_task<int()>t; // to make sure that valid() checks this iteration
{
...
}
if (t.valid())
t(); // execute only if it's a valid task
...
}
如何避免无休止的循环?
您必须以某种方式跟踪正在运行的内容。 一种简单的技术是使用 atomic
变量来管理共享状态信息(可以在不锁定的情况下并发访问)。
例如,您可以计算成品生产商的敏捷性
std::atomic<int>finished{0}; // count the producers that are finished
...
无效 test_thread9_producerN() { Cout <<"开始制作人"<<p>然后,您可以调整您的消费者以考虑以下信息:
void test_thread9_consumer1()
{
bool nothing_to_do{false};
while (!nothing_to_do && finished<2)
{
...
nothing_to_do=task_q.empty(); // in the lock protected section
if (!nothing_to_do)
...
}
}
在线演示
void test_thread9_consumer1()
{
std::packaged_task<int()>t;
while (1)
{
{
std::lock_guard<std::mutex> locker(lock9);
if (!task_q.empty())
{
t = std::move(task_q.front());
task_q.pop_front();
}
}
t();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
让我们修剪一下这段代码:
std::packaged_task<int()>t;
while (1)
{
if (!task_q.empty())
t = std::move(task_q.front());
t();
}
现在我们可以清楚地看到错误:无论我们是否获取它,您都尝试调用 t()。
void test_thread9_consumer1()
{
std::packaged_task<int()>t;
while (1)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
{
std::lock_guard<std::mutex> locker(lock9);
if (task_q.empty())
continue;
t = std::move(task_q.front());
task_q.pop_front();
}
t();
}
}
您可能希望通过查看"条件变量"来继续探索线程
对于问题的第二部分,您可以考虑更改主线程以join
所有提供程序,然后设置一个全局标志,指示工作已完成。
标准::原子g_producing;
void test_thread9()
{
std::thread t1(test_thread9_producer1);
std::thread t2(test_thread9_producer2);
g_producing = true;
std::thread t3(test_thread9_consumer1);
t1.join();
t2.join();
g_producing = false;
t3.join();
}
std::deque<std::packaged_task<int()>>task_q;
std::mutex lock9;
int factial_calc2(int in)
{
int ret = 1;
for (int i = in; i > 1; i--)
{
ret = ret*i;
}
std::lock_guard<std::mutex> locker(lock9);
std::cout << "input is " << in << "result is " << ret << std::endl;
return ret;
}
void test_thread9_producer1()
{
for (int i = 0; i < 10; i = i + 2)
{
std::lock_guard<std::mutex> locker(lock9);
std::packaged_task<int()> t1(std::bind(factial_calc2, i));
task_q.push_back(std::move(t1));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void test_thread9_producer2()
{
for (int i = 1; i < 10; i = i + 2)
{
std::lock_guard<std::mutex> locker(lock9);
std::packaged_task<int()> t1(std::bind(factial_calc2, i));
task_q.push_back(std::move(t1));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void test_thread9_consumer1()
{
std::packaged_task<int()>t;
for (;;)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
{
std::lock_guard<std::mutex> locker(lock9);
if (task_q.empty()) {
if (!g_producing)
break;
continue;
}
t = std::move(task_q.front());
task_q.pop_front();
}
t();
}
}
- C++多线程生产者-消费者问题
- 特定的生产者-消费者方案
- 为什么condition_variable在等待生产者-消费者的锁定?C++
- 将生产者/消费者与障碍同步
- 带有共享缓冲区的两个等待线程(生产者/消费者)
- 生产者/消费者,消费者线程从未执行过
- 在使用 pthread 和信号量实现生产者-消费者问题时需要帮助
- 如何在 c++ 中使用 winapi 事件解决生产者-消费者
- C++生产者消费者陷入僵局
- 生产者消费者实现中的条件变量应如何初始化
- 互斥体在生产者-消费者问题中阻塞线程
- 为什么这不是正确的生产者消费者模型以及当我使用 stl 队列时导致错误的原因
- 生产者消费者使用PTHreads和Semaphore的同步错误
- C 生产者消费者具有例外处理以防止死锁
- 生产者 - 消费者生产商创建2个元素POSIX信号量
- 这是实施过程间生产者消费者对流程崩溃的正确和安全的实施
- 如何使用QThreads使无锁生产者-消费者线程交换更加异常安全
- 多生产者/消费者绩效
- 带有生产者/消费者文件副本的segfault
- 多线程程序生产者/消费者[BOOST]