Qt与ZeroMQ发布订阅模式

Qt with ZeroMQ publish subscribe pattern

本文关键字:布订阅 模式 ZeroMQ Qt      更新时间:2023-10-16

我想使用ZeroMQ(4.1.2)与Qt(5.2.1)。想法是有zmq pub/sub(服务器在外面)和sub是qt应用程序。目前在Qt中接收到的应用运行一次,有人能给点提示吗?ZeroMQ接收器应该以其他方式实现吗?

目前我的代码看起来像:

mainwindow.h

namespace Ui {
class MainWindow;
}
class MainWindow : public QMainWindow
{
Q_OBJECT
public:
    explicit MainWindow(QWidget *parent = 0);
    ~MainWindow();
private slots:
    void on_pushButton_clicked();
    void readZMQData();
private:
    Ui::MainWindow *ui;
    QSocketNotifier *qsn;
    void *context;
    void *subscriber;
};

mainwindow.cpp

MainWindow::MainWindow(QWidget *parent) :
    QMainWindow(parent),
    ui(new Ui::MainWindow)
{
    ui->setupUi(this);
    /***** ZMQ *****/
    context = zmq_ctx_new ();
    subscriber = zmq_socket (context, ZMQ_SUB);
    int rc = zmq_connect (subscriber, "tcp://localhost:5556");
    char *filter = "";
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter, strlen (filter));
    unsigned int fd=0;
    size_t fd_size = sizeof(fd);
    rc = zmq_getsockopt(subscriber,ZMQ_FD,&fd,&fd_size);
    qsn = new QSocketNotifier(fd, QSocketNotifier::Read, this);
    connect(qsn, SIGNAL(activated(int)), this, SLOT(readZMQData()), Qt::DirectConnection);
}
MainWindow::~MainWindow()
{
    zmq_close (this->subscriber);
    zmq_ctx_destroy (this->context);
    delete ui;
}

void MainWindow::readZMQData()
{
    qsn->setEnabled(false);
    qDebug() << "Got data!";
    int events = 0;
    std::size_t eventsSize = sizeof(events);
    zmq_getsockopt(subscriber,ZMQ_EVENTS, &events, &eventsSize);
    if(events & ZMQ_POLLIN){
        qDebug() << " ======  Data to read ======";
        char *string = s_recv(subscriber);
        qDebug() << "DATA: " << string;
        free(string);
    }
    qsn->setEnabled(true);
}

服务器应用程序是(从ZeroMQ的例子):

#include "zhelpers.h"
int main (void)
{
    //  Prepare our context and publisher
    void *context = zmq_ctx_new ();
    void *publisher = zmq_socket (context, ZMQ_PUB);
    int rc = zmq_bind (publisher, "tcp://*:5556");
    assert (rc == 0);
    //  Initialize random number generator
    srandom ((unsigned) time (NULL));
    while (1) {
        //  Get values that will fool the boss
        int zipcode, temperature, relhumidity;
        zipcode     = randof (100000);
        temperature = randof (215) - 80;
        relhumidity = randof (50) + 10;
        //  Send message to all subscribers
        char update [20];
        sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
        s_send (publisher, update);
    }
    zmq_close (publisher);
    zmq_ctx_destroy (context);
    return 0;
}

首先感谢你的帮助,

我发现了这个问题,当ZeroMQ通知有消息要读时,你需要阅读它们全部,而不仅仅是第一个。

    void MainWindow::readZMQData(int fd)
{
    qsn->setEnabled(false);
    int events = 0;
    std::size_t eventsSize = sizeof(events);
    zmq_getsockopt(subscriber,ZMQ_EVENTS, &events, &eventsSize);
    if(events & ZMQ_POLLIN){
        qDebug() << " ======  Data to read ======";
        char *string;
        // THIS IS THE TRICK! READ UNTIL THERE IS MSG
        while((string = s_recv_nb(subscriber)) != NULL){
            qDebug() << "DATA: " << string;
            free(string);
        }
    }
    qsn->setEnabled(true);
}

套接字通知器看起来应该可以工作。你读过关于如何正确处理的文件了吗?特别是如果你是在Windows上,看起来有特殊的方法来处理它,当做读取…禁用、读取等

http://doc.qt.io/qt-5/qsocketnotifier.html细节

希望对你有帮助。

由于不清楚s_recv_nb(zmq::socket_t & socket)是什么,我将提供我的-稍微更详细的-实现:

      void MainWindow::readZMQData()
      {
       qsn->setEnabled(false);
       int events = 0;
       std::size_t eventsSize = sizeof(events);
       zmq_getsockopt(subscriber,ZMQ_EVENTS, &events, &eventsSize);
       if(events & ZMQ_POLLIN){
       qDebug() << " ======  Data to read ======";
       char *string;
       int64_t more;
       size_t more_size = sizeof (more);
       do {
           /* Create an empty ØMQ message to hold the message part */
           zmq_msg_t part;
           int rc = zmq_msg_init (&part);
           assert (rc == 0);
           rc = zmq_msg_recv (&part, subscriber, 0);
           assert (rc != -1);
           /* Determine if more message parts are to follow */
           rc = zmq_getsockopt (subscriber, ZMQ_RCVMORE, &more, &more_size);
           assert (rc == 0);
           string = (char*) zmq_msg_data(&part);
           qDebug() << QString(string) ; // << "more" << more;
           zmq_msg_close (&part); 
          } while (more);
       }
       qsn->setEnabled(true);
     }

还有一点:在Windows 64的情况下,文件描述符fd应该是uint64_t,因为zmq_getsockopt在Windows x64上返回EINVAL,当ZMQ_FD option_val的本地地址传递