C/C++pthread信号和指针

C/C++ pthread signals and pointers

本文关键字:指针 信号 C++pthread      更新时间:2023-10-16

我最难理解如何允许线程相互发送信号。

我的设计:

main函数创建一个主线程,用于协调一组其他工作线程。main函数还创建了worker,因为worker线程以main中编程的间隔生成和退出。主线程需要能够向这些工作线程发送信号并对它们进行信号广播,工作线程也必须向主线程发送信号(pthread_cond_signal)。由于每个线程都需要pthread_mutex和pthread_cond,所以我用这些变量创建了一个Worker类和一个Master类。现在这就是我的困境。C++不允许将成员函数作为pthread_create(…)处理程序传递,所以我不得不在内部制作一个静态处理程序,并将指针传递给它自己,以重新解释它以使用它的类数据。。。

void Worker::start() {
pthread_create(&thread, NULL, &Worker::run, this);
}
void* Worker::run(void *ptr) {
Worker* data = reinterpret_cast<Worker*>(ptr);
}

我在这个可能是错误的设置中遇到的问题是,当我将一组工作指针传递给主线程时,它会发出不同的工作指针引用的信号,因为我认为强制转换进行了某种复制。所以我尝试了static_cast和同样的行为。

我只需要某种设计,Master和Worker可以相互pthread_cond_wait(…)和pthread-cond_signal(…)。

编辑1

添加:

private:
    Worker(const Worker&);

仍然不起作用。

编辑修复了所有版本中的潜在竞争:

1./1b使用由C++0x中概述的(互斥+条件+计数器)构建的信号量是否没有信号量?如何同步线程
2.使用"反向"等待,以确保信号得到ack-预期工作人员

我真的建议使用c++11风格的<thread><condition_variable>来实现这一点。

我有两个(半)降级。他们每个人都假设你有一个能驱动10名工人的师傅。每个工人在工作前都在等待一个信号。

我们将使用std::condition_variable(它与std::mutex一起工作)来进行信令。第一个版本和第二个版本之间的区别在于信令的方式:

  • 1.一次通知一名工人:
  • 1b。具有辅助结构
  • 2.通知所有线程,协调哪个接收方工作人员要响应

1.一次通知一名工人:

这是最简单的方法,因为几乎没有协调:

#include <vector>
#include <thread>
#include <mutex>
#include <algorithm>
#include <iostream>
#include <condition_variable>
using namespace std;
class semaphore 
{ // see https://stackoverflow.com/questions/4792449/c0x-has-no-semaphores-how-to-synchronize-threads
    std::mutex mx;
    std::condition_variable cv;
    unsigned long count;
public:
    semaphore() : count() {} 
    void notify();
    void wait();
};
static void run(int id, struct master& m);
struct master
{
    mutable semaphore sem;
    master()
    {
        for (int i = 0; i<10; ++i)
            threads.emplace_back(run, i, ref(*this));
    }
    ~master() {
        for(auto& th : threads) if (th.joinable()) th.join(); 
        std::cout << "donen";
    }
    void drive()
    {
        // do wakeups
        for (unsigned i = 0; i<threads.size(); ++i)
        {
            this_thread::sleep_for(chrono::milliseconds(rand()%100));
            sem.notify();
        }
    }
  private:
    vector<thread> threads;
};
static void run(int id, master& m)
{
    m.sem.wait();
    {
        static mutex io_mx;
        lock_guard<mutex> lk(io_mx);
        cout << "signaled: " << id << "n";
    }
}
int main()
{
    master instance;
    instance.drive();
}
/// semaphore members
void semaphore::notify()
{
    lock_guard<mutex> lk(mx);
    ++count;
    cv.notify_one();
}
void semaphore::wait()
{
    unique_lock<mutex> lk(mx);
    while(!count)
        cv.wait(lk);
    --count;
}

1b。具有辅助结构

注意,如果您有worker类,而worker::run是一个非静态成员函数,那么只需进行一些小的修改:

struct worker
{
    worker(int id) : id(id) {}
    void run(master& m) const;
    int id;
};
// ...
struct master
{
    // ...
    master()
    {
        for (int i = 0; i<10; ++i)
            workers.emplace_back(i);
        for (auto& w: workers)
            threads.emplace_back(&worker::run, ref(w), ref(*this));
    }
// ...
void worker::run(master& m) const
{
    m.sem.wait();
    {
        static mutex io_mx;
        lock_guard<mutex> lk(io_mx);
        cout << "signaled: " << id << "n";
    }
}

警告

  • cv.wait()可能遭受虚假唤醒,其中条件变量没有被自动提升(例如,在操作系统信号处理程序的情况下)。在任何平台上,条件变量都会发生这种情况

以下方法修复了此问题:

2.通知所有线程,协调哪个接收方工作人员

使用标志来通知哪个线程打算接收信号:

struct master
{
    mutable mutex mx;
    mutable condition_variable cv;
    int signaled_id;               // ADDED
    master() : signaled_id(-1)
    {

让我们假设driver变得更有趣,并希望以特定(随机…)顺序向所有工人发出信号:

    void drive()
    {
        // generate random wakeup order
        vector<int> wakeups(10);
        iota(begin(wakeups), end(wakeups), 0);
        random_shuffle(begin(wakeups), end(wakeups));
        // do wakeups
        for (int id : wakeups)
        {
            this_thread::sleep_for(chrono::milliseconds(rand()%1000));
            signal(id);
        }
    }
  private:
    void signal(int id)                // ADDED id
    {
        unique_lock<mutex> lk(mx);
        std::cout << "signaling " << id << "n";
        signaled_id = id;              // ADDED put it in the shared field
        cv.notify_all();
        cv.wait(lk, [&] { return signaled_id == -1; });
    }

现在我们所要做的就是确保接收线程检查它的id是否匹配:

m.cv.wait(lk, [&] { return m.signaled_id == id; });
m.signaled_id = -1;
m.cv.notify_all();

这结束了虚假的觉醒。

完整的代码列表/实时演示:

  • 1.notify_one.cpphttp://coliru.stacked-crooked.com/view?id=c968f8cffd57afc2a0c6777105203f85-03e740563a9d9c6bf97614ba6099fe92
  • 1bid。带有worker结构:http://coliru.stacked-crooked.com/view?id=7bd224c42130a0461b0c894e0b7c74ae-03e740563a9d9c6bf97614ba6099fe92
  • 2.notify_all.cpphttp://coliru.stacked-crooked.com/view?id=1d3145ccbb93c1bec03b232d372277b8-03e740563a9d9c6bf97614ba6099fe92

目前尚不清楚您的确切情况,但您似乎正在使用一个容器来容纳在main中创建的"Worker"实例,并将它们传递给您的"Master"。如果是这样的话,你可以采取一些补救措施。您需要选择一个适合您的实现的。

  • 将对main中容器的引用传递给Master
  • 将容器更改为容纳指向Workers的(智能)指针
  • 使容器成为"Master"本身的一部分,这样它就不需要传递给它
  • 为Worker类实现一个适当的析构函数、复制构造函数和赋值运算符(换句话说,遵守三规则)

从技术上讲,由于pthread_create()是一个C API,因此传递给它的函数指针需要具有C链接(extern "C")。你不能让C++类的方法有C链接,所以你应该定义一个外部函数:

extern "C" { static void * worker_run (void *arg); }
class Worker { //...
};
static void * worker_run (void *arg) {
    return Worker::run(arg);
}