是否可以使用boost构建并发进程间消息队列

Is it possible to build a concurrent interprocess message queue using boost?

本文关键字:消息 队列 并发进程 构建 可以使 boost 是否      更新时间:2023-10-16

我正在尝试构建一个具有多个进程的应用程序。这些进程需要通过同一消息队列并发地进行写操作。在另一边,将只有一个进程读取该队列。

是否可以使用boost?还是我必须实现互斥?

我看了看示例源代码,但它不适合我的需要。我不知道我是否错过了什么。

这是客户端的代码:

#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
#include <vector>
#include <unistd.h>
using namespace boost::interprocess;
int main ()
{
   try{
      //Erase previous message queue
      //message_queue::remove("message_queue");
      //Create a message_queue.
      message_queue mq
         (open_or_create               //only create
         ,"message_queue"           //name
         ,100                       //max message number
         ,sizeof(int)               //max message size
         );
      //Send 100 numbers
      for(int i = 0; i < 100; ++i){
         printf("Sending: %dn", i);
         usleep(1000000);
         mq.send(&i, sizeof(i), 0);
      }
   }
   catch(interprocess_exception &ex){
      std::cout << ex.what() << std::endl;
      return 1;
   }
   return 0;
}

和服务器代码:

#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
#include <vector>
using namespace std;
using namespace boost::interprocess;
int main ()
{
   try{
      //message_queue::remove("message_queue");
      //Open a message queue.
      message_queue mq
         (open_only      //only create
         ,"message_queue"  //name
         );
      unsigned int priority;
      message_queue::size_type recvd_size;
      //Receive 100 numbers
      for(int i = 0; i < 100; ++i){
         int number;
         mq.receive(&number, sizeof(number), recvd_size, priority);
         if(number != i || recvd_size != sizeof(number))
            return 1;
         cout << number << endl;
      }
   }
   catch(interprocess_exception &ex){
      message_queue::remove("message_queue");
      std::cout << ex.what() << std::endl;
      return 1;
   }
   //message_queue::remove("message_queue");
   return 0;
}

给出的boost::interprocess::message_queue的例子对我有用。这些类已经是线程安全的,所以进程内线程不是问题。

下面是一个完整的共享消息队列示例。如果你使用它有困难,请告诉我。

shared_mq.hpp:

#include <boost/interprocess/ipc/message_queue.hpp>
// could easily be made a template; make sure T is a POD!
class shared_mq {
public:
  shared_mq(const char* const name,
            const unsigned max_queue_size) :
    shared_mq{ name, max_queue_size, delete_queue(name) }
  {}
  shared_mq(const char* const name) :
    mq_{ boost::interprocess::open_only, name }
  {}
  void send(int i) {
    mq_.send(&i, sizeof(i), 0 /* priority */);
  }
  int receive() {
    int result;
    boost::interprocess::message_queue::size_type recvsize;
    unsigned recvpriority;
    mq_.receive(&result, sizeof(result), recvsize, recvpriority);
    return result;
  }
private:
  struct did_delete_t {};
  did_delete_t delete_queue(const char* const name) {
    boost::interprocess::message_queue::remove(name);
    return did_delete_t{};
  }
  shared_mq(const char* const name,
            const unsigned max_queue_size,
            did_delete_t) :
    mq_ { boost::interprocess::create_only, name, max_queue_size, sizeof(int) }
  {}
  boost::interprocess::message_queue mq_;
};

client.cpp:

#include <iostream>
#include <random>
#include <thread>
#include "shared_mq.hpp"
void send_ints(shared_mq& mq, const unsigned count) {
  std::random_device rd;
  std::mt19937 mt{ rd() };
  std::uniform_int_distribution<int> dist{0, 10000};
  for (unsigned i = 0; i != count; ++i) {
    mq.send(dist(mt));
  }
}
int main ()
{
  std::cout << "Starting client." << std::endl; 
  try {
    std::cout << "Creating queue..." << std::endl;
    constexpr unsigned kQueueSize = 100;
    shared_mq mq{ "my_queue", kQueueSize };
    std::cout << "Sending ints..." << std::endl;
    std::thread t1{ send_ints, std::ref(mq), 25};
    std::thread t2{ send_ints, std::ref(mq), 25};
    t1.join();
    t2.join();
    mq.send(-1);  // magic sentinel value
  }
  catch (boost::interprocess::interprocess_exception& ex) {
    std::cerr << ex.what() << std::endl;
    return 1;
  }
  std::cout << "Finished client." << std::endl;
  return 0;
}

server.cpp:

#include <iostream>
#include "shared_mq.hpp"
int main ()
{
  std::cout << "Starting server." << std::endl; 
  try {
    std::cout << "Opening queue..." << std::endl;
    shared_mq mq{ "my_queue" };
    std::cout << "Receiving ints..." << std::endl;
    for (;;) {
      const int x = mq.receive();
      if (x == -1) {
        // magic sentinel value
        break;
      }
      std::cout << "Received: " << x << std::endl;
    }
  }
  catch (boost::interprocess::interprocess_exception& ex) {
    std::cerr << ex.what() << std::endl;
    return 1;
  }
  std::cout << "Finished server." << std::endl;
  return 0;
}