如何在c++多线程中匹配处理时间和接收时间

How to match processing time with reception time in c++ multithreading

本文关键字:时间 处理 c++ 多线程      更新时间:2023-10-16

我正在编写一个c++应用程序,其中每0.5秒将接收4096字节的数据。这将被处理,输出将被发送到其他应用程序。处理每组数据需要将近2秒的时间。我就是这样做的。在我的主函数中,我接收数据并将其推送到向量中。我创建了一个线程,它将始终处理第一个元素,并在处理后立即删除它。下面是我的应用程序接收部分的模拟。

#include<iostream>
#include <unistd.h>
#include <vector>
#include <mutex>
#include <pthread.h>
using namespace std;
struct Student{
int id;
int age;
};
vector<Student> dustBin;
pthread_mutex_t lock1;
bool isEven=true;
void *processData(void* arg){
Student st1;
while(true)
{
if(dustBin.size())
{       
printf("front: %dtSize: %dn",dustBin.front(),dustBin.size());
st1 = dustBin.front();
cout << "Currently Processing ID "<<st1.id<<endl;
sleep(2);
pthread_mutex_lock(&lock1);
dustBin.erase(dustBin.begin());
cout<<"Deleted"<<endl;
pthread_mutex_unlock(&lock1);
}
}
return NULL;
}
int main()
{
pthread_t ptid;
Student st;
dustBin.clear();
pthread_mutex_init(&lock1, NULL);
pthread_create(&ptid, NULL, &processData, NULL);
while(true)
{
for(int i=0; i<4096; i++)
{
st.id = i+1;
st.age = i+2;
pthread_mutex_lock(&lock1);
dustBin.push_back(st);
printf("Pushed: %dn",st.id);
pthread_mutex_unlock(&lock1);
usleep(500000);
}
}
pthread_join(ptid, NULL);
pthread_mutex_destroy(&lock1);
}

此代码的输出为输出

在这里发布的输出图像中,您可以观察到处理的确切顺序。它每4次插入只处理一个项目。

Note that the reception time of data <<< processing time.

由于这个原因,我的输入缓冲区增长非常快。还有一件事是,由于主线程和processData线程使用互斥锁,它们相互依赖以释放锁。由于这个原因,我的传入缓冲区被锁定,有时会导致数据丢失。请,有人,建议我如何处理这件事,或者建议我一些方法。

谢谢&问候

Vamsi

未定义的行为

读取数据时,必须先锁定,然后才能获得size

忙等待

你应该总是避免无关痛痒。在这里,如果dustBin是空的,你将立即永远检查它,它将使用100%的核心,并降低其他一切的速度,耗尽笔记本电脑的电池,使其温度超过应有的温度。写这样的代码是个坏主意

首先学习多线程

你应该读一本或两本关于多线程的书。如果不花时间好好学习,正确地执行多线程是很困难的,而且几乎是不可能的C++操作中的并发性强烈建议用于标准C++多线程。

条件变量

通常,您会使用一个条件变量或某种事件来告诉使用者线程何时添加数据,这样它就不必无用地醒来来检查是否是这样。

由于你有一个典型的生产者/消费者,你应该能够找到很多关于如何做到这一点的信息,或者特殊的容器或其他有助于实现你的代码的构造。

输出

您的printfcout内容将对性能产生影响,并且由于有些内容在锁内,而另一些内容则不在,因此您可能会得到格式不正确的输出。如果您确实需要输出,第三个线程可能是更好的选择。在任何情况下,您都希望最大限度地减少锁定时间,因此格式化为临时缓冲区可能也是一个好主意。

顺便说一句,标准输出相对较慢,这很可能是您无法快速处理所有数据的原因。

处理率

显然,如果您能够每0.5秒生成4096字节的数据,但需要2秒来处理这些数据,那么您就遇到了严重的问题。

在这里提问之前,你应该认真考虑一下在这种情况下你想做什么,因为如果没有这些信息,我们正在猜测可能的解决方案。

以下是一些可能性:

  • 放慢制片人的速度。显然,如果您实时获取数据,这是不起作用的
  • 优化消费者(更好的算法、更好的硬件、最佳并行性…)
  • 跳过一些数据

显然,对于性能问题,您应该使用探查器来了解您是否浪费了时间。一旦你知道了这一点,你就会更好地知道在哪里检查以改进你的代码。

花2秒钟处理数据确实很慢,但我们无法帮助您,因为我们不知道您的代码在做什么。

例如,如果将数据添加到数据库中,但无法进行后续操作,则可能需要将多个插入批处理到单个命令中,以减少通过网络与数据库通信的开销。

另一个例子是,如果您将数据附加到一个文件中,您可能希望在每次写入之前保持文件打开并积累一些数据。

容器

如果你一个接一个地从头部移除项目,并且它的大小变得有些大(比如说超过100个小项目),因为每次都需要移动其他项目,那么向量不是一个好的选择。

除了按照注释中的建议更改容器之外,另一种可能性是使用2个向量并交换它们。这样,您将能够减少锁定互斥对象的时间,并在不需要锁定的情况下处理许多项目。

如何优化

你应该积累足够的数据(比如30秒),停止积累,然后用这些数据测试你的处理速度。如果你不能在不到一半的时间(15秒)内处理这些数据,那么你显然需要以某种方式提高处理速度。如果你的消费者足够快,那么你就可以优化从生产者到消费者的沟通。

您必须知道您的瓶颈是I/O、数据库还是什么,以及某些部分是否可以并行完成。

在你没有展示的代码中可能有很多优化可以完成。。。

如果不能足够快地处理消息,则必须丢弃一些消息。

使用固定大小的圆形缓冲区。

然后,如果提供者比使用者快,则旧的条目将被覆盖。

如果你不能跳过一些数据,也不能足够快地处理它,你就注定要失败。

创建两个const变量,NBUFFERS和NTHREADS,如果您有16个核心,并且处理速度慢4倍,则最初将它们都设为8。稍后使用这些值。

创建NBUFFERS数据缓冲区,每个缓冲区足够大,可以容纳4096个样本。在实践中,只需创建一个大缓冲区,并对其进行偏移即可将其分割。

启动NTHREADS。他们每个人都会不断地等待被告知要处理哪个缓冲区,然后他们会处理它并再次等待另一个缓冲区。

在主程序中,进入一个循环,接收数据。将前4096个样本接收到第一缓冲区中,并通知第一线程。将第二4096个样本接收到第二缓冲区中,并通知第二线程。

buffer = (buffer + 1) % NBUFFERS
thread = (thread + 1) % NTHREADS

冲洗并重复。由于有8个线程,数据每0.5秒才到达一次,因此每个线程每4秒只会获得一个新的缓冲区,但只需要2秒就可以清除以前的缓冲区。