使用共享变量进行线程处理
Threading with shared variables
我尝试使用多个线程插入boost::bimap
。我在线程之间有一些共享变量,我需要通过引用传递这些变量,其中一些由每个线程执行修改。但是,我收到错误:
分段故障(核心转储(
我有以下代码。我试图通过使用std::lock_guard<std::mutex> lock(mtx)
来避免对变量的并发访问,但无法使其工作。
parallel_index.cpp
#include <iostream>
#include <string>
#include <algorithm>
#include <thread>
#include <mutex>
#include <boost/bimap.hpp>
#include <boost/bimap/unordered_set_of.hpp>
#include <boost/bimap/unordered_multiset_of.hpp>
#include "parallel_index.h"
namespace bimaps = boost::bimaps;
typedef boost::bimap<bimaps::unordered_set_of<uint64_t>,
bimaps::unordered_multiset_of<std::string> > bimap_reference;
typedef bimap_reference::value_type position;
bimap_reference reference_index_vector;
size_t total_threads = std::thread::hardware_concurrency();
std::string sequence_content = "ABCDDBACDDDCBBAAACBDAADCBDAAADCBDADADACBDDCBBBCDCBCDAADCBBCDAAADCBDA";
uint64_t sequence_length = sequence_content.length();
int split = 5;
uint64_t erase_length = 0;
unsigned int seq_itr = 0;
std::mutex mtx; // to protect against concurent access
int main(){
thread_test::create_index index;
std::thread threads[total_threads];
std::cout << total_threads << " threads lanched" << std::endl;
for(unsigned int i = 0; i < total_threads; i++){
threads[i] = std::thread(&thread_test::create_index::reference_index_hash, index,
std::ref(sequence_length), std::ref(split), std::ref(sequence_content), std::ref(erase_length));
}
for(unsigned int i = 0; i < total_threads; i++){
threads[i].join();
}
}
/*
* Creating index
*/
void thread_test::create_index::reference_index_hash(uint64_t &sequence_length, int &split,
std::string &sequence_content, uint64_t &erase_length ){
for (; seq_itr < sequence_length; ++seq_itr ){
std::lock_guard<std::mutex> lock(mtx);
std::string splitstr = sequence_content.substr(erase_length, split);
reference_index_vector.insert(position(seq_itr, splitstr));
seq_itr += split-1;
erase_length += split;
if(erase_length > 10000){
sequence_content.erase(0,erase_length);
erase_length = 0;
}
}
for( bimap_reference::const_iterator iter = reference_index_vector.begin(), iend = reference_index_vector.end();
iter != iend; ++iter ) {
std::cout << iter->left << " <--> "<< iter->right <<std::endl;
}
}
parallel_index.h
#ifndef PARALLEL_INDEX_H_
#define PARALLEL_INDEX_H_
#include<iostream>
#include <algorithm>
#include <utility>
#include <boost/bimap.hpp>
#include <boost/bimap/unordered_set_of.hpp>
#include <boost/bimap/unordered_multiset_of.hpp>
//typedef boost::unordered_map<int, std::pair<int, unsigned long int>& > reference_map;
namespace bimaps = boost::bimaps;
typedef boost::bimap<bimaps::unordered_set_of<uint64_t>,
bimaps::unordered_multiset_of<std::string > > bimap_reference;
typedef bimap_reference::value_type position;
extern bimap_reference reference_index_vector;
namespace thread_test{
class create_index{
public:
void reference_index_hash(uint64_t &sequence_length, int &split,
std::string &sequence_content, uint64_t &erase_length);
};
}
#endif /* PARALLEL_INDEX_H_ */
-------------------------------编辑---------------------------------
我尝试将字符串内容划分为等于线程数的分区,以使每个线程的部分在本地可用。但似乎没有任何效果。有时它会完成第一个线程,然后以Segmentation fault (core dumped)
parallel_index.cpp
#include <iostream>
#include <string>
#include <algorithm>
#include <thread>
#include <mutex>
#include <boost/bimap.hpp>
#include <boost/bimap/unordered_set_of.hpp>
#include <boost/bimap/unordered_multiset_of.hpp>
#include "parallel_index.h"
namespace bimaps = boost::bimaps;
typedef boost::bimap<bimaps::unordered_set_of<uint64_t>,
bimaps::unordered_multiset_of<std::string> > bimap_reference;
typedef bimap_reference::value_type position;
bimap_reference reference_index_vector;
//create threads
size_t total_threads = std::thread::hardware_concurrency();
std::string sequence_content = "ABCDDBACDDDCBBAAACBDAADCBDAAADCBDADADACBDDCBBBCDCBCDAADCBBCDAAADCBDADDCCCAAABBBAAACDCA";
uint64_t sequence_length = sequence_content.length();
int split = 5;
// split the sequence_content equal to the number of threads, and assign each partition to each thread.
uint64_t each_partition_len = sequence_content.length()/total_threads- (sequence_content.length()/total_threads)%split ;
uint64_t last_partition_len = sequence_content.length()/total_threads +
(((sequence_content.length()/total_threads)%split)*(total_threads-1)) + sequence_content.length()%total_threads;
std::mutex mtx; // to protect against concurent access
int main(){
thread_test::create_index index;
std::thread threads[total_threads];
std::cout << total_threads << " threads lanched" << std::endl;
for(unsigned int i = 0; i < total_threads; i++){
if(i < total_threads-1)
threads[i] = std::thread(&thread_test::create_index::reference_index_hash, index,
std::ref(each_partition_len), std::ref(split), std::ref(sequence_content), i);
else
threads[i] = std::thread(&thread_test::create_index::reference_index_hash, index,
std::ref(last_partition_len), std::ref(split), std::ref(sequence_content), i);
//std::lock_guard<std::mutex> lck(mtx);
std::cout << "launched thread " << i << "with id " << threads[i].get_id() << std::endl;
}
for( bimap_reference::const_iterator iter = reference_index_vector.begin(), iend = reference_index_vector.end();
iter != iend; ++iter ) {
std::cout << iter->left << " <--> "<< iter->right <<std::endl;
}
for( unsigned int i = 0; i < total_threads; ++i){
if(threads[i].joinable()){
std::cout << "trying to join thread " << i << std:: endl;
threads[i].join();
std::cout << "thread joined " << i << std:: endl;
}
}
for( bimap_reference::const_iterator iter = reference_index_vector.begin(), iend = reference_index_vector.end();
iter != iend; ++iter ) {
std::cout << iter->left << " <--> "<< iter->right <<std::endl;
}
}
/*
* Creating index
*/
void thread_test::create_index::reference_index_hash(uint64_t &sequence_length, int &split,
std::string &sequence_content, int i ){
uint64_t seq_strt = 0;
// set seq_strt
if(i == 0)
seq_strt = sequence_length * i;
else
seq_strt = sequence_length * i + 1;
for (uint64_t seq_itr = seq_strt; seq_itr <= sequence_length; ++seq_itr ){
std::string splitstr = sequence_content.substr(seq_itr, split);
mtx.lock();
//std::lock_guard<std::mutex> lock(mtx);
reference_index_vector.insert(position(seq_itr, splitstr));
mtx.unlock();
seq_itr += split-1;
}
}
parallel_index.h
#ifndef PARALLEL_INDEX_H_
#define PARALLEL_INDEX_H_
#include<iostream>
#include <algorithm>
#include <utility>
#include <boost/bimap.hpp>
#include <boost/bimap/unordered_set_of.hpp>
#include <boost/bimap/unordered_multiset_of.hpp>
namespace bimaps = boost::bimaps;
typedef boost::bimap<bimaps::unordered_set_of<uint64_t>,
bimaps::unordered_multiset_of<std::string > > bimap_reference;
typedef bimap_reference::value_type position;
extern bimap_reference reference_index_vector;
namespace thread_test{
class create_index{
public:
void reference_index_hash(uint64_t &sequence_length, int &split,
std::string &sequence_content, int i);
};
}
#endif /* PARALLEL_INDEX_H_ */
我觉得segmentation fault
的罪魁祸首只不过是库的链接static
。这不是通过将seq_itr
递增到大于实际序列长度的值,因为如果 seq_itr
for 循环大于实际序列长度,则 for 循环将永远不允许输入。您尝试通过删除 -static
标志来工作,它应该通过不给出分段错误来工作,但它不能保证其他代码的正确性。有关thread
分段错误的更多详细信息,请参见
所有线程都将尝试在关键部分获得锁,为了保持位图完整,您需要一个条件变量,以便线程将有序执行。这是合理的,因为您在 reference_index_hash(( 中使用 seq_itr 作为局部变量,并且需要按适当的顺序递增。
原始代码中的一个问题是,在没有从多个线程同步的情况下访问unsigned int seq_itr
。除了产生无效结果外,还可能导致seq_itr
递增到大于实际序列长度的值,并且以下访问可能会导致崩溃。
新代码通过仅传递索引来解决此问题,只要这些索引不重叠且计算正确,这些索引就应该没问题。我不能完全遵循逻辑,但是如果您的seq_strt
计算关闭,程序也可能由于索引无效而崩溃。应该很容易在调试器中或使用一些索引断言进行验证。
但是,在第二个代码示例中存在一个问题,即在线程启动后直接打印映射
for( bimap_reference::const_iterator iter = reference_index_vector.begin(), iend = reference_index_vector.end();
iter != iend; ++iter ) {
std::cout << iter->left << " <--> "<< iter->right <<std::endl;
}
这不会产生正确的结果,因为映射由所有工作线程并发访问。join()
秒后访问是安全的。
- 将更高的优先级设置为 boost::asio 线程处理进程
- C++ 使用 2 个容器进行线程处理
- 当线程处理不同的类时,应该在哪里声明条件变量、互斥对象
- 多线程处理中的静态成员变量
- Opencv cpp 使用多线程处理同一视频的不同部分
- 对象析构函数在多线程处理时不断被调用,但该对象并未超出范围
- 使用 wxWidgets 进行多线程处理时出现奇怪的行为
- 使用共享变量进行线程处理
- 通过多线程处理确定每个字符在文件中出现的次数
- 使用多线程处理的异步请求
- 多线程处理,同时保持部分序列
- 如何在类中进行 c++ 多线程处理(将线程引用保留为成员 var)
- 用管道在C++中创建调度队列/线程处理程序:FIFO溢出
- 用多个线程处理SIGTERM的正确方法
- 使用C++的递归线程处理会使资源暂时不可用
- SDL 带变量的多线程处理 -- 无法按预期工作
- 使用多线程处理对象数组 - 无效使用 void 表达式错误
- 如何使用"priority"进行多线程处理?
- 使用简单的过程进行慢速多线程处理
- C++11 使用共享对象的多线程处理