pthread_mutex_wait多个生产者和消费者

pthread_mutex_wait multiple producer and consumer

本文关键字:生产者 消费者 mutex wait pthread      更新时间:2023-10-16

我正在经历这个和这个链接。基本上,他们正在讨论为什么经典的单线程生产者消费者设计(带有signalwait不适用于多生产者方案)。我有一个疑问一直困扰着我——

作者
的论点考虑参考代码

char queue[MAX];  //global
int head = 0, tail = 0; //global
struct cv not_full, not_empty;
struct lock qlock;
void produce(char data) {
  acquire(&qlock);
  if ((head + 1) % MAX  ==  tail) {
    wait(&not_full, &qlock);   //line 1
  }
  queue[head] = data; //line 2
  head = (head + 1) % MAX;
  notify(&not_full);
  release(&qlock);
}
char consume(void) {
  acquire(&qlock);
  if (tail == head) {
    wait(&not_empty, &qlock);
  }
  e = queue[tail];
  tail = (tail + 1) % MAX;
  notify(&not_empty);
  release(&qlock);
  return e;
}

在上面的代码中,在两个生产者的情况下,line 1将由单个消费者在两个生产者线程中woken up(当队列未满时)。因此,两个生产者都可以添加到导致队列溢出的队列中。

我的怀疑
一个。我们正在使用互斥锁保护队列。因此,即使在多个生产者线程上唤醒wait,只有一个生产者仍然具有互斥锁 - 因此从逻辑上讲,只有一个生产者"拥有"权限添加到队列中。因为当我们走出wait时,我们将有互斥锁被收购。

卡维特
我使用 POSIX 互斥体,cond var 作为参考。但是,我没有看到以POSIX作为参考标准撰写的文章。

问题
我对wait的理解特别pthread_cond_wait正确吗?对于多个生产者,代码的完整性仍然得到维护。

作者在

文章末尾的 wait() 和 notify() 语义下提到

通知 (CV) 唤醒当前正在等待该 CV 的所有线程

所以你对等待的理解是不正确的,notify注定要pthread_cond_broadcast在 posix 中。

此外,pthread_cond_signal文件规定

pthread_cond_signal()调用至少取消阻止其中一个线程 在指定的条件变量 cond 上被阻止(如果有的话) 线程在 cond 上被阻塞)。

这仍然与你"只有一个生产者"的假设不同。

如作者所示,上述代码的完整性不是由多个生产者维护的。

编辑:

条件变量的伪代码可能看起来像 wait

void wait (condition *cv, mutex *mx) 
{
    mutex_acquire(&c->listLock);  /* protect the queue */
    enqueue (&c->next, &c->prev, thr_self()); /* enqueue */
    mutex_release (&c->listLock); /* we're done with the list */
    /* The suspend and release_mutex() operation should be atomic */
    release_mutex (mx));
    thr_suspend (self);  /* Sleep 'til someone wakes us */
    <-------- notify executes somewhere else
    mutex_acquire (mx); /* Woke up -- our turn, get resource lock */
    return;
}

signal期间,队列中至少有一个处于唤醒状态suspend线程。但pthread并不能保证它只有一个。现在它们可以运行。但他们仍然需要获得锁以确保彼此之间的相互排斥。

因此,当第一个释放互斥锁时,第二个释放它,依此类推。

这意味着觉醒生产者将一个接一个地执行

queue[head] = data; //line 2
head = (head + 1) % MAX;
notify(&not_full);
release(&qlock);