动态生成和安全使用spsc_queues

Dynamic generation & safe usage of spsc_queues

本文关键字:spsc queues 安全 动态      更新时间:2023-10-16

我做的唯一一个boost::lockfreespsc_queue,这太神奇了。

但是,我希望在一个线程与cores - 1线程来回传递信息的情况下实现它。

我认为每个工作线程都有自己的一组spsc_queues,in和out,它们将存储在vectors中,主线程将信息传递到一个传出队列,然后移动到vector中的下一个队列,依此类推,并在传入队列中循环。

两个vector中的这些spsc_queue可以安全地推和弹出吗?

如果没有,是否有其他方法可以根据我的意图使用spsc_queues?

您基本上建议以其预期的方式使用2x(cores-1)spsc_queues。是的,这样可以。

不过,我看不出你会如何处理主线程上的响应("传入队列")。请记住,传入队列上没有"等待"操作,你也不会想要一个(它不再是非常无锁定的,在等待传入消息时,你会让所有其他工作人员都无法使用)。

旁白:如果你对响应队列进行维度划分,使它们永远不会溢出,那么你可以从中进行简单的循环读取(只是不要试图从单个响应队列中读取所有消息,因为这是一种很好的方法,可以让其他响应队列的调度不足)。

底部的代码样本(代码样本)

所有这些都让我强烈怀疑您实际上是异步之后的,而不是并发。我有一种感觉,你会很高兴让你的应用程序在1个线程上运行,只要尽快为每个可用的消息提供服务——无论它是源消息还是内容消息。

  1. 对于可以在很短的时间内处理的大量小消息,此模型将非常好地扩展**[´]**
  2. 当1个线程饱和时,您可以通过添加工人来扩展
  3. 在具有需要更长处理时间的消息的服务中,您可以以异步方式将这些任务卸载到只处理低频请求的专用线程上:它们可以在完成后将小的"完成"消息推回到主工作队列

所有这些都会让我想到像libuv或Boost Asio这样的库。如果您已经知道需要无锁运行才能获得所需的吞吐量(这在工业级服务器解决方案之外非常罕见),那么您可以使用无锁队列来模拟同样的情况。这是更多的工作,因为你必须将epoll/select/poll循环集成到你的生产者中。我建议你保持简单,只在你真正需要的时候采用传统的复杂性。

咒语:正确,先考虑因素;优化后期

代码示例

正如承诺的那样,一个简单的概念证明显示了使用多个双向SPSC队列消息和多个工作线程。

完全无锁定版本:Coliru直播

这里有一些微妙之处。特别要注意的是,队列的维度不足将导致静默丢弃消息。如果消费者能够跟上生产商的步伐,这种情况就不会发生,但只要有操作系统活动,你就不知道,所以你应该为此添加检查。

UPDATE根据评论中的请求,这里有一个版本可以检查队列饱和情况,而不会丢弃消息也可以在Coliru上观看直播

  • 无法删除任何消息
  • 不再有延迟到达(因为在收到所有响应之前,主循环不会退出)
  • 循环不再与循环变量绑定,因为发送可能会暂停,这将导致始终读取相同的响应队列。这是死锁或其他最坏情况下的性能的配方
  • 在队列饱和的情况下,我们必须想出一种适当的方法来平衡负载。我选择睡一会儿。从技术上讲,这意味着当队列饱和时,我们的无锁免等待解决方案会降级为常规协作多线程。如果检测到这种情况,您可能更愿意扩展队列。这完全取决于你的系统
  • 你会想知道这是什么时候发生的;我已经包含了所有线程的简单拥塞统计信息。在我的系统上,使用microsleep调用sleep_for(nanoseconds(1)),输出为:

    Received 1048576 responses (97727 100529 103697 116523 110995 115291 103048 102611 102583 95572 )
    Total: 1048576 responses/1048576 requests
    Main thread congestion: 21.2%
    Worker #0 congestion: 1.7%
    Worker #1 congestion: 3.1%
    Worker #2 congestion: 2.0%
    Worker #3 congestion: 2.5%
    Worker #4 congestion: 4.5%
    Worker #5 congestion: 2.5%
    Worker #6 congestion: 3.0%
    Worker #7 congestion: 3.2%
    Worker #8 congestion: 3.1%
    Worker #9 congestion: 3.6%
    real    0m0.616s
    user    0m3.858s
    sys 0m0.025s
    

    正如你所看到的,Coliru的调谐需要有很大的不同。只要您的系统存在以最大负载运行的风险,就需要进行此调整。

  • 相反,当队列为空时,您必须考虑如何限制负载:此时,工作人员将忙于在队列上循环,等待消息出现。在真实的服务器环境中,当负载突发时,您会希望检测"空闲"时间段并降低轮询频率,以节省CPU功率(同时允许CPU最大限度地增加其他线程的吞吐量)。

这个答案中包括第二个"混合"版本(在队列饱和之前无锁):

#include <boost/lockfree/spsc_queue.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread.hpp>
#include <memory>
#include <iostream>
#include <iterator>
namespace blf = boost::lockfree;
static boost::atomic_bool shutdown(false);
static void nanosleep()
{
//boost::this_thread::yield();
boost::this_thread::sleep_for(boost::chrono::nanoseconds(1));
}
struct Worker
{
typedef blf::spsc_queue<std::string > queue;
typedef std::unique_ptr<queue> qptr;
qptr incoming, outgoing;
size_t congestion = 0;
Worker() : incoming(new queue(64)), outgoing(new queue(64)) 
{
}
void operator()()
{
std::string request;
while (!shutdown)
{
while (incoming->pop(request)) 
while (!outgoing->push("Ack: " + request))
++congestion, nanosleep();
}
}
};
int main()
{
boost::thread_group g;
std::vector<Worker> workers(10);
std::vector<size_t> responses_received(workers.size());
for (auto& w : workers)
g.create_thread(boost::ref(w));
// let's give them something to do
const auto num_requests = (1ul<<20);
std::string response;
size_t congestion = 0;
for (size_t total_sent = 0, total_received = 0; total_sent < num_requests || total_received < num_requests;)
{
if (total_sent < num_requests)
{
// send to a random worker
auto& to = workers[rand() % workers.size()];
if (to.incoming->push("request " + std::to_string(total_sent)))
++total_sent;
else
congestion++;
}
if (total_received < num_requests)
{
static size_t round_robin = 0;
auto from = (++round_robin) % workers.size();
if (workers[from].outgoing->pop(response))
{
++responses_received[from];
++total_received;
}
}
}
auto const sum = std::accumulate(begin(responses_received), end(responses_received), size_t());
std::cout << "nReceived " << sum << " responses (";
std::copy(begin(responses_received), end(responses_received), std::ostream_iterator<size_t>(std::cout, " "));
std::cout << ")n";
shutdown = true;
g.join_all();
std::cout << "nTotal: " << sum << " responses/" << num_requests << " requestsn";
std::cout << "Main thread congestion: " << std::fixed << std::setprecision(1) << (100.0*congestion/num_requests) << "%n";
for (size_t idx = 0; idx < workers.size(); ++idx)
std::cout << "Worker #" << idx << " congestion: " << std::fixed << std::setprecision(1) << (100.0*workers[idx].congestion/responses_received[idx]) << "%n";
}

[´]与以往一样,"非常短的时间"是一个相对概念,大致意思是"比新消息之间的平均时间更短的时间"。例如,如果每秒有100个请求,那么对于单线程系统来说,5ms的处理时间将"非常少"。但是,如果每秒有10k个请求,那么1毫秒的处理时间大约是16核服务器的限制。