如果CPU负载显示并非如此,我还应该启动多个线程吗?

should I launch multiple threads if my CPU load suggests otherwise?

本文关键字:启动 我应该 线程 负载 CPU 我的 如果      更新时间:2023-10-16

我正在用C++编写一个程序,根据参考注释统计NGS读段比对(read alignment)的数量。基本上,程序把注释文件和比对文件读入内存,遍历注释,在比对文件中用二分查找定位一个可能的位置,找到该位置后,再对该位置周围的一个窗口(frame)做线性搜索。

通常我希望这个窗口(frame)保持较大(10000条比对),所以想到把窗口拆分成若干部分,分别交给单独的线程处理。

程序可以正常编译和运行,但我的多线程似乎并没有按预期工作,因为我的计算机只用了一个核心来完成任务。有谁能帮我看看我的线程实现哪里出了错吗?

https://sourceforge.net/projects/fast-count/?source=directory

#include <iostream>
#include <cstdlib>
#include <vector>
#include <string>
#include <thread>
#include <sstream>
#include <fstream>
#include <math.h> 
#include "api/BamReader.h"
using namespace std;
using namespace BamTools;
// Number of alignments counted for the GTF record currently being processed.
// main() resets it to 0 before each record, process() increments it, and main()
// prints it as the last column of the record's output line.
// It is a plain int: updates to it are not synchronized in any way.
int hit_count = 0;
// One alignment as kept in memory by load_bam().
// Members are default-initialized so a freshly constructed record never holds
// an indeterminate position.
struct bam_headers{
    std::string chr;  // reference name (load_bam stores references[RefID].RefName here)
    int start = 0;    // alignment position (load_bam stores BamAlignment::Position here)
};
// Per-thread bookkeeping record.
// NOTE(review): nothing in the code visible in this file reads or writes this
// struct; the field meanings below are inferred from their names only - confirm
// before relying on them, or remove the struct if it is truly unused.
// All members are zero-initialized so a default-constructed record is well defined.
struct thread_data{
   int thread_id = 0;     // presumably the index of the thread
   int total_thread = 0;  // presumably the number of threads in use
   int start_gtf = 0;     // presumably the start coordinate of the GTF record
   int stop_gtf = 0;      // presumably the end coordinate of the GTF record
};
// One annotation line of the GTF file, as filled in by load_gtf().
// load_gtf() assigns the tab-separated columns of a line, in file order, to:
// chr, source, feature, start, end, score, strand, frame, annotation.
// The numeric members are zero-initialized so a default-constructed record
// never carries indeterminate coordinates.
struct gtf_headers{
    std::string chr;         // column 1
    std::string source;      // column 2
    std::string feature;     // column 3
    std::string score;       // column 6
    std::string strand;      // column 7
    std::string frame;       // column 8
    std::string annotation;  // column 9
    int start = 0;           // column 4, converted with atoi
    int end = 0;             // column 5, converted with atoi
};
// Thread entry point used for counting.
// Looks at the first `size` values of start_holder and adds to the global
// hit_count the number of values lying in the closed interval
// [gtf_start, gtf_stop]. The update of hit_count is not synchronized.
void process(int* start_holder, int size, int gtf_start, int gtf_stop){
    int matched = 0;
    int idx = 0;
    while (idx < size){
        const int position = start_holder[idx];
        ++idx;
        const bool outside = (position < gtf_start) || (position > gtf_stop);
        if (!outside){
            ++matched;
        }
    }
    // publish the tally in one step instead of once per match
    hit_count += matched;
}
vector <string> find_index(vector <vector <bam_headers> > bams){
    //define vector for bam_index to chromosome
    vector <string> compute_holder;
    for (int bam_idx = 0; bam_idx < bams.size();bam_idx++){
        compute_holder.push_back(bams[bam_idx][0].chr);
    }
    return compute_holder;
}
// Reads a GTF annotation file into memory.
//
// filename: path of the GTF file.
// Returns one gtf_headers per annotation line, in file order.
// If the file cannot be opened, a message is printed and the process exits
// with status -1.
//
// Each line is split on TAB characters. A line is turned into a record only if
// it has at least nine columns; empty lines, lines starting with '#', and
// shorter lines are skipped. Splitting is done per line, so a skipped or
// malformed line cannot shift the columns of the lines that follow it.
std::vector<gtf_headers> load_gtf(char* filename){
    std::ifstream gtf_file(filename);
    std::cout << "Loading GTF to memory" << "\n";
    if (!gtf_file.is_open()){
        std::cout << "GTF unsuccessfully loaded to memory. Check path to file, and annotation format. Exiting" << "\n";
        std::exit(-1);
    }

    std::vector<gtf_headers> push_matrix;
    std::vector<std::string> fields;
    std::string line;
    while (std::getline(gtf_file, line)){
        if (line.empty() || line[0] == '#'){
            continue;
        }
        // tokenize the line by tab delimiter
        fields.clear();
        std::istringstream iss(line);
        std::string token;
        while (std::getline(iss, token, '\t')){
            fields.push_back(token);
        }
        if (fields.size() < 9){
            continue;
        }
        gtf_headers holder;
        holder.chr = fields[0];
        holder.source = fields[1];
        holder.feature = fields[2];
        holder.start = atoi(fields[3].c_str());
        holder.end = atoi(fields[4].c_str());
        holder.score = fields[5];
        holder.strand = fields[6];
        holder.frame = fields[7];
        holder.annotation = fields[8];
        push_matrix.push_back(holder);
    }
    std::cout << "GTF successfully loaded to memory" << "\n";
    return push_matrix;
}
vector <vector <bam_headers>> load_bam(char* filename){
    //parse individual bam file to chromosome bins
    vector <vector <bam_headers> > push_matrix;
    vector <bam_headers> iter_chr;
    int iter_refid = -1;
    bam_headers bam_holder;
    BamReader reader;
    BamAlignment al;
    const vector<RefData>& references = reader.GetReferenceData();
    cout << "Loading " << filename << " to memory" << "n";
    if (reader.Open(filename)) {    
        while (reader.GetNextAlignmentCore(al)) {
            if (al.IsMapped()){
                //bam file must be sorted by chr. otherwise the lookup will segfault
                if(al.RefID != iter_refid){
                    //check if chr. position has advanced in the bam file, if true, push empty vector
                    iter_refid++;
                    push_matrix.push_back(iter_chr);
                }else{
                    //if chr. position hasn't advanced push to current index in 2d vector
                    bam_holder.chr = references[al.RefID].RefName;
                    bam_holder.start = al.Position;
                    push_matrix.at(iter_refid).push_back(bam_holder);
                }
            }
        }
        reader.Close();
        cout << "Successfully loaded " << filename << " to memory" << "n";
        return(push_matrix);
    }else{
        cout << "Could not open input BAM file. Exiting." << endl;
        exit(-1);
    }
}
// Determines which chromosome bin a GTF line belongs to.
//
// gtf_chr: chromosome name of the GTF record.
// mapping: chromosome name of each bin, indexed by bin number.
// Returns the index of the LAST entry of mapping equal to gtf_chr, or -1 when
// no entry matches. The index is narrowed to short int, as the return type
// requires.
//
// mapping is taken by const reference so the table is not copied on every call;
// scanning from the back lets the search stop at the first match found while
// still returning the last matching index.
short int find_bin(const std::string & gtf_chr, const std::vector<std::string>& mapping){
    for (std::size_t remaining = mapping.size(); remaining > 0; --remaining){
        const std::size_t candidate = remaining - 1;
        if (mapping[candidate] == gtf_chr){
            return static_cast<short int>(candidate);
        }
    }
    return -1;
}
int find_frame(gtf_headers gtf_matrix, vector <bam_headers> bam_file_bin){
    //binary search to find alignment index with greater and less than gtf position
    int bin_size = bam_file_bin.size();
    int high_end = bin_size;
    int low_end = 0;
    int binary_i = bin_size / 2;
    int repeat = 0;
    int frame_start;
    bool found = false;
    while (found != true){
        if ((bam_file_bin[binary_i].start >= gtf_matrix.start) && (bam_file_bin[binary_i].start <= gtf_matrix.end)){
            frame_start = binary_i;
            found = true;
        }else{
            if(repeat != binary_i){
                if(bam_file_bin[binary_i].start > gtf_matrix.end){
                    if(repeat != binary_i){
                        repeat = binary_i;
                        high_end = binary_i;
                        binary_i = ((high_end - low_end) / 2) + low_end;
                    }
                }else{
                    if(repeat != binary_i){
                        repeat = binary_i;
                        low_end = binary_i;
                        binary_i = ((high_end - low_end) / 2) + low_end;
                    }
                }
            }else{
                frame_start = low_end; 
                found = true;
            }
        }   
    }
    return(frame_start);
}
// Computes the index window [lower, upper) to scan around frame_start.
//
// frame_size:  desired window width.
// frame_start: index the window is centred on.
// bam_matrix:  number of alignments in the bin (upper limit for indices).
// Returns a two-element vector {lower, upper}.
//
// The window is frame_start +/- frame_size/2. If it sticks out on the left it
// is shifted to start at 0; if it sticks out on the right it is shifted to end
// at bam_matrix, keeping the full frame_size width where the bin allows it and
// never going below 0. Shifting by the full width (rather than half of it)
// guarantees the window still contains frame_start when the bin is smaller
// than frame_size.
std::vector<int> define_frame(int frame_size, int frame_start, int bam_matrix){
    const int half = frame_size / 2;
    int lower = frame_start - half;
    int upper = frame_start + half;
    if (lower < 0){
        lower = 0;
        upper = frame_size;
    }
    if (upper > bam_matrix){
        upper = bam_matrix;
        lower = bam_matrix - frame_size;
        if (lower < 0){
            lower = 0;
        }
    }
    std::vector<int> push_ints;
    push_ints.push_back(lower);
    push_ints.push_back(upper);
    return push_ints;
}
void thread_handler(int nthread, vector <int> frame, vector <bam_headers> bam_matrix, gtf_headers gtf_record){
    int thread_divide = frame[1]-frame[0];//frame_size / nthread;
    int thread_remain = (frame[1]-frame[0]) % nthread;
    int* start_holder = new int[thread_divide];
    for(int i = 0; i < nthread; i++){
        if (i < nthread - 1){
            for (int frame_index = 0; frame_index < thread_divide; frame_index++){
                 start_holder[frame_index] = bam_matrix[frame[0]+frame_index].start;         
            } 
            frame[0] = frame[0] + thread_divide;
            thread first(process, start_holder,thread_divide,gtf_record.start,gtf_record.end);
            first.join();
        }else{
            for (int frame_index = 0; frame_index < thread_divide + thread_remain; frame_index++){
                 start_holder[frame_index] = bam_matrix[frame[0]+frame_index].start;    
            } 
            thread last(process, start_holder,thread_divide + thread_remain,gtf_record.start,gtf_record.end);
            last.join();
        }
    }
}

int main (int argc, char *argv[])
{
    // usage
    // ./count threads frame_size gtf_file files
    //define matrix to memory holding gtf annotations by assoc. header
    vector <gtf_headers> gtf_matrix = load_gtf(argv[3]);
    //load bam, perform counts
    for(int i = 4;i < argc;i++){
        //iterate through filenames in argv, define matrix to memory holding bam alignments chr and bp position
        vector <vector <bam_headers> > bam_matrix = load_bam(argv[i]);
        //map chromosome to bam matrix index
        vector <string> index_mapping = find_index(bam_matrix);
        //iterate through gtf matrix, find corresponding bins for chr, set search frames, and count
        for(int gtf_i = 0; gtf_i < gtf_i < gtf_matrix.size();gtf_i++){ //gtf_i < gtf_matrix.size()
            hit_count = 0;
            //find corresponding bins for gtf chr
            short int bin_compare = find_bin(gtf_matrix[gtf_i].chr,index_mapping);
            if(bin_compare != -1){
                //find start of search frame
                int frame_start = find_frame(gtf_matrix[gtf_i], bam_matrix[bin_compare]);
                //get up lower bounds of search frame;
                vector <int> full_frame = define_frame(atoi(argv[2]),frame_start,bam_matrix[bin_compare].size());
                //create c array of bam positional data for the frame, and post to thread process
                thread_handler(atoi(argv[1]),full_frame,bam_matrix[bin_compare],gtf_matrix[gtf_i]);
            }
            //counts displayed in STOUT
            cout << gtf_matrix[gtf_i].chr << "t" << gtf_matrix[gtf_i].source << "t" << gtf_matrix[gtf_i].feature << "t" << gtf_matrix[gtf_i].start << "t" << gtf_matrix[gtf_i].end << "t" << gtf_matrix[gtf_i].score << "t" << gtf_matrix[gtf_i].strand << "t" << gtf_matrix[gtf_i].frame << "t" << gtf_matrix[gtf_i].annotation << "t" << hit_count << "n";
        }
    }
}

问题的答案很简单:

thread last(process, start_holder,thread_divide + thread_remain,gtf_record.start,gtf_record.end);
last.join();

在这里,父线程创建了一个新线程,然后……立即等待该线程完成。这正是join()的作用:它会一直等待,直到线程终止。

因此,您的代码启动一个新线程,并立即等待它完成,然后再执行其他操作,比如启动下一个线程。

您需要重写thread_handler()来实例化所有std::thread实例,然后在实例化所有实例后,对每个实例调用join(),等待所有实例完成。

典型的做法是:先创建一个存放std::thread实例的std::vector(可以用std::thread的默认构造函数预先创建所有实例),用一个循环逐个初始化并启动这些线程,然后再用第二个循环对每个实例调用join()。