使用QAMQP/RabbitMQ进行Qt

Qt with QAMQP / RabbitMQ

本文关键字:进行 Qt RabbitMQ QAMQP 使用      更新时间:2023-10-16

我正在使用此连接器连接RabbitMQ:https://github.com/fuCtor/QAMQP

我的应用程序需要性能和多线程。请问,我可以用多线程的连接器吗?

我试过了:

void Test::newMessage(QAMQP::Queue * q) {
    while (q->hasMessage()) {
        QAMQP::MessagePtr message = q->getMessage();
        MyEvent *me = new MyEvent();
        me->message = message;
        poolThreadPosicao->start(me);
    }
}
class MyEvent : public QRunnable {
    public:
    QAMQP::MessagePtr message;
    void run() {
        s.queue->ack(this->message);
    }
};

在一些消息中,RabbitMQ说:"Unacked 10 messages"。这10条消息是我在broker中的qos消息。我需要什么来解决这个问题?怎么办?

首先,我建议您切换到https://github.com/mbroadst/qamqp,因为它是原始项目的替代品(该项目已不再处于积极开发中)。更新后的代码包含许多性能和内存增强,以及对RabbitMQ的更全面支持。话虽如此,目前该项目的两个版本都旨在为每个线程提供一个连接。这意味着,您创建的任何通道(Exchange或队列)都将成为创建它的连接(客户端)的父级,因此绑定到创建线程。

处理您面临的问题的一种方法是从QRunnable和QObject继承,在完成任务时发出消息(注意:这是未经测试的,我只是给出了基本结构):

using namespace QAMQP;
class MessageJob : public QRunnable, public QObject
{
    Q_OBJECT
public:
    MessageJob(const Message &message)
        : m_message(message)
    {
    }
    virtual void run() {
        // process the message
        // when you are done, emit the finished signal
        Q_EMIT finished(m_message);
    }
Q_SIGNALS:
    void finished(const Message &message);
private:
    Message m_message;
};
class Test : public QObject
{
    Q_OBJECT
public:
    Test(QObject *parent = 0)
        : QObject(parent)
    {
        // setup and connect client
        // create queue and start consuming
    }
private Q_SLOTS:
    void messageReceived(const Message &message)
    {
        MessageJob *job = new MessageJob; // no parent, this will be autodeleted
        connect(job, SIGNAL(finished(Message)), this, SLOT(jobFinished(Message)), Qt::QueuedConnection);
        // NOTE: Qt::QueuedConnection is very important as it allows the signal to 
        //       cross threads
        QThreadPool::globalInstance()->start(job);
    }
    void jobFinished(const Message &message) {
        m_queue->ack(message);
    }
private:
    Client m_client;
    Queue *m_queue;
};