在Qt线程之间发送大量数据

Sending large amount of data between Qt threads

本文关键字:数据 Qt 线程 之间      更新时间:2023-10-16

我有一个QThread,它定期生成相当大量的数据(每秒几兆字节),它需要将其传输到父(GUI)线程。

恐怕我对QThread的内部运作不太确定,所以我想要求一个最佳实践。

显然,传输数据最直接的方式就是emit数组。但是,这有多有效?Qt是否知道它的使用位置,并在发送和接收时避免深度复制它?

如果没有,我很乐意在主线程中分配内存,并给出一个指向子线程的指针,它将在其中写入数据(并且仅emit有关进度的短消息)。这对我来说似乎不是最优雅的解决方案,这就是我问的原因。

如果Qt在发射和接收时避免在多个缓冲区中复制数据,那么在所有系统中都能保证吗?我没有资源尝试在各种操作系统下对其进行基准测试。

QThread的内部工作原理是无关紧要的:它们在事件循环如何工作中没有任何作用。当您在与槽对象的线程不同的线程中的QObjectemit信号时,该信号将作为QMetaCallEvent发布到接收线程的事件队列中。 然后,在接收线程中运行的事件循环将作用于此事件,并对连接到发射信号的插槽执行调用。

因此,无论发生什么,您通过信号发送的任何数据最终都会成为 QEvent 派生类实例中的有效负载。

问题的实质是当QMetaCallEvent到达事件循环并且容器作为参数传递到插槽中时。当然,在此过程中可以多次调用复制构造函数。下面是一些简单的代码,演示了复制构造函数和默认构造函数实际上被调用了多少次

  • 在隐式共享写入时复制容器 (QVector) 的数据成员的元素上,

  • 在代表容器的自定义类上。

你会惊喜:)

由于Qt容器是隐式共享的写入时复制,因此它们的复制构造成本可以忽略不计:所做的只是在构造时原子地递增参考计数器。例如,不会复制任何数据成员。

唉,11 之前的C++显示了它丑陋的一面:如果插槽代码以任何方式修改容器,则无法以让编译器知道不再需要原始容器的方式传递对插槽的引用。因此:如果槽收到对容器的 const 引用,则可以保证不会创建任何副本。如果槽收到容器的可写副本对其进行修改,则会创建一个完全不必要的副本,因为不再需要调用站点上活动的实例。在 C++-11 中,您将右值引用作为参数传递。在函数调用中传递右值引用将结束调用器中传递对象的生存期。

示例代码输出:

"Started" copies: 0 assignments: 0 default instances: 0 
"Created Foo" copies: 0 assignments: 0 default instances: 100 
"Created Bar" copies: 0 assignments: 0 default instances: 100 
"Received signal w/const container" copies: 0 assignments: 0 default instances: 100 
"Received signal w/copy of the container" copies: 0 assignments: 0 default instances: 100 
"Made a copy" copies: 100 assignments: 1 default instances: 101 
"Reset" copies: 0 assignments: 0 default instances: 0 
"Received signal w/const class" copies: 2 assignments: 0 default instances: 1 
"Received signal w/copy of the class" copies: 3 assignments: 0 default instances: 1 
//main.cpp
#include <QtCore>
class Class {
    static QAtomicInt m_copies;
    static QAtomicInt m_assignments;
    static QAtomicInt m_instances;
public:
    Class() { m_instances.fetchAndAddOrdered(1); }
    Class(const Class &) { m_copies.fetchAndAddOrdered(1); }
    Class & operator=(const Class &) { m_assignments.fetchAndAddOrdered(1); return *this; }
    static void dump(const QString & s = QString()) {
        qDebug() << s << "copies:" << m_copies << "assignments:" << m_assignments << "default instances:" << m_instances;
    }
    static void reset() {
        m_copies = 0;
        m_assignments = 0;
        m_instances = 0;
    }
};
QAtomicInt Class::m_instances;
QAtomicInt Class::m_copies;
QAtomicInt Class::m_assignments;
typedef QVector<Class> Vector;
Q_DECLARE_METATYPE(Vector)
class Foo : public QObject
{
    Q_OBJECT
    Vector v;
public:
    Foo() : v(100) {}
signals:
    void containerSignal(const Vector &);
    void classSignal(const Class &);
public slots:
    void sendContainer() { emit containerSignal(v); }
    void sendClass() { emit classSignal(Class()); }
};
class Bar : public QObject
{
    Q_OBJECT
public:
    Bar() {}
signals:
    void containerDone();
    void classDone();
public slots:
    void containerSlotConst(const Vector &) {
        Class::dump("Received signal w/const container");
    }
    void containerSlot(Vector v) {
        Class::dump("Received signal w/copy of the container");
        v[99] = Class();
        Class::dump("Made a copy");
        Class::reset();
        Class::dump("Reset");
        emit containerDone();
    }
    void classSlotConst(const Class &) {
        Class::dump("Received signal w/const class");
    }
    void classSlot(Class) {
        Class::dump("Received signal w/copy of the class");
        emit classDone();
        //QThread::currentThread()->quit();
    }
};
int main(int argc, char ** argv)
{
    QCoreApplication a(argc, argv);
    qRegisterMetaType<Vector>("Vector");
    qRegisterMetaType<Class>("Class");
    Class::dump("Started");
    QThread thread;
    Foo foo;
    Bar bar;
    Class::dump("Created Foo");
    bar.moveToThread(&thread);
    Class::dump("Created Bar");
    QObject::connect(&thread, SIGNAL(started()), &foo, SLOT(sendContainer()));
    QObject::connect(&foo, SIGNAL(containerSignal(Vector)), &bar, SLOT(containerSlotConst(Vector)));
    QObject::connect(&foo, SIGNAL(containerSignal(Vector)), &bar, SLOT(containerSlot(Vector)));
    QObject::connect(&bar, SIGNAL(containerDone()), &foo, SLOT(sendClass()));
    QObject::connect(&foo, SIGNAL(classSignal(Class)), &bar, SLOT(classSlotConst(Class)));
    QObject::connect(&foo, SIGNAL(classSignal(Class)), &bar, SLOT(classSlot(Class)));
    QObject::connect(&bar, SIGNAL(classDone()), &thread, SLOT(quit()));
    QObject::connect(&thread, SIGNAL(finished()), &a, SLOT(quit()));
    thread.start();
    a.exec();
    thread.wait();
}
#include "main.moc"

当通信大型缓冲区时,生产者线程中的 new() 缓冲区对象是"传统的",加载时,将 *buffer 排队/发出/任何内容到消费者线程,并立即 new() 另一个(进入相同的 *buffer var),用于下一次数据加载。

问题:如果您的 GUI 线程无法跟上,除非您采取一些流量控制措施(例如,预先分配 *缓冲区池并"循环"它们),否则您将获得内存失控。

我通常做的是在循环中预先分配一些缓冲区实例(在大型服务器中多达数千个),并将它们的实例推送到生产者-消费者的"池队列"上。 如果子线程想要将数据从某个网络连接加载到缓冲区中,则必须从池中弹出一个并加载它。 然后,它可以将缓冲区排队/发出/任何缓冲区到使用者线程,并为可能传入的更多数据弹出另一个缓冲区。 使用者线程获取 bufer,处理数据并将"已使用"缓冲区推送回池队列以供重用。这提供了流控制:如果子线程加载缓冲区的速度快于使用者线程处理它们的速度,它将发现池为空并阻止它,直到使用者线程返回一些使用的缓冲区,从而限制缓冲区/内存的使用(并且还避免了连续的new/dispose,或那些支持它的语言中的GC)。

我喜欢将池队列计数转储到 1 秒计时器上的 GUI 状态栏 - 这允许我观察缓冲区使用情况,(并快速发现是否有任何泄漏:)。