C lambda回调为触发事件

c++ lambda callback to trigger event

本文关键字:事件 lambda 回调      更新时间:2023-10-16

我一直在试图将我的头缠绕在C 中的回调功能。我要实现的目标是:

我有两个对象,每个对象都有自己的线程。一个对象A具有指向第二个对象B的指针。请参阅示例:

class A
{
  public:
   // ...
  private:
   std::unique_ptr<B> b;
};
class B
{
  public:
   void add_message(MessageType msg);
   // ...
};

我要实现的目标是让对象A使用B的指针添加消息,然后继续执行其他操作,但是当B对该消息回复时,请使用回调或处理程序或触发的内容。B对消息进行了一些处理,并可能将其传递给其他对象以便在其线程上处理,但最终会提出答复。因此,我怎么知道B何时对我的消息进行答复,例如:

// In class A
MessageType m();
b->add_message(m)
// A's thread continues doing other stuff
...
// some notification that b has a reply?

我知道我可能必须使用STD ::函数进行回调,但我想使用的是,但我无法通过查看许多示例来确切地做到这一点。对任何帮助都表示赞赏,并注意到我已经研究了很多例子,但不能将其与我要实现或不了解的目标联系在一起...

线程是执行序列。它们的行为与线性C 程序大致相同,嵌入了内存模型中,该模型使它们可以交流并注意到其他执行线程引起的状态变化。

在没有线程的合作的情况下,对线程的回调无法接管执行顺序。您要通知的线程必须明确检查以查看消息是否到达并处理。


有两种通用方法来处理对消息的响应。

第一个是类似于std::future的方法。在其中,呼叫者获得了某种代币,而令牌代表了将来可能或将来会产生的答案。

第二个是再次使用消息传递。您将消息发送给B请求响应。B将消息发送回响应。B收回消息的方式相同,A收回消息。该消息可能包含某种类型的"返回目标",以帮助将其链接到原始消息。

在基于消息的系统中,通常具有"事件循环"。您没有一个线性程序,而是一个多次返回"事件循环"的线程。在那里,它会检查一个队列的消息,如果没有,则等待一些。

必须将任务分解为这样的系统下的咬合大小的块,以便您经常检查事件循环以响应。

一种方法是与Coroutines在一起,Coroutines是执行状态,而无需拥有自己的执行器(例如,拥有两者的线程)。Coroutines定期放弃优先级,并"保存他们的状态以后"。


未来的解决方案通常是最简单的,但它依赖于定期检查响应。

首先,threaded_queue<T>,它可以让任何数量的生产者和消费者都将事物带入队列并将其吞噬前面:

template<class T>
struct threaded_queue {
  using lock = std::unique_lock<std::mutex>;
  void push_back( T t ) {
    {
      lock l(m);
      data.push_back(std::move(t));
    }
    cv.notify_one();
  }
  boost::optional<T> pop_front() {
    lock l(m);
    cv.wait(l, [this]{ return abort || !data.empty(); } );
    if (abort) return {};
    auto r = std::move(data.back());
    data.pop_back();
    return std::move(r);
  }
  void terminate() {
    {
      lock l(m);
      abort = true;
      data.clear();
    }
    cv.notify_all();
  }
  ~threaded_queue()
  {
    terminate();
  }
private:
  std::mutex m;
  std::deque<T> data;
  std::condition_variable cv;
  bool abort = false;
};

现在,我们想将任务传递到这样的队列中,并让一个通过任务取回结果。这是包装任务的上述用途:

template<class...Args>
struct threaded_task_queue {
  threaded_task_queue() = default;
  threaded_task_queue( threaded_task_queue&& ) = delete;
  threaded_task_queue& operator=( threaded_task_queue&& ) = delete;
  ~threaded_task_queue() = default;
  template<class F, class R=std::result_of_t<F&(Args...)>>
  std::future<R> queue_task( F task ) {
    std::packaged_task<R(Args...)> p(std::move(task));
    auto r = p.get_future();
    tasks.push_back( std::move(p) );
    return r;
  }
  void terminate() {
    tasks.terminate();
  }
  std::function<void(Args...)> pop_task() {
    auto task = tasks.pop_front();
    if (!task) return {};
    auto task_ptr = std::make_shared<std::packaged_task<R(Args...)>>(std::move(*task));
    return [task_ptr](Args...args){
      (*task_ptr)(std::forward<Args>(args)...);
    };
  }
private:
  threaded_queue<std::packaged_task<void(Args...)>> tasks;
};

如果我做对了,那就这样起作用:

  • a以lambda的形式将队列发送给B。该lambda采用了一些固定的参数(由B提供),并返回一些价值。

  • b弹出队列,并获得一个参数的std::function。它调用它;它在B的上下文中返回void

  • a在排队任务时给出了future<R>。它可以查询以查看是否完成。

您会注意到,不能"通知"一个事情已经完成。这需要不同的解决方案。但是,如果A最终达到了如果不等待B的结果就无法进步,则此系统可行。

另一方面,如果A积累了大量此类消息,有时需要等待许多此类BS的输入,直到其中任何一个返回数据(或用户做某事),则您需要比一个比A更先进的东西std::future<R>。一般模式 - 具有代表未来计算的代币 - 是可靠的。但是,您需要扩大它以与未来的计算和消息循环的多种来源发挥作用。

代码未测试。

发送消息时threaded_task_queue的一种方法是:

template<class Signature>
struct message_queue;
template<class R, class...Args>
struct message_queue<R(Args...) :
  threaded_task_queue< std::function<R(Args...)> >
{
  std::future<R> queue_message(Args...args) {
    return this->queue_task(
      [tup = std::make_tuple(std::forward<Args>(args)...)]
      ( std::function<R(Args...)> f ) mutable
      {
        return std::apply( f, std::move(tup) );
      }
    );
  }
  bool consume_message( std::function<R(Args...)> f )
  {
    auto task = pop_task();
    if (!task) return false;
    task( std::move(f) );
    return true;
  }
};

在提供商侧的位置,您提供Args...,并且在消费者方面,您可以消耗Args...并返回R,在提供商方面,您有一个future<R>来获得结果。

这可能比我写的原始threaded_task_queue更自然。

std::apply是C 17,但野外有C 11和C 14的实现。