在C++中跨文件行并行化的惯用方法

Idiomatic way to parallelize function across file lines in C++

本文关键字:方法 并行化 C++ 文件      更新时间:2023-10-16

我发现自己经常编写C++形式的代码:

while (getline(strm, line)) {
    cout << computationally_intensive_function(line) << endl;
}

我想并行化这段代码。到目前为止,我想出的最佳解决方案是构建字符串向量来容纳大量(10000-100000(行,然后在这个向量上并行化

#pragma omp parallel for

然后清空向量并在保留行时重复。但是,此方法需要大量内存,并且其他内核在主进程缓冲字符串时处于空闲状态。有没有更好的方法?像Python的multiprocessing.Pool.map或Hadoop?(但是,我想避免使用Hadoop的C++ API,因为Hadoop相当重量级,可能不会安装在我的代码运行的任何地方。

OpenMP 3.0 任务存在一个不为人所知的功能,这是非常不幸的,因为它们是专门为涵盖此类情况而创建的。如果您的编译器支持该标准版本,您绝对应该选择 OpenMP 任务。但请记住,从多个线程写入stdout(或std::cout(通常会严重混合它们的输出,您很可能希望对其进行同步:

#pragma omp parallel
{
    #pragma omp master
    while (getline(strm, line))
    #pragma omp task
    {
        result_type result = computationally_intensive_function(line);
        #pragma omp critical
        {
            cout << result << endl;
            cout.flush();
        }
    }
    #pragma omp taskwait
}

我让你们来决定应该shared哪些变量,应该private什么。

你应该与从文件中读取行重叠你的计算。一个很好的方法是使用线程构建块管道算法。您要做的是指定三个(基于伪代码示例中显示的内容(过滤器,两个串行筛选器和一个并行筛选器。串行滤波器是输入和输出滤波器。第一个逐行从文件中读取数据,并将每一行传递给第二个过滤器,第二个过滤器是并行的,并在多线程模式下运行您的计算/处理函数。最后一级/滤波器也是串行的,它确实输出。我正在复制粘贴TBB教程中的一个例子,它似乎正在做你想要实现的目标:

// Holds a slice of text.
/** Instances *must* be allocated/freed using methods herein, because the
C++ declaration
represents only the header of a much larger object in memory. */
class TextSlice {
    // Pointer to one past last character in sequence
    char* logical_end;
    // Pointer to one past last available byte in sequence.
    char* physical_end;
public:
    // Allocate a TextSlice object that can hold up to max_size characters.
    static TextSlice* allocate( size_t max_size ) {
        // +1 leaves room for a terminating null character.
        TextSlice* t = (TextSlice*)tbb::tbb_allocator<char>().allocate(sizeof(TextSlice)+max_size+1 );
        t->logical_end = t->begin();
        t->physical_end = t->begin()+max_size;
        return t;
    }
    // Free this TextSlice object
    void free() {
        tbb::tbb_allocator<char>().deallocate((char*)this,
        sizeof(TextSlice)+(physical_end-begin())+1);
    }
    // Pointer to beginning of sequence
    char* begin() {return (char*)(this+1);}
    // Pointer to one past last character in sequence
    char* end() {return logical_end;}
    // Length of sequence
    size_t size() const {return logical_end-(char*)(this+1);}
    // Maximum number of characters that can be appended to sequence
    size_t avail() const {return physical_end-logical_end;}
    // Append sequence [first,last) to this sequence.
    void append( char* first, char* last ) {
        memcpy( logical_end, first, last-first );
        logical_end += last-first;
    }
    // Set end() to given value.
    void set_end( char* p ) {logical_end=p;}
};

运行它的函数是:

void RunPipeline( int ntoken, FILE* input_file, FILE* output_file ) {
    tbb::parallel_pipeline(
    ntoken,
    tbb::make_filter<void,TextSlice*>(
    tbb::filter::serial_in_order, MyInputFunc(input_file) )
    &
    tbb::make_filter<TextSlice*,TextSlice*>(
    tbb::filter::parallel, MyTransformFunc() )
    &
    tbb::make_filter<TextSlice*,void>(
    tbb::filter::serial_in_order, MyOutputFunc(output_file) ) );
}