在像LMAX破坏者这样的模式中,你如何处理一个缓慢的消费者

In an LMAX disruptor like pattern, how do you handle a slow consumer?

本文关键字:处理 消费者 缓慢 何处理 一个 破坏者 LMAX 模式 在像      更新时间:2023-10-16

我有一个问题,在像环形缓冲区这样的lmax破坏者中,有多个生产者和一个运行在x86 Linux上的单个消费者的情况下,该怎么办?使用类似lmax的环形缓冲区模式,您将不断覆盖数据,但如果消费者速度较慢怎么办?因此,你如何处理这样的情况,比如在10个大小的环形缓冲区0-9个环形槽中,你的消费者在槽5上,现在你的写入器准备开始写入槽15,这也是缓冲区中的槽5(即:槽5 = 15% 10)?处理这种情况的典型方法是什么?这样写程序仍然按照数据进入的顺序生成数据,而客户端也将按照相同的顺序接收数据。这就是我的问题。以下是我设计的一些细节,它工作得很好,只是我目前没有一个很好的方法来处理这个问题。有多个线程做写,一个线程做读,我不能在不改变现有设计的情况下引入多个读取线程,这超出了当前的项目范围,但是如果它们涉及到这个作为解决方案,我仍然对你的想法感兴趣。

设计细节

我有一个环缓冲区,设计目前有多个生产者线程和一个消费者线程。这部分设计是现有的,目前不能更改。我试图删除现有的排队系统使用锁自由环缓冲区。我的资料如下。

代码在x86 Linux上运行,有多个线程用于写程序,只有一个线程用于读取程序。读取器和写入器分开一个插槽,并且是std::atomic<uint64_t>,因此读取器从插槽0开始,写入器从插槽1开始,然后每个写入器首先通过调用下面的incrementSequence在写入器序列上执行原子fetch_add(1, std::memory_order::memory_order_acq_rel)来声明一个插槽,然后使用compare_and_swap循环更新读取器序列,让客户端知道这个插槽可用(参见updateSequence )。

 inline data_type incrementSequence() {                                                                                       
        return m_sequence.fetch_add(1,std::memory_order::memory_order_seq_cst);                                                  
    }   

void updateSequence(data_type aOld, data_type aNew) {                                                                        
        while ( !m_sequence.compare_exchange_weak(aOld, aNew, std::memory_order::memory_order_release, std::memory_order_relaxed)
            if  (sequence() < aNew)  {                                                                                           
                continue;                                                                                                        
            }                                                                                                                    
            break;                                                                                                               
        }                                                                                                                        
    }                   
 inline data_type sequence() const {                                                                                          
        return m_sequence.load(std::memory_order::memory_order_acquire);                                                         
    }       
      

环形缓冲区(或一般的FIFO -不必作为环形缓冲区实现)旨在平滑流量突发。尽管生产者可能会突然产生数据,但消费者可以处理稳定的输入流。

如果FIFO溢出,这意味着两种情况之一:

  1. 你的爆发比你计划的要大。通过增加FIFO大小(或使其大小动态)来修复此问题。
  2. 你们的生产者比你们的消费者跑得快。通过增加用于消费数据的资源(可能是更多的线程)来解决这个问题。

在我看来,你目前正在触及第二点:你的单一消费者根本跟不上生产者的速度。在这种情况下,唯一真正的选择是通过优化单个消费者或增加更多消费者来加速消费。

它也听起来有点像你的消费者在处理数据时可能会在FIFO中留下他们的输入数据,因此FIFO中的那个点仍然被占用,直到消费者完成处理该输入。如果是这样,您可以通过简单地让消费者在FIFO开始处理时立即从FIFO删除输入数据来解决问题。这将释放该插槽,以便生产者可以继续将输入放入缓冲区。

另一点:使FIFO大小动态可能是一个问题。这个问题相当简单:它可以掩盖这样一个事实,即您确实存在第二个问题,即没有必要的资源来处理消费者端的数据。

假设生产者和消费者都是线程池,平衡系统的最简单方法通常是使用固定大小的FIFO。如果生产者开始远远领先于消费者,FIFO溢出,那么生产者开始阻塞。这让消费者线程池消耗更多的计算资源(例如,在更多的内核上运行)来赶上生产者。然而,这依赖于能够添加更多的消费者,而不是将系统限制为单个消费者。