TBB管道的输出不正确

incorrect output with TBB pipeline

本文关键字:不正确 输出 管道 TBB      更新时间:2023-10-16

我在文本文件中编写了一个具有不同值的C结构(100次),如1.txt,2.txt…100.txt


我在Linux上使用英特尔TBB。我创建了:

  1. 输入筛选器(serial_in_order MODE)
  2. TransformFIlter(serial_in_order MODE)
  3. 输出过滤器(串行顺序模式)

InputFilter从文件中读取结构并将其传递给TransformFilter。TrasnformFilter更新结构值并将其传递给OutputFilter。OutputFilter将新结构写入光盘。

基本上,它是一个结构的简单读写应用程序

class InputFilter: public tbb::filter {
public:
    InputFilter( int );
    ~InputFilter();
private:
    int total_streams;
    int count;
    struct video_process_object input_obj;
    void* operator()( void* );
};
InputFilter::InputFilter( int x )
        : filter( serial_in_order ) {
    total_streams = x;
    count = 1;
}
InputFilter::~InputFilter() {
    total_streams = 0;
}
void* InputFilter::operator()( void* ) {
    char path[50] = { };
    sprintf( path, "input//%d.txt", count );
    printf( "Path : %sn", path );
    FILE *fp;
    fp = fopen( path, "r" );
    if( fp == NULL || count > total_streams ) {
        fclose( fp );
        printf( "n*******Cannot find more data.Terminating********nnn" );
        return NULL;
    }
    fscanf( fp, "%d", &input_obj.video_id );
    fscanf( fp, "%s", &input_obj.storage_url );
    fscanf( fp, "%s", &input_obj.storage_type );
    fscanf( fp, "%d", &input_obj.face_detect );
    fscanf( fp, "%d", &input_obj.face_recognise );
    fscanf( fp, "%d", &input_obj.scene_recognise );
    fscanf( fp, "%d", &input_obj.activity_recognise );
    fscanf( fp, "%d", &input_obj.speech_recognise );
    fclose( fp );
    count++;
    return &input_obj;
}
class TransformFilter: public tbb::filter {
public:
    TransformFilter();
    ~TransformFilter();
private:
    struct video_process_object input_transform;
    void* operator()( void* );
};
TransformFilter::TransformFilter()
        : filter( serial_in_order ) {
}
TransformFilter::~TransformFilter() {
}
void* TransformFilter::operator()( void *item ) {
    input_transform = *static_cast<struct video_process_object*>( item );
    input_transform.video_id += 1000;
    strcat( input_transform.storage_url, "  nabeel" );
    strcat( input_transform.storage_type, " N" );
    input_transform.face_detect += 1000;
    input_transform.face_recognise += 1000;
    return &input_transform;
}
class OutputFilter: public tbb::filter {
public:
    OutputFilter();
    ~OutputFilter();
private:
    struct video_process_object output_obj;
    void* operator()( void* );
};
OutputFilter::OutputFilter()
        : filter( serial_in_order ) {
    int status = mkdir( "output", S_IRWXU | S_IRWXG | S_IRWXO );
    if( status == -1 )
        printf( "nOutput directory already existsnn" );
}
OutputFilter::~OutputFilter() {
}
void* OutputFilter::operator()( void *item ) {
    output_obj = *static_cast<struct video_process_object*>( item );
    FILE *fp;
    char path[50] = { };
    sprintf( path, "output//%d.txt", output_obj.video_id - 1000 );
    printf( "Output Path : %stt %dnn", path, output_obj.video_id );
    if( (fp = fopen( path, "w" )) == NULL ) {
        fprintf( stderr, "Cannot open output file.n" );
        return NULL;
    }
    fprintf( fp, "%dn", output_obj.video_id );
    fprintf( fp, "%sn", output_obj.storage_url );
    fprintf( fp, "%sn", output_obj.storage_type );
    fprintf( fp, "%dn", output_obj.face_detect );
    fprintf( fp, "%dn", output_obj.face_recognise );
    fprintf( fp, "%dn", output_obj.scene_recognise );
    fprintf( fp, "%dn", output_obj.activity_recognise );
    fprintf( fp, "%dn", output_obj.speech_recognise );
    fclose( fp );
    return NULL;
}
int main() {
    tbb::pipeline pipeline;
    InputFilter input_filter( 100 );
    pipeline.add_filter( input_filter );
    TransformFilter transform_filter;
    pipeline.add_filter( transform_filter );
    OutputFilter output_filter;
    pipeline.add_filter( output_filter );
    tbb::tick_count t0 = tbb::tick_count::now();
    tbb::task_scheduler_init init_parallel;
    pipeline.run( 1 );
    tbb::tick_count t1 = tbb::tick_count::now();
    return 0;
}

对于少量的文件(如5或10),一切都很好。当我读取大量文件(例如50或100)时,问题就开始了。问题是:

有时InputFilter读取10.txt文件,TransformFilter处理它。但InputFilter立即读取11.txt。OutputFIlter跳过10.txt并处理11.txt。

我怎样才能确保这种情况不会发生?

存在数据竞争,因为video_process_objects被放置在过滤器结构内部,并在过滤器之间通过引用传递(当然是并行运行的)。因此,当InputFilter开始处理下一个令牌,将新数据读取到其video_process_object中,而第一个令牌刚刚开始读取TransformFilter:中相同地址的数据时,就会出现这种情况

     Token 1                ||         Token 2
input_filter.operator()     ||
transform_filter.operator() ||  input_filter.operator()
...

要修复它,请动态分配数据,例如:

struct video_process_object *input_obj_ptr = new video_process_object;
fscanf( fp, "%d", &input_obj_ptr->video_id );
...
return input_obj_ptr;

并在最后一个筛选器中取消分配它,因为它的返回值无论如何都会被忽略。这个旧演示文稿中的幻灯片49-50描绘了类似的代码。

最后,让我对您选择的tbb::pipelene和serial_in_order过滤器类型提出质疑。TBB参考说:

在实际情况下,并行滤波器是首选,因为它们允许并行加速。如果过滤器必须是串行的,那么在实际情况下,乱序变体是首选,因为它对处理订单的限制较少。

由于处理和文件是独立的,我认为没有理由设置这种额外的"有序"限制。为了获得更好的结构化代码,还需要考虑另一个报价:

函数parallel_pipeline提供了一种强类型lambda友好的方法来构建和运行管道。