通用的消息

Generic messaging

本文关键字:消息      更新时间:2023-10-16

我正在用c++开发一个消息传递系统。我有;

class MessageData
{
public:
    typedef std::vector<std::shared_ptr<MessageData>> MessageList;
    virtual int getValue(std::shared_ptr<int>) { throw "Not implemented!"; };
    virtual float getValue(std::shared_ptr<float>) { throw "Not implemented!"; };
    virtual std::string getValue(std::shared_ptr<std::string>) { throw "Not implemented!"; };
    ...
    ...
    virtual ~MessageData() {};
};
template <typename T>
class Message : public MessageData
{
    T val;
public:
    static std::shared_ptr<Message<T>> Make(T val) { return std::make_shared<Message<T>>(val); };
    static T Get(std::shared_ptr<MessageData> in) { return in->getValue(std::make_shared<T>()); };
    Message(T i) { val = i; };
    T getValue(std::shared_ptr<T> out) override { return *out = val; }
    ~Message() {};
};

使用这些,我可以方便地发送/接收不同长度的通用消息,例如:

sendMessage(MessageData::MessageList{
                Message<std::string>::Make("paint"),
                Message<int>::Make(14),
                Message<float>::Make(129.3f),
                ...
            });

然后得到值;

sendMessage(MessageData::MessageList data) {
    auto a = Message<std::string>::Get(data[0]);
    auto b = Message<int>::Get(data[1]);
    auto c = Message<float>::Get(data[2]);
    ...
}

缺点是我必须列出在MessageData类中需要使用的所有类型。这不是什么大问题,因为我可以限制我想要支持的类型,但我真的很好奇如何在不使用第三方库的情况下模板化类型列表。或者是否有一种完全不同的更好的方法,我可以使用类似的干净语法和类型安全来传递消息?

让代码更通用的一种方法是:

template <typename ... Ts>
class MessageDataImp;
template <typename T>
class MessageDataImp<T>
{
public:
    virtual ~MessageDataImp() = default;
    virtual T getValue(std::shared_ptr<T>) { throw "Not implemented!"; };
};
template <typename T, typename ... Ts>
class MessageDataImp<T, Ts...> : public MessageDataImp<T>, public MessageDataImp<Ts...>
{
public:
    using MessageDataImp<T>::getValue;
    using MessageDataImp<Ts...>::getValue;
};
template <typename ... Ts>
class MessageDataTs : public MessageDataImp<Ts...>
{
public:
    typedef std::vector<std::shared_ptr<MessageDataTs<Ts...>>> MessageList;
};
using MessageData = MessageDataTs<int, float, std::string>;

我想我已经找到了一个解决问题的好办法。

class MessageData {
public:
    typedef std::vector<std::shared_ptr<MessageData>> MessageList;
    virtual ~MessageData() {};
};
template<typename T>
class Message : public MessageData {
    T val;
public:
    template<typename U>
    friend U GetMessage(std::shared_ptr<MessageData> in);
    Message(T i) { val = i; };
};
template<typename T>
T GetMessage(std::shared_ptr<MessageData> in) {
    std::shared_ptr<Message<T>> tmp = std::dynamic_pointer_cast<Message<T>>(in);
    if (tmp) {
        return tmp->val;
    } 
    throw "Incorrect type!";
};
template<typename T>
std::shared_ptr<Message<T>> MakeMessage(T val)
{
    return std::make_shared<Message<T>>(val);
};

然后发送&使用;

接收值
sendMessage(MessageData::MessageList{
                MakeMessage(std::string("paint")),
                MakeMessage(14),
                MakeMessage(129.3f),
                ...
            });
sendMessage(MessageData::MessageList data) {
    auto a = GetMessage<std::string>(data[0]);
    auto b = GetMessage<int>(data[1]);
    auto c = GetMessage<float>(data[2]);
    ...
}

假设它是基于非优先队列的简单多读多写消息总线,我想我会从这样的东西开始:-

注意我使用了boost::variant/optional。如果有可用的std::版本,这些可以很容易地替换。

我使用了variant,因为它有效地满足了大多数具有编译时安全性的用例。

std/boost::任何版本都需要对总线的用户进行大量的(可能是不受欢迎的)关注。

#include <iostream>
#include <string>
#include <queue>
#include <thread>
#include <condition_variable>
#include <boost/variant.hpp>
#include <boost/optional.hpp>
template<class Mutex> auto get_lock(Mutex& m) { return std::unique_lock<Mutex>(m); }
template<class...Types>
struct message_bus
{
    using message_type = boost::variant<Types...>;
    void push(message_type msg) {
        auto lock = get_lock(mutex_);
        messages_.push(std::move(msg));
        lock.unlock();
        activity_.notify_one();
    }
    boost::optional<message_type> wait_pop()
    {
        boost::optional<message_type> result;
        auto lock = get_lock(mutex_);
        activity_.wait(lock, [this] { return this->stopped_ or not this->messages_.empty(); });
        if (not messages_.empty())
        {
            result = std::move(messages_.front());
            messages_.pop();
        }
        return result;
    }
    void signal_stop()
    {
        auto lock = get_lock(mutex_);
        stopped_ = true;
        lock.unlock();
        activity_.notify_all();
    }

    std::queue<message_type> messages_;
    std::mutex mutex_;
    std::condition_variable activity_;
    bool stopped_ = false;
};
static std::mutex emit_mutex;
template<class T>
void emit(const T& t)
{
    auto lock = get_lock(emit_mutex);
    std::cout << std::this_thread::get_id() << ": " << t << std::endl;;
}
int main()
{
    using bus_type = message_bus<std::string, int>;
    bus_type mb;
    std::vector<std::thread> threads;
    for (int i = 0 ; i < 10 ; ++i)
    {
        threads.emplace_back([&]
        {
            for(;;)
            {
                auto message = mb.wait_pop();
                if (not message)
                    break;
                boost::apply_visitor([](auto&& data) { emit(data); }, message.value());
            }
        });
    }
    for (int i = 0 ; i < 1000 ; ++i)
    {
        mb.push("string: " + std::to_string(i));
        mb.push(i);
    }
    mb.signal_stop();
    for (auto& t : threads) if (t.joinable()) t.join();
}