在C++中跨文件行并行化的惯用方法
Idiomatic way to parallelize function across file lines in C++
我发现自己经常编写C++形式的代码:
while (getline(strm, line)) {
cout << computationally_intensive_function(line) << endl;
}
我想并行化这段代码。到目前为止,我想出的最佳解决方案是构建字符串向量来容纳大量(10000-100000(行,然后在这个向量上并行化
#pragma omp parallel for
然后清空向量并在保留行时重复。但是,此方法需要大量内存,并且其他内核在主进程缓冲字符串时处于空闲状态。有没有更好的方法?像Python的multiprocessing.Pool.map
或Hadoop?(但是,我想避免使用Hadoop的C++ API,因为Hadoop相当重量级,可能不会安装在我的代码运行的任何地方。
OpenMP 3.0 任务存在一个不为人所知的功能,这是非常不幸的,因为它们是专门为涵盖此类情况而创建的。如果您的编译器支持该标准版本,您绝对应该选择 OpenMP 任务。但请记住,从多个线程写入stdout
(或std::cout
(通常会严重混合它们的输出,您很可能希望对其进行同步:
#pragma omp parallel
{
#pragma omp master
while (getline(strm, line))
#pragma omp task
{
result_type result = computationally_intensive_function(line);
#pragma omp critical
{
cout << result << endl;
cout.flush();
}
}
#pragma omp taskwait
}
我让你们来决定应该shared
哪些变量,应该private
什么。
你应该与从文件中读取行重叠你的计算。一个很好的方法是使用线程构建块管道算法。您要做的是指定三个(基于伪代码示例中显示的内容(过滤器,两个串行筛选器和一个并行筛选器。串行滤波器是输入和输出滤波器。第一个逐行从文件中读取数据,并将每一行传递给第二个过滤器,第二个过滤器是并行的,并在多线程模式下运行您的计算/处理函数。最后一级/滤波器也是串行的,它确实输出。我正在复制粘贴TBB教程中的一个例子,它似乎正在做你想要实现的目标:
// Holds a slice of text.
/** Instances *must* be allocated/freed using methods herein, because the
C++ declaration
represents only the header of a much larger object in memory. */
class TextSlice {
// Pointer to one past last character in sequence
char* logical_end;
// Pointer to one past last available byte in sequence.
char* physical_end;
public:
// Allocate a TextSlice object that can hold up to max_size characters.
static TextSlice* allocate( size_t max_size ) {
// +1 leaves room for a terminating null character.
TextSlice* t = (TextSlice*)tbb::tbb_allocator<char>().allocate(sizeof(TextSlice)+max_size+1 );
t->logical_end = t->begin();
t->physical_end = t->begin()+max_size;
return t;
}
// Free this TextSlice object
void free() {
tbb::tbb_allocator<char>().deallocate((char*)this,
sizeof(TextSlice)+(physical_end-begin())+1);
}
// Pointer to beginning of sequence
char* begin() {return (char*)(this+1);}
// Pointer to one past last character in sequence
char* end() {return logical_end;}
// Length of sequence
size_t size() const {return logical_end-(char*)(this+1);}
// Maximum number of characters that can be appended to sequence
size_t avail() const {return physical_end-logical_end;}
// Append sequence [first,last) to this sequence.
void append( char* first, char* last ) {
memcpy( logical_end, first, last-first );
logical_end += last-first;
}
// Set end() to given value.
void set_end( char* p ) {logical_end=p;}
};
运行它的函数是:
void RunPipeline( int ntoken, FILE* input_file, FILE* output_file ) {
tbb::parallel_pipeline(
ntoken,
tbb::make_filter<void,TextSlice*>(
tbb::filter::serial_in_order, MyInputFunc(input_file) )
&
tbb::make_filter<TextSlice*,TextSlice*>(
tbb::filter::parallel, MyTransformFunc() )
&
tbb::make_filter<TextSlice*,void>(
tbb::filter::serial_in_order, MyOutputFunc(output_file) ) );
}
相关文章:
- 如何使用OpenMP并行化此矩阵时间矢量运算
- 如何使用 MPI 的远程内存访问 (RMA) 功能并行化数据聚合?
- 在C++中使用并行化的预期速度是多少(不是 OpenMp,而是 <thread>)
- 如何使用 OpenMP 并行化最近邻搜索
- Malloc 在使用线程并行化 SSH 调用时存在问题
- 部分方法专用化
- 如何使用 OpenMP 正确并行化 for 循环?
- 使用包含互斥锁的类的方法实例化 cpp11 线程
- 如何将矩阵的行随机复制到内存中的另一个矩阵的过程并行化?
- 如何使用 Pthreads 并行化图像翻转?
- MPI:反复并行化缓冲区
- 是否可以使用OpenMP并行化一个列表,该列表可以在每次迭代中添加新元素
- 如何在Visual Studio中并行化armadillo
- 嵌套循环 OpenMP 并行化、私有索引还是公共索引?
- 如何并行化增加循环的大小
- 在 C 中并行化嵌套循环的几种方法之间的差异,C++使用 OpenMP
- 使用OpenMP并行化此递归的最佳方法
- 并行化的最佳方法是什么
- 在C++中跨文件行并行化的惯用方法
- OpenCV中聚类方法的并行化