什么是处理长文件行的最佳多线程场景

what is the optimal Multithreading scenario for processing a long file lines?

本文关键字:最佳 多线程 处理 文件 什么      更新时间:2023-10-16

我有一个大文件,我想用多线程读取并处理文件的所有行(偶数行(。

有人建议读取整个文件并将其分解为多个文件(与线程数相同(,然后让每个线程处理一个特定的文件。由于这个想法将读取整个文件,再次写入并读取多个文件,所以它似乎很慢(3倍I/O(,我认为肯定有更好的方案,

我自己认为这可能是一个更好的场景:

一个线程将读取文件并将数据放在全局变量上,其他线程将从该变量中读取数据并进行处理。更详细的:

一个线程将通过运行func1函数读取主文件,并将每个偶数行放在最大大小MAX_BUFFER_SIZE的Buffer:line1Buffer上,其他线程将从Buffer中弹出数据,并通过运行func2函数进行处理。代码:

全局变量:

#define MAX_BUFFER_SIZE 100
vector<string> line1Buffer;
bool continue = true;// to end thread 2 to last thread by setting to false
string file = "reads.fq";

函数func1:(线程1(

void func1(){
 ifstream ifstr(file.c_str());
 for (long long i = 0; i < numberOfReads; i++) { // 2 lines per read
    getline(ifstr,ReadSeq);
    getline(ifstr,ReadSeq);// reading even lines
    while( line1Buffer.size() == MAX_BUFFER_SIZE )
        ; // to delay when the buffer is full
    line1Buffer.push_back(ReadSeq);
 }
 continue = false;
 return;
}

和函数func2:(其他线程(

void func2(){
 string ReadSeq;
 while(continue){
    if(line2Buffer.size() > 0 ){
        ReadSeq  = line1Buffer.pop_back();
        // do the proccessing....
    }
 }
}

关于速度:

如果读取部分较慢,则总时间将等于仅读取一次文件(并且缓冲区每次可能仅包含1个文件,因此仅1个其他线程将能够与线程1一起工作(。如果处理部分较慢,则总时间将等于CCD_ 7线程的整个处理的时间。这两种情况都比用一个线程读取文件并写入多个文件,然后用多线程读取文件和处理。。。

因此有两个问题:

1-如何像线程1运行func1和其他线程运行func2那样通过线程调用函数?

2-还有更快的情况吗?

3-[删除]有人可以将这个想法扩展到M个线程进行读取和N个线程进行处理吗?显然我们知道:M+N==umberOfThreads是真正的

编辑:第三个问题是不对的,因为多个线程无法帮助读取单个文件

感谢所有

其他方法可以是交错线程。阅读是由每一个线程完成的,但一次只有一个线程。由于在第一次迭代中等待线程将被交错。

但如果work()是瓶颈,这只是一个可扩展的选项(那么每次非并行执行都会更好(

线程:

while (!end) {
    // should be fair!
    lock();
    read();
    unlock();
    work();
}

基本示例:(您可能应该添加一些错误处理(

void thread_exec(ifstream* file,std::mutex* mutex,int* global_line_counter) {
    std::string line;
    std::vector<std::string> data;
    int i;
    do {
        i = 0;
        // only 1 concurrent reader
        mutex->lock();
        // try to read the maximum number of lines
        while(i < MAX_NUMBER_OF_LINES_PER_ITERATION && getline(*file,line)) {
            // only the even lines we want to process
            if (*global_line_counter % 2 == 0) {
                data.push_back(line);
                i++;
            }
            (*global_line_counter)++;
        }
        mutex->unlock();
        // execute work for every line
        for (int j=0; j < data.size(); j++) {
            work(data[j]);
        }
        // free old data
        data.clear();
     //until EOF was not reached
   } while(i == MAX_NUMBER_OF_LINES_PER_ITERATION);
}
void process_data(std::string file) {
     // counter for checking if line is even
     int global_line_counter = 0;
     // open file
     ifstream ifstr(file.c_str());
     // mutex for synchronization
     // maybe a fair-lock would be a better solution
     std::mutex mutex;
     // create threads and start them with thread_exec(&ifstr, &mutex, &global_line_counter);
     std::vector<std::thread> threads(NUM_THREADS);
     for (int i=0; i < NUM_THREADS; i++) {
         threads[i] = std::thread(thread_exec, &ifstr, &mutex, &global_line_counter);
     }
     // wait until all threads have finished
     for (int i=0; i < NUM_THREADS; i++) {
         threads[i].join();
     }
}

您的瓶颈是什么?硬盘还是处理时间?

如果是硬盘,那么你可能无法获得更多的性能,因为你已经达到了硬件的极限。并行读取要比试图在文件中跳转快得多。让多个线程尝试读取文件几乎肯定会降低总体速度,因为这会增加磁盘抖动。

一个读取文件的线程和一个处理内容的线程池(或者只有一个其他线程(可能是最好的。

全局变量:

这是个坏习惯。

假设有#p踏板,文章中提到的两种情况以及答案:

1( 使用"a"线程进行读取并使用其他线程进行处理,在这种情况下,#p-1线程将处理,而不是仅处理一个线程读取。假设完整操作的时间是jobTime,处理n个线程的时间是pTime(n(,所以:

最坏的情况发生在读取时间比处理和jobTime = pTime(1)+readTime慢得多的时候,而最好的情况是处理比读取慢的时候,其中jobTime等于pTime(#p-1)+readTime

2( 使用所有CCD_ 17线程进行读取和处理。在这个场景中,每个线程都需要执行两个步骤。第一步是读取大小为CCD_ 18的文件的一部分,该部分是顺序的;意味着没有两个线程可以同时读取。但第二部分是对读取的数据进行并行处理。这样,在最坏的情况下,jobTime和以前一样是pTime(1)+readTime(但是*(,但最佳优化的情况是pTime(#p(+readTime,这比以前更好。

*:然而,在第二种方法的最坏情况下,读取速度较慢,但您可以找到一个优化的MAX_BUFFER_SIZE,其中(在最坏的情况下(一个线程的某些读取将与另一线程的某些处理重叠。使用这种优化的MAX_BUFFER_SIZEjobTime将小于pTime(1)+readTime,并且可能发散到readTime

首先,读取文件是一项缓慢的操作,因此除非您正在进行一些繁重的处理,否则文件读取将受到限制。

如果您决定采用多线程路由,那么队列是正确的方法。只要确保你在前面推一个弹出的后面。一个stl::deque应该工作得很好。此外,您还需要用互斥锁锁定队列,并用条件变量同步它。

最后一件事是,如果我们推的速度比弹出的速度快,则需要限制队列的大小。