如何为数组中的元素锁定互斥锁,而不是为整个数组锁定互斥体

How can i lock a MUTEX for an element in the array, not for the complete array

本文关键字:锁定 数组 元素      更新时间:2023-10-16

问题的简短版本:我有 2 个函数共享同一个数组,当一个在编辑它时,另一个在读取它。但是,向量很长(5000 个样本(,很少发生并发访问。但是MUTEX1上的Mutex争用正在减慢该计划的速度。'

如何锁定内存的某些位置而不是整个块以减少争用?

编辑:注意:我必须尽可能使用更新的G值。

编辑2:例如,我有长度为5000的数组G。 foo1锁定mutex1以编辑索引 124。 尽管foo2想要编辑索引 2349,但在foo1发布mutex1之前它不能。

有没有办法将锁定互斥锁的争用向下移动到元素级别? 意思是:我希望foo2foo1只在同一个互斥锁上竞争,只有当他们想要编辑相同的索引时。例如:foo1想要编辑索引 3156,foo2想要编辑索引 3156。

带有代码解释的长版本:我正在为一个复杂的数学函数编写代码,我正在使用 pthreads 来并行代码并提高性能。代码非常复杂,我可以发布它,但我可以将模型发布到代码中。

基本上,我想使用 2 个并行运行的线程编辑 2 个数组。一个线程运行foo1,另一个线程运行foo2 。 但是,它们应该以特定的顺序运行,我使用 es( _B_A1_A2 mutex 来授予序列。它如下:

foo1 (first half)
foo2 (first half) and foo1 (second half) (in parallel)
foo1 (first half) and foo2 (second half) (in parallel)
...
foo2(second half)

然后我会检索我的结果。在foo1的上半部分,我将在G1中使用结果,这些结果可能会同时由foo2编辑。 因此,我使用Mutex1来保护它。同样的情况发生在foo2 G. 但是,将整个向量锁定为 1 个值非常有效,它们几乎从不同时编辑相同的内存位置。当我比较结果时,它几乎总是相同的。我想要一种一次锁定一个元素的方法,以便它们只对同一元素提出异议。

我将为有兴趣了解其工作原理的人描述代码:

#include <pthread.h>
#include <iostream>
using namespace std;
#define numThreads 2
#define Length 10000
pthread_t threads[numThreads];
pthread_mutex_t mutex1   = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t Mutex_B  = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t Mutex_A1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t Mutex_A2 = PTHREAD_MUTEX_INITIALIZER;
struct data_pointers
{
    double  *A;
    double  *B;
    double  *G;
    double  *L;
    int idxThread;
};
void foo1   (data_pointers &data);
void foo2   (data_pointers &data);
void *thread_func(void *arg){
    data_pointers data = *((data_pointers *) arg);
    if (data.idxThread==0)
        foo1 (data);
    else
        foo2 (data);
}

到目前为止,它是定义和线程调用函数,请记住,我定义了Length 10000numThreads 2

void foo1 ( data_pointers &data)
{
    double *A           = data.A;
    double *L           = data.L; 
    double *G           = data.G; 
    double U;
    for (int ijk =0;ijk<5;ijk++){
        /* here goes some definitions*/
        pthread_mutex_lock(&Mutex_A1);
        for (int k =0;k<Length;k++){
            pthread_mutex_lock(&mutex1); 
            U = G[k];
            pthread_mutex_unlock(&mutex1);
            /*U undergoes a lot of mathematical operations here

            */
        }
        pthread_mutex_lock(&Mutex_B);
        pthread_mutex_unlock(&Mutex_A2);
        for (int k =0;k<Length;k++){
            /*U another mathematical operations here

            */
            pthread_mutex_lock(&mutex1);
            L[k] = U;
            pthread_mutex_unlock(&mutex1);
            pthread_mutex_unlock(&Mutex_B);
        }
    }
}

在 foo1 中,我锁定mutexA1并完成我的工作,然后锁定MutexB并解锁MutexA2以便foo2可以开始工作。请注意,main首先锁定MutexA2。这样我保证foo1下半场开始时mutexB锁定,这样,foo2在解锁foo1之前无法进入函数的后半部分mutexB

void foo2 (data_pointers &data)
{
    double *A           = data.A;
    double *L           = data.L; 
    double *G           = data.G; 
    double U;
    for (int ijk =0;ijk<5;ijk++){
        /* here goes some definitions*/
        pthread_mutex_lock(&Mutex_A1);
        for (int k =0;k<Length;k++){
            pthread_mutex_lock(&mutex1); 
            U = G[k];
            pthread_mutex_unlock(&mutex1);
            /*U undergoes a lot of mathematical operations here

            */
        }
        pthread_mutex_lock(&Mutex_B);
        pthread_mutex_unlock(&Mutex_A2);
        for (int k =0;k<Length;k++){        
            /*U another mathematical operations here

            */
            pthread_mutex_lock(&mutex1);
            L[k] = U;
            pthread_mutex_unlock(&mutex1);
            pthread_mutex_unlock(&Mutex_B);
        }
    }
}

现在,当foo1解锁mutexB时,它将不得不等待foo2解锁mutexA1才能工作,foo2只有在已经解锁mutexA2时才能解锁mutexB

这种情况持续了 5 次。

int main(){
    double G1[Length];
    double G2[Length];
    double B1[Length];
    double B2[Length];
    double A2[Length];
    double A1[Length];
    data_pointers data[numThreads];
    data[0].L           = G2;
    data[0].G           = G1;   
    data[0].A           = A1;
    data[0].B           = B1;
    data[0].idxThread   = 0;
    data[1].L           = G1;
    data[1].G           = G2;   
    data[1].A           = A2;
    data[1].B           = B2;
    data[1].idxThread   = 1;
    pthread_mutex_lock(&Mutex_A2);
    pthread_create(&(threads[0]), NULL, thread_func, (void *) &(data[0]));
    pthread_create(&(threads[1]), NULL, thread_func, (void *) &(data[1]));
    pthread_join(threads[1], NULL);
    pthread_join(threads[0], NULL);
    pthread_mutex_unlock(&Mutex_A1);
    pthread_mutex_unlock(&Mutex_A2);
    return 0;
}

注意:这只是一个示例代码。 按预期编译和工作,但没有输出。

最后编辑:谢谢大家的好主意,我有很多经验,并且遵循这些建议很有趣。我将对所有答案进行投票,因为它们很有用,并选择最接近原始问题(原子性(

使用原子指针"锁定"内存中某些位置的示例代码:

#include <vector>
#include <atomic>
#include <thread>
using container = std::vector<std::atomic<double>>;
using container_size_type = container::size_type;
container c(300);
std::atomic<container::pointer> p_busy_elem{ nullptr };
void editor()
{
    for (container_size_type i{ 0 }, sz{ c.size() }; i < sz; ++i)
    {
        p_busy_elem.exchange(&c[i]); // c[i] is busy
        // ... edit c[i] ... // E: calculate a value and assign it to c[i]
        p_busy_elem.exchange(nullptr); // c[i] is no longer busy
    }
}
void reader()
{
    for (container_size_type i{ 0 }, sz{ c.size() }; i < sz; ++i)
    {
        // A1: wait for editor thread to finish editing value
        while (p_busy_elem == &c[i])
        {
            // A2: room a better algorithm to prevent blocking/yielding
            std::this_thread::yield();
        }
        // B: if c[i] is updated in between A and B, this will load the latest value
        auto value = c[i].load();
        // C: c[i] might have changed by this time, but we had the most up to date value we could get without checking again
        // ... use value ...
    }
}
int main()
{
    std::thread t_editor{ editor };
    std::thread t_reader{ reader };
    t_editor.join();
    t_reader.join();
}

在编辑器线程中,设置了繁忙指针以指示当前正在编辑该内存位置 (E(。如果线程 B 在设置繁忙指针后尝试读取该值,它将等到编辑完成后再继续 (A1(。

关于A2的说明:可以在这里放置一个更好的系统。可以保留尝试读取时繁忙的节点列表,然后我们将i添加到该列表中,并在以后尝试处理该列表。好处:可以告诉循环执行continue,并且读取超过当前正在编辑i的索引。

创建要读取的值的副本 (B( 以便根据需要使用它 (C(。这是我们最后一次可以在 c[i] 处检查最新值

如果不调整数组大小,则不需要对单个元素或整个数组进行任何互斥锁。

原子地阅读你的价值观,原子地写下你的价值观并保持冷静。

如果您希望在不使用互斥锁的情况下对类似数组的数据结构进行高性能多线程访问,则可以研究比较和交换。也许您可以设计一种适用于您的特定问题的无锁数据结构。https://en.wikipedia.org/wiki/Compare-and-swap

关于发布的代码,您似乎使事情变得过于复杂。如果要实现:

foo1 (first half)
foo2 (first half) and foo1 (second half) (in parallel)
foo1 (first half) and foo2 (second half) (in parallel)
...
foo2(second half)

两个穆特克斯应该可以。

也许这可以做到。下面是一些伪代码:

// These global variables controls which thread is allowed to
// execute first and second half.
// 1 --> Foo1 may run
// 2 --> Foo2 may run
int accessFirstHalf = 1;
int accessSecondHalf = 1;
void foo1 ( data_pointers &data)
{
    while(YOU_LIKE_TO_GO_ON)
    {
        while (true)
        {
            TAKE_MUTEX_FIRST_HALF;
            if (accessFirstHalf == 1)
            {
                RELEASE_MUTEX_FIRST_HALF;
                break;
            }
            RELEASE_MUTEX_FIRST_HALF;
            pthread_yield();
        }
        // Do the first half
        TAKE_MUTEX_FIRST_HALF;
        // Allow Foo2 to do first half
        accessFirstHalf == 2;
        RELEASE_MUTEX_FIRST_HALF;
        while (true)
        {
            TAKE_MUTEX_SECOND_HALF;
            if (accessSecondHalf == 1)
            {
                RELEASE_MUTEX_SECOND_HALF;
                break;
            }
            RELEASE_MUTEX_SECOND_HALF;
            pthread_yield();
        }
        // Do the second half
        TAKE_MUTEX_SECOND_HALF;
        // Allow Foo2 to do second half
        accessSecondHalf == 2;
        RELEASE_MUTEX_SECOND_HALF;
    }
}

void foo2 ( data_pointers &data)
{
    while(YOU_LIKE_TO_GO_ON)
    {
        while (true)
        {
            TAKE_MUTEX_FIRST_HALF;
            if (accessFirstHalf == 2)
            {
                RELEASE_MUTEX_FIRST_HALF;
                break;
            }
            RELEASE_MUTEX_FIRST_HALF;
            pthread_yield();
        }
        // Do the first half
        TAKE_MUTEX_FIRST_HALF;
        // Allow Foo1 to do first half
        accessFirstHalf == 1;
        RELEASE_MUTEX_FIRST_HALF;
        while (true)
        {
            TAKE_MUTEX_SECOND_HALF;
            if (accessSecondHalf == 2)
            {
                RELEASE_MUTEX_SECOND_HALF;
                break;
            }
            RELEASE_MUTEX_SECOND_HALF;
            pthread_yield();
        }
        // Do the second half
        TAKE_MUTEX_SECOND_HALF;
        // Allow Foo1 to do second half
        accessSecondHalf == 1;
        RELEASE_MUTEX_SECOND_HALF;
    }
}

int main()
{
    // start the threads with foo1 and foo2
}

这似乎是您要求的核心:

foo1 (first half)
foo2 (first half) and foo1 (second half) (in parallel)
foo1 (first half) and foo2 (second half) (in parallel)
...
foo2(second half)

实现与 pthreads 交错的最简单方法是使用屏障。

使用 2 count pthread_barrier_init()初始化屏障。 然后foo1()执行:

first half
pthread_barrier_wait()
second half
pthread_barrier_wait()
...
first half
pthread_barrier_wait()
second half
pthread_barrier_wait()

foo2()执行的序列略有不同:

pthread_barrier_wait()
first half
pthread_barrier_wait()
second half
....
pthread_barrier_wait()
first half
pthread_barrier_wait()
second half