在c++ 11中安全、明确地操作原子变量

Safely and unambiguously manipulating atomic variables in C++11

本文关键字:操作 变量 c++ 安全      更新时间:2023-10-16

我必须从多播(UDP)流中读取一些数据(以令人眼花缭乱的速度-高达每秒5000条消息)。因为流是多播的(数据非常关键),所以数据提供者提供了两个发送相同数据的流(它们的逻辑是在两个流中丢失相同数据包的可能性非常接近于零)。所有数据包都标有序列号,以便跟踪。

此外,应用程序是如此的时间紧迫,以至于我被迫并行地监听两个流,并从第一个接收到它的多播流中选择下一个序列号——当相同的数据包出现在镜像流中时,我只是将其丢弃。

我计划在两个函数之间使用一个通用的"sequence_number"变量来实现这个drop功能——顺便说一下,这两个函数在不同的线程中运行。序列号是atomic,因为它将从两个不同的线程读取和更新。

最明显的算法是
if (sequence number received from the stream > sequence_number)
{
   process packet;
   sequence_number = sequence number received from the stream;
}

(上面的算法需要修改的时候,序列号出了顺序-他们可以,因为它是一个UDP流-但让我们暂时忘记它)

我的问题是:

从时间I std::load到我的sequence_number,检查它是否小于我从流收到的序列号,接受数据包,并最后 std::store新的序列号到sequence_number;如果另一个流接收到相同的数据包(具有相同的序列号)并执行相同的操作(在第一个流完成对该序列号的std::store之前),我基本上会在系统中两次使用相同的数据包。如何克服这种情况?

不要等到以后再担心处理乱序数据包的问题,因为解决这个问题也为同步线程提供了最优雅的解决方案。

数组的元素是用于数据竞争的唯一内存位置。如果根据每个数据包的序列号将其放入不同的数组元素中(通过指针写入自动完成),就可以避免大部分争用。还可以使用compare-exchange来检测其他线程(其他流)是否已经看到了该数据包。

请注意,您不会有通常与比较交换相关联的重试循环,或者您有数据包的第一个副本并且比较交换成功,或者数据包已经存在并且您的副本可以被丢弃。所以这种方法不仅没有锁,而且没有等待:)

这里有一个选项,如果您使用std::atomic值,则使用compare_exchange

没有显示如何初始化last_processed_seqnum,因为您需要将其设置为一个有效值,即比下一个到达的数据包的序列数少1。

需要对存在序列号间隙的情况进行调整。你在前提中提到不会有掉落的序列;但是下面的示例将在任何序列间隔时停止处理数据包(即灾难性地失败)。

std::atomic<int> last_processed_seqnum;
// sync last_processed_seqnum to first message(s).
int seqnum_from_stream = ...;
int putative_last_processed_seqnum = seqnum_from_stream - 1;

if (last_processed_seqnum.compare_exchange_strong(putative_last_processed_seqnum,
                                                  seqnum_from_stream))
{
   // sequence number has been updated in compare_exchange_strong
   // process packet;
} 

理想情况下,我们想要的是一个使用大于而不是等于的compare_exchange函数。我不知道有什么方法能在一次手术中达到这种效果。我链接到的SO问题链接到一个关于迭代小于目标值的所有值以更新的答案。

您可能正在实现一个价格提要处理程序,它是哪个交易所以及什么协议?是瘙痒还是快速修复?我不建议对同一个feed使用两个线程,因为您可能需要为不同的细分市场/板加入几个多播组。