thread程序会导致很多竞争条件

boost::threads program causes lots of race conditions

本文关键字:竞争 条件 程序 thread      更新时间:2023-10-16

我有一个程序,我使用boost::threads多线程。不幸的是,drd (valgrind --tool=drd ./my_program)报告了许多关于10000的问题。

我不确定我是否误解了boost线程的一些东西。我试着找出我的错误了几个小时,但没有得到更多,因此任何帮助都将是感激的。

我尝试管道某些过滤器,并希望能够运行它们通过调用最后一个过滤器运行。然后,这个过滤器应该首先调用他所依赖的所有前体过滤器,最后调用他的processQueue()方法。我现在想要能够在它们的线程中调用前驱过滤器,这样如果图是并行路径,我就能得到一个加速。因此,我添加了threadgroup,以便每个前驱过滤器在自己的线程中执行。但不幸的是,我得到了很多竞争条件,我不确定他们的结果。我希望现在我想要达到的目标更明确了。

我已经将代码更新为一个更简单的代码,问题仍然发生。我认为这个问题与线程生成有关。

更新2

我认为这些的主要原因是valgrind的假阳性率非常高。我对此提出了一个新问题。看到

更新3

当我使用valgrind 3.6.1而不是3.7.0或3.8.0时,大多数错误都可以避免。

这里有一份drd报告:

==29905== Conflicting load by thread 1 at 0xb0081000 size 8
==29905==    at 0x25A6C2: pthread_join (in /usr/lib/system/libsystem_c.dylib)
==29905==    by 0x2BEC0: boost::thread::join() (in /usr/local/lib/libboost_thread.dylib)
==29905==    by 0x100006641: Filter::run() (in ./playgroudThreads)
==29905==    by 0x100001013: main (in ./playgroudThreads)
==29905== Allocation context: unknown.
==29905== Other segment start (thread 2)
==29905==    at 0x2A7B68: thread_start (in /usr/lib/system/libsystem_c.dylib)
==29905== Other segment end (thread 2)
==29905==    at 0x3E667A: mach_msg_trap (in /usr/lib/system/libsystem_kernel.dylib)
==29905==    by 0x3DED38: semaphore_create (in /usr/lib/system/libsystem_kernel.dylib)
==29905==    by 0x2A50F7: new_sem_from_pool (in /usr/lib/system/libsystem_c.dylib)
==29905==    by 0x2A6199: _pthread_exit (in /usr/lib/system/libsystem_c.dylib)
==29905==    by 0x2A48C9: _pthread_start (in /usr/lib/system/libsystem_c.dylib)
==29905==    by 0x2A7B74: thread_start (in /usr/lib/system/libsystem_c.dylib)

这里是我的示例代码:

#include <iostream>
#include <vector>
#include <sys/time.h>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
class Filter
{
    public:
        Filter(int n) :
                n_(n), precursor_(0)
        {
        }
        ~Filter()
        {
        }
        void connect(Filter& f)
        {
            precursor_ = &f;
        }
        void run()
        {
            if (!isCalculationDone_) {
                if (precursor_) {
                    boost::thread thread(&Filter::run, precursor_);
                    thread.join();
                }
                this->processQueue(2);
                isCalculationDone_ = true;
            }
        }
        void processQueue(unsigned N)
        {
            //do some calculations
        }
    public:
        int n_;
        Filter* precursor_;
        bool isCalculationDone_;
};
int main(int argc, char* argv[])
{
    Filter* f1 = new Filter(1);
    Filter* f2 = new Filter(2);
    f2->connect(*f1);
    f2->run();
    std::cerr << "main: done" << std::endl;
    delete f2;
    delete f1;
    return 0;
}
;

您正在创建8个过滤器。每个Filter对象都有自己的filterMutex_——它们之间没有任何关系。

您正在创建超过8个线程。这是故意的吗?

每次调用run将为每个前驱启动一个新线程,在该线程上为该前驱过滤器对象调用Filter::run。所以:

f8->run creates 2 threads for its precursors, calling f6->run and f7->run
 f6->run creates 2 threads: f4->run and f5->run
  f4->run creates 1 thread: f2->run
   f2->run creates 1 thread: f1->run
    f1->run creates no additional threads
  f5->run creates 1 thread: f3->run
   f3->run creates 1 thread: f1->run (different thread from the other f1->run)
    f1->run creates no additional threads
 f7->run creates 1 thread: f3->run
  f3->run creates 1 thread: f1->run
   f1->run creates no additional threads

因此,使用您的8个Filter对象,您创建了10个线程(除了主线程之外),两次调用f3->run,三次调用f1->run

对同一对象的run的多个调用将被序列化。不同的过滤器没有序列化

不确定这些是否导致了你的问题,但这是一种让我对设计产生疑问的东西,以及它应该做什么。

你并不孤单:看看这里的线程,这表明这个问题是一个假阳性"可能是由一个新创建的线程重新使用线程本地存储的内存引起的"。

嗯,我不确定你的程序实际上应该做什么,但通常线程只是有用的,如果你线程独立的操作,像一个数学公式,不需要任何输入从任何其他进程你想线程,因为在任何其他情况下,线程必须等待,直到其他进程可以给这些数据,因此你冒着浪费大量的CPU时间。但是,由于这种情况是不可避免的,线程的艺术在于以一种尽可能短且尽可能少的方式实现您的问题。

在实现线程的同时,也存在两个线程需要一个资源(如变量)的问题,一个线程可能会在另一个线程读取它的时候改变它,因此可能会提供不一致的数据(如果一个线程比另一个线程快,那么你的程序可能会运行完全不同)。这实际上被称为竞争条件,为了防止这种情况,有互斥锁来防止同时读和写,还有一些函数让一个线程等待另一个线程。

我的猜测是这两种情况之一发生在你的程序中,因此vallgrind告诉你这些问题,因此在你的位置上,我会仔细检查你的整个代码,并实际上重新考虑任何依赖关系,有或可能在任何新线程之间。并考虑主体部分:

f2->connect(f1);
f3->connect(f1);
f4->connect(f2);
f5->connect(f3);
f6->connect(f4);
f6->connect(f5);
f7->connect(f3);
f8->connect(f6);
f8->connect(f7);

boost::unique_lock<boost::shared_mutex> lock(filterMutex_);

我想可能是第一种情况。

这个链接可能有助于解释您的vallgrind输出。尤其是"8.2.9"。"调试OpenMP程序"部分可能会让您感兴趣,因为实际上给出了非常相似的输出作为示例。

这里有一个教程,它似乎实际上经历了所有这些场景(甚至更多),并很好地解释了如何使用boost-threading