对稀疏阵列的高效多线程写访问

Efficient multi-threaded write access to sparse array?

本文关键字:高效 多线程 写访问 阵列      更新时间:2023-10-16

我有一个大数组double*,多个线程对它进行写入。

我用一个boost::mutex来保护每个写操作,但这会引入争用,并使所有操作都非常缓慢,几乎是非并行的。

有没有更好的方法来控制对我的数组的多线程写访问?

具体来说,在我的情况下,数组是稀疏的,每个线程通常会写入数组的不同部分,我该如何利用这一点;对同一索引的并发写入应该很少见,并且主要发生在少数数组索引上。

编辑:准确地说,每个线程在多个数组索引上使用+=来增加值。

使用消息队列。使入队方法原子更新(即单指针交换),您应该能够恢复并发性。然后让一个单独的(单个)线程从队列中读取并写入数组。

如果能提供更多关于正在执行的更新类型的信息,我可以对此进行扩展。但一般来说,您可以找到许多无锁队列实现,这些实现应该有助于您做到这一点(例如这里)。

编辑以回答OP编辑:您需要构造一个类,该类存储索引对列表和更新值(或更新函数)。

class UpdateMessage {
    public:
    vector<Pair<int, int>> updates;   
}

或者类似的东西。然后,阅读器可以获取一条更新消息,并在该向量上迭代,为给定消息执行所有更新。


使用MoodyCamel队列

假设可以在不锁定数组的情况下计算更新,这里有一个快速而肮脏的实现,应该可以满足您的需求。

using namespace moodycamel;
typedef Updates vector<Pair<int, double>>;
ReaderWriterQueue<Updates> queue(100);
double array[] = initialize_array();
int sleep_interval = 10; // in microseconds, you'll probably want to do something smarter than a
                         // fixed interval here.
void read(ReaderWriterQueue queue) {
    Updates updates;
    bool succeeded = queue.try_dequeue(updates);
    if(succeeded) {
        for(auto it = updates.begin(); it != updates.end(); it = updates.next()) {
            array[it.x] = it.y;
        }
    }
}
void write(ReaderWriterQueue queue, Updates ups) {
    bool succeeded;
    do {
        succeeded = queue.try_enqueue(ups);
        usleep(sleep_interval);
    } while(!succeeded);
}

当然,如果插入失败,这会使写入线程旋转。如果这是不可接受的,您可以直接使用try_enqueue,并在enqueue失败的情况下执行您想做的任何操作。

如果您的环境支持C++11,那么只需将双数组替换为

  • 固定阵列的std::array<std::atomic<double>, N>,或
  • 动态阵列的CCD_ 7

只要不同的线程不写入相邻的索引(即缓存线的错误共享),性能和可扩展性就应该明显优于boost::mutex

如果对单个struct的访问事件之间存在一定的粒度(即数据写入不是连续发生的,并且速度快于执行流所能容纳的速度),那么在C++中创建一个线程安全的生产者-消费者队列而不使用锁将是一个可行的选择。这种方法将允许在高命中频率的时段期间在数据队列内建立数据,然后,随着命中频率的降低,队列的大小将随着数据被写入目标(struct)而减小。最终效果将允许您重新获得执行并发性。

实现的最佳描述(不在此处复制)如下:在C++中创建线程安全的生产者-消费者队列,而不使用锁