在线程之间复制 std::vector 而不锁定

Copying std::vector between threads without locking

本文关键字:vector 锁定 std 线程 之间 复制      更新时间:2023-10-16

我有一个在一个线程中修改的向量,我需要在另一个线程中使用它的内容。由于性能要求,这些线程之间的锁定是不可接受的。由于在向量更改时遍历它会导致崩溃,因此我想复制向量,然后迭代副本。我的问题是,这种方式也会崩溃吗?

struct Data
{
int A;
double B;
bool C;
};
std::vector<Data> DataVec;
void ModifyThreadFunc()
{
// Here the vector is changed, which includes adding and erasing elements
...
}
void ReadThreadFunc()
{
auto temp = DataVec;    // Will this crash?
for (auto& data : temp)
{
// Do stuff with the data
...
}
// This definitely can crash
/*for (auto& data : DataVec)
{
// Do stuff with the data
...
}*/
}

vector::operator=的基本线程安全保证是:

"如果引发异常,则容器处于有效状态。">

这里可能有哪些类型的例外?

编辑:

我使用双缓冲解决了这个问题,并在下面发布了我的答案。

正如其他答案所指出的,你所要求的是不可行的。如果您有并发访问,则需要同步,故事结束。

话虽如此,像您这样的要求不是一种同步选项并不罕见。在这种情况下,您仍然可以做的是摆脱并发访问。例如,您提到在游戏循环(如执行)中每帧访问一次数据。是严格要求从当前帧获取数据,还是也可以从最后一帧获取数据?

在这种情况下,您可以使用两个向量,一个由生产者线程写入,另一个由所有使用者线程读取。在帧的末尾,您只需交换两个向量即可。现在,您不再需要 *(1)细粒度同步进行数据访问,因为不再有并发数据访问。

这只是如何执行此操作的一个例子。如果需要摆脱锁定,请开始考虑如何组织数据访问,以避免首先陷入需要同步的情况。

*(1):严格来说,您仍然需要一个同步点,以确保在执行交换时,所有编写器和读取器线程都已完成工作。但这要容易得多(通常每帧的末尾都有这样的同步点),并且对性能的影响远小于每次访问矢量时进行同步。

我的问题是,这种方式也会崩溃吗?

是的,您仍然有数据竞赛。如果线程 A 在线程 B 创建副本时修改向量,则向量的所有迭代器都将失效。

这里可能有哪些类型的例外?

std::vector::operator=(const vector&)将在内存分配失败时抛出,或者如果包含的元素在副本上抛出。同样的事情也适用于复制构造,这就是代码中标记为">这会崩溃吗?"的行实际上正在做的事情。


这里的根本问题是std::vector不是线程安全的。您必须使用lock/mutex保护它,或者将其替换为线程安全的容器(例如Boost.Lockfree或libcds中的无锁容器)。

我有一个在一个线程中修改的向量,我需要在另一个线程中使用它的内容。由于性能要求,这些线程之间的锁定是不可接受的。

这是不可能满足的要求。

无论如何,2 个线程之间的任何数据共享都需要一种锁定,无论是显式锁定还是提供的实现(最终是硬件)。您必须再次检查您的实际需求:挂起一个线程直到另一个线程结束是不可接受的,但您可以锁定简短的指令序列。和/或可能使用不同的架构。例如,擦除矢量中的项目是一项代价高昂的操作(线性时间,因为您必须将所有数据移动到已删除项目上方),而将其标记为无效要快得多(恒定时间,因为它是一次写入)。如果你真的必须在矢量中间擦除,也许列表会更合适。

但是,如果您可以在ReadThreadFunc向量的副本周围以及ModifyThreadFunc中的任何向量修改周围放置锁定排除项,那就足够了。要优先考虑修改线程,您可以尝试锁定另一个线程,如果不能,请立即放弃。

也许你应该重新考虑你的设计!

每个线程都应该有自己的向量(列表,队列任何适合您需求的内容)来处理。所以线程 A 可以做一些工作并将结果传递给 thrad B。从线程 A int 线程 B 的队列写入数据时,您只需锁定即可。

没有某种锁定是不可能的。

所以我使用双缓冲解决了这个问题,这保证不会崩溃,并且读取线程将始终具有可用数据,即使它可能不正确:

struct Data
{
int A;
double B;
bool C;
};
const int MAXSIZE = 100;
Data Buffer[MAXSIZE];
std::vector<Data> DataVec;
void ModifyThreadFunc()
{
// Here the vector is changed, which includes adding and erasing elements
...
// Copy from the vector to the buffer
size_t numElements = DataVec.size();
memcpy(Buffer, DataVec.data(), sizeof(Data) * numElements);
memset(&Buffer[numElements],  0, sizeof(Data) * (MAXSIZE - numElements));
}
void ReadThreadFunc()
{
Data* p = Buffer;
for (int i = 0; i < MAXSIZE; ++i)
{
// Use the data
...
++p;
}
}