使用 C++ packaged_task构建生产者-消费者模式

using c++ packaged_task to construct producer-consumer pattern

本文关键字:生产者 消费者 模式 构建 task C++ packaged 使用      更新时间:2023-10-16

我正在尝试packaged_task创建一个生产者-消费者模式代码如下: test_thread9_producer1并将test_thread9_producer2任务推送到队列中并test_thread9_consumer1从队列中检索要执行的任务

但是,当运行test_thread9时,它正确执行任务,但完成调试错误:已调用中止。我不知道为什么?谁能帮我更了解packaged_task

第二个问题:消费者运行while(1)循环,我想不出优雅的方式当两个生成者完成将所有任务推送到队列中并且test_thread9_consumer1完成队列中所有任务的执行时,test_thread9_consumer1退出。 谁能给我一些建议?

void test_thread9()
{
    std::thread t1(test_thread9_producer1);
    std::thread t2(test_thread9_producer2);
    std::thread t3(test_thread9_consumer1);
    t1.join();
    t2.join();
    t3.join();
} 
std::deque<std::packaged_task<int()>>task_q;
std::mutex lock9;
int factial_calc2(int in)
{
    int ret = 1;
    for (int i = in; i > 1; i--)
    {
        ret = ret*i;
    }
    std::lock_guard<std::mutex> locker(lock9);
    std::cout << "input is " << in << "result is " << ret << std::endl;
    return ret;
}
void test_thread9_producer1()
{
    for (int i = 0; i < 10; i = i + 2)
    {
        std::lock_guard<std::mutex> locker(lock9);
        std::packaged_task<int()> t1(std::bind(factial_calc2, i));
        task_q.push_back(std::move(t1));
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}
void test_thread9_producer2()
{
    for (int i = 1; i < 10; i = i + 2)
    {
        std::lock_guard<std::mutex> locker(lock9);
        std::packaged_task<int()> t1(std::bind(factial_calc2, i));
        task_q.push_back(std::move(t1));
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void test_thread9_consumer1()
{
    std::packaged_task<int()>t;
    while (1)
    {
        {
            std::lock_guard<std::mutex> locker(lock9);
            if (!task_q.empty())
            {
                t = std::move(task_q.front());
                task_q.pop_front();
            }
        }
        t();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

为什么会崩溃?

如果您的使用者线程找到一个空队列,它仍然会尝试执行打包的任务,尽管它没有移动。这是UB,因此运行时错误!

您可以通过检查packaged_task是否有效来改进此问题:

while (1)
{
    std::packaged_task<int()>t;  // to make sure that valid() checks this iteration
    {
       ...
    }
    if (t.valid())
        t();  // execute only if it's a valid task
    ...
}

如何避免无休止的循环?

您必须以某种方式跟踪正在运行的内容。 一种简单的技术是使用 atomic 变量来管理共享状态信息(可以在不锁定的情况下并发访问)。

例如,您可以计算成品生产商的敏捷性

std::atomic<int>finished{0};  // count the producers that are finished
...

无效 test_thread9_producerN() { Cout <<"开始制作人"<<p>然后,您可以调整您的消费者以考虑以下信息:

void test_thread9_consumer1()
{
    bool nothing_to_do{false}; 
    while (!nothing_to_do && finished<2)
    {
    ...   
        nothing_to_do=task_q.empty();  // in the lock protected section 
        if (!nothing_to_do)     
    ...
    }
}

在线演示

void test_thread9_consumer1()
{
    std::packaged_task<int()>t;
    while (1)
    {
        {
            std::lock_guard<std::mutex> locker(lock9);
            if (!task_q.empty())
            {
                t = std::move(task_q.front());
                task_q.pop_front();
            }
        }
        t();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

让我们修剪一下这段代码:

    std::packaged_task<int()>t;
    while (1)
    {
            if (!task_q.empty())
                t = std::move(task_q.front());
        t();
    }

现在我们可以清楚地看到错误:无论我们是否获取它,您都尝试调用 t()。

void test_thread9_consumer1()
{
    std::packaged_task<int()>t;
    while (1)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        {
            std::lock_guard<std::mutex> locker(lock9);
            if (task_q.empty())
                continue;
            t = std::move(task_q.front());
            task_q.pop_front();
        }
        t();
    }
}

您可能希望通过查看"条件变量"来继续探索线程

对于问题的第二部分,您可以考虑更改主线程以join所有提供程序,然后设置一个全局标志,指示工作已完成。

标准::原子g_producing;

void test_thread9()
{
    std::thread t1(test_thread9_producer1);
    std::thread t2(test_thread9_producer2);
    g_producing = true;
    std::thread t3(test_thread9_consumer1);
    t1.join();
    t2.join();
    g_producing = false;
    t3.join();
} 
std::deque<std::packaged_task<int()>>task_q;
std::mutex lock9;
int factial_calc2(int in)
{
    int ret = 1;
    for (int i = in; i > 1; i--)
    {
        ret = ret*i;
    }
    std::lock_guard<std::mutex> locker(lock9);
    std::cout << "input is " << in << "result is " << ret << std::endl;
    return ret;
}
void test_thread9_producer1()
{
    for (int i = 0; i < 10; i = i + 2)
    {
        std::lock_guard<std::mutex> locker(lock9);
        std::packaged_task<int()> t1(std::bind(factial_calc2, i));
        task_q.push_back(std::move(t1));
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}
void test_thread9_producer2()
{
    for (int i = 1; i < 10; i = i + 2)
    {
        std::lock_guard<std::mutex> locker(lock9);
        std::packaged_task<int()> t1(std::bind(factial_calc2, i));
        task_q.push_back(std::move(t1));
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}
void test_thread9_consumer1()
{
    std::packaged_task<int()>t;
    for (;;)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        {
            std::lock_guard<std::mutex> locker(lock9);
            if (task_q.empty()) {
                if (!g_producing)
                    break;
                continue;
            }
            t = std::move(task_q.front());
            task_q.pop_front();
        }
        t();
    }
}