多线程程序生产者/消费者[BOOST]

multithreaded program producer/consumer [boost]

本文关键字:BOOST 消费者 程序 生产者 多线程      更新时间:2023-10-16

我正在使用Boost Library和C 玩。我想创建一个包含生产者,Conumer和堆栈的多线程程序。procuder填充堆栈,消费者从堆栈中删除项目(INT)。一切都起作用(pop,push,sutex),但是当我将pop/pusth winthin称为线程时,我不会得到任何效果

我制作了这个简单的代码:

#include "stdafx.h"
#include <stack>
#include <iostream>
#include <algorithm>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/date_time.hpp> 
#include <boost/signals2/mutex.hpp>
#include <ctime>
using namespace std;
/ *
* this class reprents a stack which is proteced by mutex
* Pop and push are executed by one thread each time.
*/
class ProtectedStack{
private : 
stack<int> m_Stack;
boost::signals2::mutex m;
public : 
ProtectedStack(){
}
ProtectedStack(const ProtectedStack & p){
}
void push(int x){
    m.lock();
    m_Stack.push(x);
    m.unlock();
}
void pop(){
    m.lock();
    //return m_Stack.top();
    if(!m_Stack.empty())
        m_Stack.pop();
    m.unlock(); 
}
int size(){
    return m_Stack.size();
}
bool isEmpty(){
    return m_Stack.empty();
}
int top(){
    return m_Stack.top();
}
};
/*
*The producer is the class that fills the stack. It encapsulate the thread object 
*/
class Producer{
public:
Producer(int number ){
    //create thread here but don't start here
m_Number=number;

}
void fillStack (ProtectedStack& s ) {
    int object = 3; //random value
    s.push(object);
    //cout<<"push objectn";
}
void produce (ProtectedStack & s){
    //call fill within a thread 
    m_Thread = boost::thread(&Producer::fillStack,this, s);  
}
 private :
int m_Number;
boost::thread m_Thread;
};

/* The consumer will consume the products produced by the producer */ 
class Consumer {
private : 
int m_Number;
boost::thread m_Thread;
public:
Consumer(int n){
    m_Number = n;
}
void remove(ProtectedStack &s ) {
     if(s.isEmpty()){ // if the stack is empty sleep and wait for the producer      to fill the stack
        //cout<<"stack is emptyn";
        boost::posix_time::seconds workTime(1); 
        boost::this_thread::sleep(workTime);
     }
     else{
        s.pop(); //pop it
        //cout<<"pop objectn";
     }
}
void consume (ProtectedStack & s){
    //call remove within a thread 
    m_Thread = boost::thread(&Consumer::remove, this, s);  
}
};

int main(int argc, char* argv[])  
{  

ProtectedStack s;

    Producer p(0);
    p.produce(s);
    Producer p2(1);
    p2.produce(s);
    cout<<"size after production "<<s.size()<<endl;
    Consumer c(0);
    c.consume(s);
    Consumer c2(1);
    c2.consume(s);
    cout<<"size after consumption  "<<s.size()<<endl;
getchar();
return 0;  
}  

我在VC 2010/Win7中运行我有 :00

您能帮助我理解为什么当我从主呼叫FillStack功能时,我会产生效果,但是当我从线程打电话时,什么都没有发生?谢谢

您的示例代码遭受了其他人指出的夫妇同步问题:

  • 丢失了呼叫的锁定锁。
  • 主线程可以退出而无需允许工作线程加入。
  • 生产者和消费者不会像您期望的那样循环。生产者应始终(尽可能)生产,并且随着新元素被推到堆栈时,消费者应继续消费。
  • 在主线程上的库特很可能会在生产者或消费者有机会工作之前进行。

我建议您使用条件变量在生产者和消费者之间同步。在此处查看生产者/消费者示例:http://en.cppreference.com/w/cpp/thread/condition_variable截至C 11,它是标准库中的一个相当新的功能,并从VS2012开始支持。在VS2012之前,您要么需要提升或使用Win32呼叫。

使用条件变量解决生产者/消费者问题很不错,因为它几乎可以强制使用MUTEX锁定共享数据,并且提供了一种信号机制,让消费者知道某些东西已准备就绪,因此可以消耗它们有如此旋转(这始终是消费者的响应能力和CPU使用的响应能力的经济性,将队列进行了调查)。它也是原子本身,它可以防止线程缺少信号的可能性,该信号表明有东西可以消费:

简要介绍了条件变量如何解决这个问题...

  • 生产者在没有静音的情况下在其线程上进行所有耗时的活动。
  • 生产者锁定了静音,将其生产的项目添加到全球数据结构(可能是某种队列)中,让我们使用静音并发出单个消费者的信号 - 以该顺序进行。
  • 正在等待条件变量的消费者自动重新呼吸,将项目从队列中删除并在其上进行一些处理。在此期间,生产商已经在努力生产新项目,但是必须等到消费者才能排队之前等待。

这将对您的代码产生以下影响:

  • 不再需要保护堆栈,正常的堆栈/队列数据结构将做到。
  • 如果您正在使用足够的编译器,则不需要提升 - 删除构建依赖总是一件好事。

我感觉到线程对您来说是相当新的,因此我只能提供建议,以查看其他人如何解决同步问题,因为很难将您的思想包裹起来。关于具有多个线程和共享数据的环境中发生的事情的混乱通常会导致诸如《僵局》之类的问题。

代码的主要问题是您的线程没有同步。请记住,默认情况下执行未订购且未进行测序,因此消费者线程实际上可以(在您的特定情况下)完成 任何生产者线程都会生成任何数据。

为了确保在生产商完成工作后需要运行消费者,您需要在生产者线程上使用thread::join()功能,它将停止主线程执行直到生产者退出:

// Start producers
...
p.m_Thread.join();  // Wait p to complete
p2.m_Thread.join(); // Wait p2 to complete
// Start consumers
...

这将解决问题,但这可能不适合典型的生产商 - 消费者用例。

要实现更有用的案例,您需要修复消费者功能。您的消费者功能实际上不等待生成的数据,如果堆栈为空,它将退出,如果尚未生成数据,则永远不会消耗任何数据。

应该像这样:

void remove(ProtectedStack &s)
{
   // Place your actual exit condition here,
   // e.g. count of consumed elements or some event
   // raised by producers meaning no more data available etc.
   // For testing/educational purpose it can be just while(true)
   while(!_some_exit_condition_)
   {
      if(s.isEmpty())
      {
          // Second sleeping is too big, use milliseconds instead
          boost::posix_time::milliseconds workTime(1); 
          boost::this_thread::sleep(workTime);               
      }               
      else
      {
         s.pop();
      }
   }
} 

另一个问题是错误的 thread构造函数:

m_Thread = boost::thread(&Producer::fillStack, this, s);  

引用来自boost.thread文档:

带有参数的线程构造函数

template <class F,class A1,class A2,...> thread(F f,A1 a1,A2 a2,...);

先决条件: F和每个An必须通过可复制或可移动。

效果: 好像thread(boost::bind(f,a1,a2,...))。因此, f,每个a都复制到 内部存储以访问新线程

这意味着您的每个线程都会收到自己的s副本,并且所有修改并非应用于s,而是将其应用于本地线程副本。当您将对象传递到按值函数参数时,这是同一情况。您需要通过参考来传递s对象 - 使用boost::ref

void produce(ProtectedStack& s)
{
   m_Thread = boost::thread(&Producer::fillStack, this, boost::ref(s));
}
void consume(ProtectedStack& s)
{
   m_Thread = boost::thread(&Consumer::remove, this, boost::ref(s));
}  

另一个问题是关于您的静音使用情况。这不是最好的。

  1. 为什么您使用Signals2库中的Mutex?只需在boost.thread中使用 boost::mutex,然后删除dooded依赖关系到信号库。

  2. 使用RAII包装器boost::lock_guard代替直接lock/unlock调用。

  3. 正如其他人提到的,您应使用ProtectedStack的所有成员进行保护。

样本:

boost::mutex m;
void push(int x)
{ 
   boost::lock_guard<boost::mutex> lock(m);
   m_Stack.push(x);
} 
void pop()
{
   boost::lock_guard<boost::mutex> lock(m);
   if(!m_Stack.empty()) m_Stack.pop();
}              
int size()
{
   boost::lock_guard<boost::mutex> lock(m);
   return m_Stack.size();
}
bool isEmpty()
{
   boost::lock_guard<boost::mutex> lock(m);
   return m_Stack.empty();
}
int top()
{
   boost::lock_guard<boost::mutex> lock(m);
   return m_Stack.top();
}

您没有在尝试消耗之前检查生产线程是否已执行。您也不会锁定尺寸/空/顶部...如果容器被更新,这是不安全的。