实现类似于Qt的高性能互斥锁

Implement a high performance mutex similar to Qt's one

本文关键字:高性能 类似于 Qt 实现      更新时间:2023-10-16

我有一个多线程科学应用程序,其中几个计算线程(每个核心一个)必须将其结果存储在一个公共缓冲区中。这需要一个互斥机制。

工作线程只花一小部分时间写入缓冲区,因此互斥锁大部分时间都是解锁的,而且锁很有可能立即成功,而无需等待另一个线程解锁。

目前,我已经将Qt的QMutex用于该任务,并且它运行良好:互斥的开销可以忽略不计。

然而,我不得不将它移植到c++11/STL。当使用std::互斥锁时,性能下降了66%,线程将大部分时间花在锁定互斥锁上。

在另一个问题之后,我认为Qt使用了一种基于简单原子标志的快速锁定机制,该机制针对互斥对象尚未锁定的情况进行了优化。当并发锁定发生时,返回到系统互斥。

我想在STL中实现这一点。有没有一种基于std::atomic和std::mutex的简单方法?我已经深入研究了Qt的代码,但它对我的使用来说似乎过于复杂(我不需要锁超时、皮条、小占地面积等…)

编辑:我尝试过旋转锁,但效果不好,因为:

另一个线程会定期(每隔几秒钟)锁定互斥对象并刷新缓冲区。这需要一些时间,所以此时所有工作线程都会被阻塞。自旋锁使调度繁忙,导致刷新速度比使用适当的互斥对象慢10-100倍。这是不可接受的

编辑:我已经尝试过了,但它不起作用(锁定所有线程)

class Mutex
{
public:
    Mutex() : lockCounter(0) { }
    void lock()
    {
        if(lockCounter.fetch_add(1, std::memory_order_acquire)>0)
        {
            std::unique_lock<std::mutex> lock(internalMutex);
            cv.wait(lock);
        }
    }
    void unlock();
    {
        if(lockCounter.fetch_sub(1, std::memory_order_release)>1)
        {
            cv.notify_one();
        }
    }

private:
    std::atomic<int> lockCounter;
    std::mutex internalMutex;
    std::condition_variable cv;
};

谢谢!

编辑:最终解决方案

MikeMB的快速互斥锁运行得很好。

作为最后的解决方案,我做了:

  • 使用带有try_lock的简单自旋锁
  • 当线程try_lock失败时,它们不会等待,而是填充一个队列(不与其他线程共享)并继续
  • 当线程获得锁时,它会使用当前结果更新缓冲区,但也会使用存储在队列中的结果(它处理自己的队列)
  • 缓冲区刷新的效率要高得多:阻塞部分只交换两个指针

一般建议

正如在一些评论中提到的,我首先要看一下,是否可以重组程序设计,使互斥实现对性能不那么重要
此外,由于标准c++中的多线程支持非常新,而且有点幼稚,因此有时您只需要依靠特定于平台的机制,例如linux系统上的futex或windows上的关键部分或Qt等非标准库
话虽如此,我可以想到两种可能加速您的程序的实现方法:

旋转锁
如果访问冲突很少发生,并且互斥锁只保持很短的时间(当然,有两件事应该努力实现),那么只使用旋转锁可能是最有效的,因为它根本不需要任何系统调用,而且实现起来很简单(取自cpprreference):

class SpinLock {
    std::atomic_flag locked ;
public:
    void lock() {
        while (locked.test_and_set(std::memory_order_acquire)) { 
             std::this_thread::yield(); //<- this is not in the source but might improve performance. 
        }
    }
    void unlock() {
        locked.clear(std::memory_order_release);
    }
};

当然,缺点是等待线程不会保持睡眠状态,从而占用处理时间。

已检查锁定

这基本上就是您演示的想法:首先根据原子交换操作快速检查是否确实需要锁定,并仅在不可避免的情况下使用重std::mutex

struct FastMux {
    //Status of the fast mutex
    std::atomic<bool> locked;
    //helper mutex and vc on which threads can wait in case of collision
    std::mutex mux;
    std::condition_variable cv;
    //the maximum number of threads that might be waiting on the cv (conservative estimation)
    std::atomic<int> cntr; 
    FastMux():locked(false), cntr(0){}
    void lock() {
        if (locked.exchange(true)) {
            cntr++;
            {
                std::unique_lock<std::mutex> ul(mux);
                cv.wait(ul, [&]{return !locked.exchange(true); });
            }
            cntr--;
        }
    }
    void unlock() {
        locked = false;
        if (cntr > 0){
            std::lock_guard<std::mutex> ul(mux);
            cv.notify_one();
        }
    }
};

注意,std::mutex没有锁定在lock()unlock()之间,而是仅用于处理条件变量。如果互斥体上存在高拥塞,这将导致更多的锁定/解锁调用。

您的实现的问题是,cv.notify_one();可能在if(lockCounter.fetch_add(1, std::memory_order_acquire)>0)cv.wait(lock);之间被调用,因此您的线程可能永远不会醒来。

不过,我没有与您提议的实现的固定版本进行任何性能比较,所以您只需要看看什么最适合您。

并不是每个定义的答案,但根据具体任务的不同,无锁队列可能有助于消除互斥。如果你有多个生产者和一个消费者(甚至多个消费者),这将有助于设计。链接:

  • Boost.Lockfree提供了这样一个队列,虽然不是直接的C++/STL
  • 另一个选项是Anthony Williams的"C++并发操作"中的无锁队列实现
  • C的一个快速免锁队列++

更新参考注释:

队列大小/溢出:

  • 通过i)使队列足够大,或者ii)使生产者线程在队列满后等待推送数据,可以避免队列溢出
  • 另一种选择是使用多个消费者和多个队列并实现并行缩减,但这取决于如何处理数据

消费者线索:

  • 队列可以使用std::condition_variable,并使使用者线程等待,直到有数据为止
  • 另一种选择是使用计时器定期检查队列是否为非空(轮询),一旦队列为非空,线程就可以连续获取数据并返回等待模式