具有多个线程的安全消息队列

Safe Message Queue with multiple threads

本文关键字:安全 消息 队列 线程      更新时间:2023-10-16

下面是我的代码:

我有线程A,它定期检查消息并处理它们。

线程B和C需要向a发送消息

当B和C或B或C试图在a处理消息并访问队列时向a发送消息时,问题就出现了。

这个问题通常是如何解决的?

谢谢

通常使用互斥锁或其他多线程保护机制来解决。

如果你在windows上工作,MFC为这个问题提供了CMutex类。

如果你在posix系统上工作,posix api提供了pthread_mutex_lock, pthread_mutex_unlockpthread_mutex_trylock函数。

一些基本的伪代码可以方便地演示它们在您的情况下的用法:

pthread_mutex_t mutex; *or* CMutex mutex;
Q queue;  // <-- both mutex and queue are global state, whether they are
          //     global variables, or passed in as parameters, they must
          //     be the shared by all threads.
int threadA(/* params */){
    while( threadAStillRunning ){
        // perform some non-critical actions ...
        pthread_mutex_lock(mutex) *or* mutex.Lock()
        // perform critical actions ...
        msg = queue.receiveMessage()
        pthread_mutex_unlock(mutex) *or* mutex.Unlock()
        // perform more non-critical actions
    }
}
int threadBorC(/* params */){
    while( theadBorCStillRunning ){
        // perform some non-critical actions ...
        pthread_mutex_lock(mutex) *or* mutex.Lock()
        // perform critical actions ...
        queue.sendMessage(a_msg)
        pthread_mutex_unlock(mutex) *or* mutex.Unlock()
    }
}

对于这三个线程,它们对队列的操作能力取决于它们获取互斥锁的能力——它们只是阻塞并等待,直到互斥锁被获取。这可以防止因使用该资源而产生冲突。

如果您不是在windows上,或者如果您正在使用c++实现跨平台的东西,请尝试使用ACE库中的Queue。

ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue;

作为ACE库示例中的示例,然后可以使用将消息放入队列:

  ACE_NEW_RETURN (mb,
              ACE_Message_Block (rb.size (),
              ACE_Message_Block::MB_DATA,
              0,
              buffer),
              0);
  mb->msg_priority (ACE_Utils::truncate_cast<unsigned long> (rb.size ()));
  mb->wr_ptr (rb.size ());
  ACE_DEBUG ((LM_DEBUG,
          "enqueueing message of size %dn",
          mb->msg_priority ()));
 // Enqueue in priority order.
 if (msg_queue->enqueue_prio (mb) == -1)
ACE_ERROR ((LM_ERROR, "(%t) %pn", "put_next"));

从队列中获取:

 ACE_Message_Block *mb = 0;
 msg_queue->dequeue_head (mb) == -1;
 int length = ACE_Utils::truncate_cast<int> (mb->length ());
 if (length > 0)
   ACE_OS::puts (mb->rd_ptr ());
  // Free up the buffer memory and the Message_Block.
  ACE_Allocator::instance ()->free (mb->rd_ptr ());
  mb->release ();

的优点是您可以非常容易地更改同步原语,而无需编写太多代码。