特定的生产者-消费者方案

Particular producer-consumer scenario

本文关键字:消费者 方案 生产者      更新时间:2023-10-16

我需要实现一个特定的生产者-消费者场景,其中类 Consumer 创建两个线程处理两个端口,每个线程将一个值存储到相应的端口中,如果另一个端口上有可用的值,则调用process来使用两个端口上的值。这是我的尝试:

#include <thread>
#include <atomic>
#include <condition_variable>
#include <vector>
#include <iostream>
#include <mutex>
struct Port {
int value;
std::atomic<bool> free;
};
class Consumer {
private:
Port port1;
Port port2;
std::mutex mutex;
std::condition_variable port1_ready;
std::condition_variable port2_ready;
std::vector<std::thread> workers;
std::atomic<bool> done;
int count;
public:
Consumer() : done(false), count(0) {
port1.free = true;
port1.value = 0;
port2.free = true;
port1.value = 0;
workers.push_back(std::thread([this]{
while (!done) {
write1(rand());
}
}));
workers.push_back(std::thread([this]{
while (!done) {
write2(rand());
}
}));
}
void write1(int value) {
std::unique_lock lock(mutex);
port1.value = value;
port1.free = false;
std::cout << "port1 stored " << value << std::endl;
port1_ready.notify_one();
if (port2.free) {
port2_ready.wait(lock);
}
process("port1");
}
void write2(int value) {
std::unique_lock lock(mutex);
port2.value = value;
port2.free = false;
std::cout << "port2 stored " << value << std::endl;
port2_ready.notify_one();
if (port1.free) {
port1_ready.wait(lock);
}
process("port2");
}
void process(std::string string) {
port1.free = true;
port2.free = true;
std::cout << string << " consumed " << port1.value << " " << port2.value << std::endl;
if (count++ == 20) done = true;
}
~Consumer() {
for (auto& w: workers) {
w.join();
}
}
};
int main(int argc, char** argv) {
Consumer c{};
return 0;
}

这是输出:

port1 stored 41
port2 stored 41
port2 consumed 41 41
port2 stored 18467
port1 consumed 41 18467
port1 stored 18467
port2 consumed 18467 18467
port2 stored 6334
port1 consumed 18467 6334
port1 stored 6334
port2 consumed 6334 6334
port2 stored 26500
port1 consumed 6334 26500
port1 stored 26500
port2 consumed 26500 26500
port2 stored 19169
port1 consumed 26500 19169
port1 stored 19169
port2 consumed 19169 19169
port2 stored 15724
port1 consumed 19169 15724
port1 stored 15724
port2 consumed 15724 15724
port2 stored 11478
port1 consumed 15724 11478
port1 stored 11478
port2 consumed 11478 11478
port2 stored 29358
port1 consumed 11478 29358
port1 stored 29358
port2 consumed 29358 29358
port2 stored 26962
port1 consumed 29358 26962
port1 stored 26962
port2 consumed 26962 26962
port2 stored 24464
port1 consumed 26962 24464
port1 stored 24464
port2 consumed 24464 24464
port2 stored 5705
port1 consumed 24464 5705
port1 stored 5705
port2 consumed 5705 5705

有时它会成功返回,其他人会陷入僵局。

通过注意以下几点,可以轻松观察到您的逻辑错误:

void process(std::string string) {
port1.free = true;
port2.free = true;

这清楚地表明,您的意图是考虑现在放置在两个"端口"中的值进行"处理"。也就是说,一旦将值放入两个"端口"中,两个值都会被"处理",并且两个端口都会再次"空闲"。

但是,请注意日志的开头:

port1 stored 41
port2 stored 41
port2 consumed 41 41

到目前为止一切顺利,41 个被放置在两个港口,port2过程最终"处理"了它们。但紧接着:

port2 stored 18467
port1 consumed 41 18467

在这一点上,事情几乎已经偏离了轨道。 41 已经"处理"了,显然不应该再"处理"了。

write1()的内容打印在两张纸上write2()。用左手的食指描摹write1()线的执行,用右手的食指描摹write2()线的执行。

从右手开始,在write2()锁定互斥锁时跟踪它,并执行其业务,并发现

if (port2.free) {

是真的,那么

port2_ready.wait(lock);

等待此条件变量。这将解锁互斥锁,您的左手食指现在可以向前移动。您的左手食指现在向前移动,直到:

port2_ready.notify_one();

这会通知另一个线程,该线程必须等到互斥锁解锁,因此它会等待右手食指继续移动:

if (port1.free) {

这是真的,所以现在第一个线程可以开始移动了。如果你跟随,你可以看到两个线程最终是如何进入process()的,而不仅仅是其中一个。失败。

这种逻辑从根本上被打破了。有几种方法可以正确执行此操作,但最简单的方法如下。当任一线程获取互斥锁时,它

  • 检查线程拥有的端口中是否已具有值(从上次调用该端口开始(。

  • 如果端口已有值,请等待条件变量,直到端口空闲(依靠另一个线程清除它(。

  • 如果端口可用,请将其值保存在线程的端口
  • 中,然后检查其他线程的端口是否已具有值。如果没有,线程可以返回并继续其业务获取下一个值,并确保另一个线程将负责处理两个保存的值。

  • 否则,两个端口都有值,调用process()来处理它们,清除两个端口,并向另一个线程的条件变量发出信号,让它知道如果它正在等待其端口空闲,它现在可以自由保存另一个线程的下一个值。

>即使port1.freeport2.free之一为真,您也会称呼process

您可以将代码更改为以下内容:

struct Port {
std::optional<int> value;
};
class Consumer {
private:
Port port1;
Port port2;
std::mutex mutex;
std::condition_variable port1_ready;
std::condition_variable port2_ready;
std::vector<std::thread> workers;
std::atomic<bool> done;
int count;
public:
Consumer() : done(false), count(0) {
std::random_device rd;
workers.push_back(std::thread([this, gen = std::mt19937{rd()}] () mutable {
while (!done) {
write1(gen());
}
}));
workers.push_back(std::thread([this, gen = std::mt19937{rd()}] () mutable {
while (!done) {
write2(gen());
}
}));
port1_ready.notify_one();
port2_ready.notify_one();
}
void write1(int value) {
std::unique_lock lock(mutex);
port1_ready.wait(lock, [&](){ return !port1.value; });
port1.value = value;
std::cout << "port1 stored " << value << std::endl;
if (port2.value) {
process("port1");
}
}
void write2(int value) {
std::unique_lock lock(mutex);
port2_ready.wait(lock, [&](){ return !port2.value; });
port2.value = value;
std::cout << "port2 stored " << value << std::endl;
if (port1.value) {
process("port2");
}
}
void process(std::string string) {
std::cout << string << " consumed " << *port1.value << " " << *port2.value << std::endl;
port1.value.reset();
port2.value.reset();
port1_ready.notify_one();
port2_ready.notify_one();
if (count++ == 20) done = true;
}
~Consumer() {
for (auto& w: workers) {
w.join();
}
}
};
int main() {
Consumer c{};
}

演示