如何使用 C++11 <thread> 设计从源中提取数据的系统

How to use C++11 <thread> designing a system which pulls data from sources

本文关键字:提取 数据 系统 C++11 何使用 lt thread gt      更新时间:2023-10-16

这个问题来自:C++11 线程不适用于虚拟成员函数

正如评论中所建议的,我在上一篇文章中的问题可能不是正确的问题,所以这里是原始问题:

我想制作一个捕获系统,它将以恒定/动态的频率查询几个源(因源而异,例如 10 次/秒(,并将数据拉到每个源的队列中。 虽然源不是固定的,但它们可能会在运行时添加/删除。

并且有一个监视器,它以恒定的频率从队列中提取并显示数据。

那么这个问题的最佳设计模式或结构是什么。

我正在尝试为所有源拉取器制作一个列表,每个拉取器都有一个线程和一个指定的拉取函数(不知何故,拉取函数可能会与拉取器交互,比如说如果源是消耗的,它会要求停止该线程上的拉取过程。

除非查询源的操作阻塞(或者有很多源(,否则不需要为此使用线程。我们可以从一个Producer开始,它将与同步或异步(线程(调度一起使用:

template <typename OutputType>
class Producer
{
    std::list<OutputType> output;
protected:
    int poll_interval; // seconds? milliseconds?
    virtual OutputType query() = 0;
public:
    virtual ~Producer();
    int next_poll_interval() const { return poll_interval; }
    void poll() { output.push_back(this->query()); }
    std::size_t size() { return output.size(); }
    // whatever accessors you need for the queue here:
    // pop_front, swap entire list, etc.
};

现在我们可以从这个Producer派生,并在每个子类型中实现 query 方法。您可以在构造函数中设置poll_interval并保留它,或者在每次调用 query 时更改它。 有常规生产者组件,不依赖于调度机制。

template <typename OutputType>
class ThreadDispatcher
{
    Producer<OutputType> *producer;
    bool shutdown;
    std::thread thread;
    static void loop(ThreadDispatcher *self)
    {
        Producer<OutputType> *producer = self->producer;
        while (!self->shutdown)
        {
            producer->poll();
            // some mechanism to pass the produced values back to the owner
            auto delay = // assume millis for sake of argument
                std::chrono::milliseconds(producer->next_poll_interval());
            std::this_thread::sleep_for(delay);
        }
    }
public:
    explicit ThreadDispatcher(Producer<OutputType> *p)
      : producer(p), shutdown(false), thread(loop, this)
    {
    }
    ~ThreadDispatcher()
    {
        shutdown = true;
        thread.join();
    }
    // again, the accessors you need for reading produced values go here
    // Producer::output isn't synchronised, so you can't expose it directly
    // to the calling thread
};

这是一个简单调度程序的快速草图,它将在线程中运行您的生产者,无论您要求它轮询它的频率如何。请注意,不会显示将生成的值传递回所有者,因为我不知道您希望如何访问它们。

另请注意,我尚未同步对关闭标志的访问 - 它可能应该是原子的,但它可能会通过您选择对生成的值执行的任何操作进行隐式同步。

有了这个组织,也很容易编写一个同步调度程序来在单个线程中查询多个生产者,例如从选择/轮询循环,或者使用 Boost.Asio 和每个生产者的截止日期计时器。