如果我的CPU负载表明不是这样,我应该启动多个线程吗
should I launch multiple threads if my CPU load suggests otherwise?
我正在用C++编写一个程序,用于根据参考注释(reference annotation)对NGS读段比对(read alignment)进行计数。基本上,程序先将注释文件和比对文件读入内存,然后遍历注释,在比对文件中用二分查找定位一个可能的位置,找到该位置后,再对该位置周围的一个窗口(frame)做线性搜索。
通常,我希望这个窗口(frame)保持较大(10000条比对),所以我想到把窗口拆分成几部分,分别放到单独的线程中处理。
一切都能编译并运行,但我的多线程似乎没有按预期工作,因为我的电脑只用了一个核心来完成任务。有谁能帮我找出我的线程实现哪里出了问题吗?
https://sourceforge.net/projects/fast-count/?source=directory
#include <math.h>

#include <cstdlib>
#include <fstream>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include "api/BamReader.h"

using namespace std;
using namespace BamTools;
// Number of alignments counted for the GTF record currently being processed:
// process() increments it, main() zeroes it before each record and prints it
// at the end of that record's output line.
int hit_count{0};
// One mapped alignment kept in memory by load_bam(): the name of the
// reference sequence it maps to and the position reported by the BAM record.
struct bam_headers{
    std::string chr;   // reference (chromosome) name of the alignment
    int start = 0;     // alignment position; zero-initialised so a default-
                       // constructed record never carries an indeterminate value
};
// Per-thread bookkeeping record. NOTE(review): nothing in the visible code
// references this struct; the field meanings below are inferred from the names
// only - confirm before relying on them.
// All members are zero-initialised so a default-constructed instance is
// never read with indeterminate values.
struct thread_data{
    int thread_id = 0;      // presumably the index of the worker
    int total_thread = 0;   // presumably the number of workers
    int start_gtf = 0;      // presumably the lower bound of the GTF interval
    int stop_gtf = 0;       // presumably the upper bound of the GTF interval
};
// One GTF annotation line split into its nine tab-separated columns.
// load_gtf() fills the fields from the columns in file order:
// chr, source, feature, start, end, score, strand, frame, annotation.
// Member order is kept as in the original declaration.
struct gtf_headers{
    std::string chr;          // column 1
    std::string source;       // column 2
    std::string feature;      // column 3
    std::string score;        // column 6 (kept as text)
    std::string strand;       // column 7
    std::string frame;        // column 8
    std::string annotation;   // column 9 (rest of the line)
    int start = 0;            // column 4, converted with atoi
    int end = 0;              // column 5, converted with atoi
};
// Thread entry point: counts how many of the first `size` values in
// start_holder lie inside the inclusive range [gtf_start, gtf_stop] and adds
// that number to the global hit_count.
//
//   start_holder  array of alignment start positions (at least `size` entries)
//   size          number of entries to examine; nothing is read when <= 0
//   gtf_start     lower bound of the interval (inclusive)
//   gtf_stop      upper bound of the interval (inclusive)
//
// The hits are tallied in a local variable and published once under a mutex,
// so several workers may run this function at the same time without racing on
// the shared counter.
void process(int* start_holder, int size, int gtf_start, int gtf_stop){
    int local_hits = 0;
    for (int t = 0; t < size; ++t){
        if (start_holder[t] >= gtf_start && start_holder[t] <= gtf_stop){
            ++local_hits;
        }
    }
    // hit_count is a plain int shared by all workers; serialise the update.
    static std::mutex hit_count_guard;
    std::lock_guard<std::mutex> lock(hit_count_guard);
    hit_count += local_hits;
}
vector <string> find_index(vector <vector <bam_headers> > bams){
//define vector for bam_index to chromosome
vector <string> compute_holder;
for (int bam_idx = 0; bam_idx < bams.size();bam_idx++){
compute_holder.push_back(bams[bam_idx][0].chr);
}
return compute_holder;
}
// Reads a GTF annotation file into memory, one gtf_headers per usable line.
//
//   filename  path of the GTF file
//   returns   the parsed records in file order
//
// Each line is split on TAB into the nine GTF columns. Lines with fewer than
// nine columns (comment/header lines, truncated lines) are skipped; the column
// counter is local to each line, so one short line cannot shift the columns of
// the lines that follow. Columns 4 and 5 are converted with atoi.
// If the file cannot be opened the function prints a message and terminates
// the program with exit(-1).
std::vector<gtf_headers> load_gtf(char* filename){
    const int column_count = 9;

    std::ifstream gtf_file;
    if (filename != nullptr){
        gtf_file.open(filename);
    }
    std::cout << "Loading GTF to memory" << "\n";
    if (!gtf_file.is_open()){
        std::cout << "GTF unsuccessfully loaded to memory. Check path to file, and annotation format. Exiting" << "\n";
        std::exit(-1);
    }

    std::vector<gtf_headers> push_matrix;
    std::string line;
    while (std::getline(gtf_file, line)){
        std::istringstream iss(line);
        std::string fields[column_count];
        int filled = 0;
        // tokenize by the TAB delimiter, keeping at most nine columns
        while (filled < column_count && std::getline(iss, fields[filled], '\t')){
            ++filled;
        }
        if (filled < column_count){
            continue;   // not a complete annotation line
        }
        gtf_headers holder;
        holder.chr = fields[0];
        holder.source = fields[1];
        holder.feature = fields[2];
        holder.start = atoi(fields[3].c_str());
        holder.end = atoi(fields[4].c_str());
        holder.score = fields[5];
        holder.strand = fields[6];
        holder.frame = fields[7];
        holder.annotation = fields[8];
        push_matrix.push_back(holder);
    }
    std::cout << "GTF successfully loaded to memory" << "\n";
    return push_matrix;
}
vector <vector <bam_headers>> load_bam(char* filename){
//parse individual bam file to chromosome bins
vector <vector <bam_headers> > push_matrix;
vector <bam_headers> iter_chr;
int iter_refid = -1;
bam_headers bam_holder;
BamReader reader;
BamAlignment al;
const vector<RefData>& references = reader.GetReferenceData();
cout << "Loading " << filename << " to memory" << "n";
if (reader.Open(filename)) {
while (reader.GetNextAlignmentCore(al)) {
if (al.IsMapped()){
//bam file must be sorted by chr. otherwise the lookup will segfault
if(al.RefID != iter_refid){
//check if chr. position has advanced in the bam file, if true, push empty vector
iter_refid++;
push_matrix.push_back(iter_chr);
}else{
//if chr. position hasn't advanced push to current index in 2d vector
bam_holder.chr = references[al.RefID].RefName;
bam_holder.start = al.Position;
push_matrix.at(iter_refid).push_back(bam_holder);
}
}
}
reader.Close();
cout << "Successfully loaded " << filename << " to memory" << "n";
return(push_matrix);
}else{
cout << "Could not open input BAM file. Exiting." << endl;
exit(-1);
}
}
// Determines which chromosome bin a GTF record belongs to.
//
//   gtf_chr  chromosome name from the GTF record
//   mapping  bin-index -> chromosome-name table
//   returns  index of the matching entry, or -1 when the name is not present
//
// `mapping` is taken by const reference so the table is not copied on every
// lookup. When a name occurs more than once the highest index is returned;
// scanning from the back gives that result and stops at the first hit.
short int find_bin(const std::string & gtf_chr, const std::vector<std::string>& mapping){
    for (std::size_t i = mapping.size(); i-- > 0; ){
        if (mapping[i] == gtf_chr){
            return static_cast<short int>(i);
        }
    }
    return -1;
}
int find_frame(gtf_headers gtf_matrix, vector <bam_headers> bam_file_bin){
//binary search to find alignment index with greater and less than gtf position
int bin_size = bam_file_bin.size();
int high_end = bin_size;
int low_end = 0;
int binary_i = bin_size / 2;
int repeat = 0;
int frame_start;
bool found = false;
while (found != true){
if ((bam_file_bin[binary_i].start >= gtf_matrix.start) && (bam_file_bin[binary_i].start <= gtf_matrix.end)){
frame_start = binary_i;
found = true;
}else{
if(repeat != binary_i){
if(bam_file_bin[binary_i].start > gtf_matrix.end){
if(repeat != binary_i){
repeat = binary_i;
high_end = binary_i;
binary_i = ((high_end - low_end) / 2) + low_end;
}
}else{
if(repeat != binary_i){
repeat = binary_i;
low_end = binary_i;
binary_i = ((high_end - low_end) / 2) + low_end;
}
}
}else{
frame_start = low_end;
found = true;
}
}
}
return(frame_start);
}
// Computes the half-open index window [result[0], result[1]) that is scanned
// linearly around the anchor found by the binary search.
//
//   frame_size   requested window width in alignments (negative is treated as 0)
//   frame_start  anchor index the window is centred on
//   bam_matrix   number of alignments in the bin (negative is treated as 0)
//   returns      a two-element vector {lower, upper} with
//                0 <= lower <= upper <= bam_matrix
//
// The window is frame_start +/- frame_size/2. When it sticks out on either
// side it is slid back inside the bin while keeping its full width; when the
// bin is smaller than the window the whole bin [0, bam_matrix) is returned.
// This keeps the anchor inside the window in every case.
std::vector<int> define_frame(int frame_size, int frame_start, int bam_matrix){
    const int total = bam_matrix < 0 ? 0 : bam_matrix;
    const int width = frame_size < 0 ? 0 : frame_size;

    int lower = frame_start - width / 2;
    int upper = frame_start + width / 2;
    if (lower < 0){
        // slide right so the window starts at the first alignment
        lower = 0;
        upper = width;
    }
    if (upper > total){
        // slide left so the window ends at the last alignment
        upper = total;
        lower = total - width;
        if (lower < 0){
            lower = 0;
        }
    }
    return std::vector<int>{lower, upper};
}
void thread_handler(int nthread, vector <int> frame, vector <bam_headers> bam_matrix, gtf_headers gtf_record){
int thread_divide = frame[1]-frame[0];//frame_size / nthread;
int thread_remain = (frame[1]-frame[0]) % nthread;
int* start_holder = new int[thread_divide];
for(int i = 0; i < nthread; i++){
if (i < nthread - 1){
for (int frame_index = 0; frame_index < thread_divide; frame_index++){
start_holder[frame_index] = bam_matrix[frame[0]+frame_index].start;
}
frame[0] = frame[0] + thread_divide;
thread first(process, start_holder,thread_divide,gtf_record.start,gtf_record.end);
first.join();
}else{
for (int frame_index = 0; frame_index < thread_divide + thread_remain; frame_index++){
start_holder[frame_index] = bam_matrix[frame[0]+frame_index].start;
}
thread last(process, start_holder,thread_divide + thread_remain,gtf_record.start,gtf_record.end);
last.join();
}
}
}
int main (int argc, char *argv[])
{
// usage
// ./count threads frame_size gtf_file files
//define matrix to memory holding gtf annotations by assoc. header
vector <gtf_headers> gtf_matrix = load_gtf(argv[3]);
//load bam, perform counts
for(int i = 4;i < argc;i++){
//iterate through filenames in argv, define matrix to memory holding bam alignments chr and bp position
vector <vector <bam_headers> > bam_matrix = load_bam(argv[i]);
//map chromosome to bam matrix index
vector <string> index_mapping = find_index(bam_matrix);
//iterate through gtf matrix, find corresponding bins for chr, set search frames, and count
for(int gtf_i = 0; gtf_i < gtf_i < gtf_matrix.size();gtf_i++){ //gtf_i < gtf_matrix.size()
hit_count = 0;
//find corresponding bins for gtf chr
short int bin_compare = find_bin(gtf_matrix[gtf_i].chr,index_mapping);
if(bin_compare != -1){
//find start of search frame
int frame_start = find_frame(gtf_matrix[gtf_i], bam_matrix[bin_compare]);
//get up lower bounds of search frame;
vector <int> full_frame = define_frame(atoi(argv[2]),frame_start,bam_matrix[bin_compare].size());
//create c array of bam positional data for the frame, and post to thread process
thread_handler(atoi(argv[1]),full_frame,bam_matrix[bin_compare],gtf_matrix[gtf_i]);
}
//counts displayed in STOUT
cout << gtf_matrix[gtf_i].chr << "t" << gtf_matrix[gtf_i].source << "t" << gtf_matrix[gtf_i].feature << "t" << gtf_matrix[gtf_i].start << "t" << gtf_matrix[gtf_i].end << "t" << gtf_matrix[gtf_i].score << "t" << gtf_matrix[gtf_i].strand << "t" << gtf_matrix[gtf_i].frame << "t" << gtf_matrix[gtf_i].annotation << "t" << hit_count << "n";
}
}
}
问题的答案很简单:
thread last(process, start_holder,thread_divide + thread_remain,gtf_record.start,gtf_record.end);
last.join();
在这里,父任务创建一个新线程,并且。。。立即等待线程完成。这就是join()的作用,它等待线程终止。
因此,您的代码启动一个新线程,并立即等待它完成,然后再执行其他操作,比如启动下一个线程。
您需要重写thread_handler():先实例化所有std::thread实例,待全部实例化完成之后,再对每个实例调用join(),等待所有线程完成。
典型的做法是:先用std::thread的默认构造函数预先创建一个包含所有线程实例的std::vector,然后循环一遍以初始化每个实例,最后再循环一遍,对每个实例调用join()。
相关文章:
- 我应该使用什么来代替void作为变体中的替代类型之一
- boost::asio::steady_timer()与sleep()我应该使用哪一个
- 我应该实现右值推送功能吗?我应该使用std::move吗
- 我是C++编程的新手,这些代码之间有什么区别,我应该使用哪一个
- 我应该删除矢量<short>吗?
- 我应该如何修改此代码以使用给定字符串中的字母打印菱形图案
- 我应该在锁定TBitmap画布后解锁它吗
- 为什么我应该在异常处理中使用std::cerr而不是std::cout
- 我应该避免多重实现继承吗
- 为了方便起见,我应该避免公开私有字段变量吗
- 我收到同义重复编译器错误。我应该如何修复"类型"X"的参数与类型"X"的参数不兼容?
- 违反const正确性:我应该现实地期待什么问题
- 我应该如何表示我拥有的连续元素序列?
- 我应该将除 .cpp 以外的其他文件添加到 git 中吗?
- 我应该如何从 stdin C++ 中读取可变长度的格式字符串?
- 我有一个对象,它将在整个程序的持续时间内实例化,但一个类成员不会,我应该动态分配它吗?
- 我应该如何捕捉out_of_range异常?
- WTSFreeMemory在启动期间从服务调用时挂起-我应该省略吗
- 当服务器重新启动并且客户端收到WSAECONNRESET错误代码时,我应该重新创建整个套接字吗
- 如果我的CPU负载表明不是这样,我应该启动多个线程吗