How to synchronize thread for recursive function with sub-threads

Keywords: threads, synchronization, recursive functions    Last updated: 2023-10-16

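I'm fairly new to C++ and threads, and I've been stuck on this problem for several days. The code is meant to become the base of an FFT (fast Fourier transform). It is only a base, so some pieces are still missing, such as the twiddle factors, and the inputs are doubles (not complex numbers yet).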

I want to parallelize the function f_thread in C++. Here is a compilable version that runs:

#include<iostream>
#include<thread>
#include <vector>
#include <mutex>
void get_odd_elements(std::vector<double> inpt, std::vector<double> &out) {
for (int i = 0; i < inpt.size()-1; i = i + 2) {out[i/2] = inpt[i];}
}
void get_even_elements(std::vector<double> inpt, std::vector<double> &out) {
for (int i = 1; i < inpt.size(); i = i + 2) {out[i/2] = inpt[i];}
}
void attach(std::vector<double> a, std::vector<double> b, std::vector<double> &out) {
for (int i = 0; i < a.size(); i++) {out[i] = a[i];}
for (int i = a.size(); i < a.size()+b.size(); i++) {out[i] = b[i];}
}
void add_vectors(std::vector<double> &x, std::vector<double> &y, std::vector<double> &z) {for (int i = 0; i < x.size(); i++) {z[i] = x[i] + y[i];}}
void sub_vectors(std::vector<double> &x, std::vector<double> &y, std::vector<double> &z) {for (int i = 0; i < x.size(); i++) {z[i] = y[i] - x[i];}}
//the f_thread function
void f_thread(std::vector<double> in, std::vector<double> &out) {
if (in.size() == 1) {out = in;}
else {
std::vector<double> f0(in.size()/2);
std::vector<double> f1(in.size()/2);
get_odd_elements(in,std::ref(f0)); //get_odd_elements is a function that gets all odd-indexed elements of f
get_even_elements(in,std::ref(f1)); //get_even_elements is a function that gets all even-indexed elements of in
std::vector<double> a(f0.size());
std::vector<double> b(f1.size());
std::mutex mtx1; std::mutex mtx2;
std::thread t0(f_thread,std::ref(f0),std::ref(a)); //create thread for f_thread on a
std::thread t1(f_thread,std::ref(f1),std::ref(b)); //create thread for f_thread on b
t0.join(); t1.join(); // join 2 threads
std::vector<double> a_out(f0.size());
std::vector<double> b_out(f1.size());
add_vectors(std::ref(a),std::ref(b),std::ref(a_out)); //call add_vectors function : a + b
sub_vectors(std::ref(a),std::ref(b),std::ref(b_out)); //call sub_vectors function : b - a
std::vector<double> f_out(in.size());
attach(a_out,b_out,std::ref(f_out)); //attach is a function that appends b to the end of a
out = f_out; 
}
}
int main() {
int n_elements = 16;
std::vector<double> sample_input(n_elements);
for (int i = 0; i < n_elements; i++) {sample_input[i] = i;}
std::vector<double> output(n_elements);
std::thread start(f_thread,std::ref(sample_input),std::ref(output));
start.join();
for (int i = 0; i < n_elements; i++) {std::cout << "output element " << i << ": " << output[i] << "\n";}
}

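So f_thread is started as a thread, which then creates two sub-threads that call f_thread recursively. I've tried several tricks with mutexes, but none of them seem to work, because the synchronization between the two sub-threads doesn't work out (it's a hotspot for race conditions). Here is one version I tried that didn't work. I also tried a global recursive mutex, but that didn't help either.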

#include<iostream>
#include<thread>
#include <vector>
#include <mutex>
void get_odd_elements(std::vector<double> inpt, std::vector<double> &out) {
for (int i = 0; i < inpt.size()-1; i = i + 2) {out[i/2] = inpt[i];}
}
void get_even_elements(std::vector<double> inpt, std::vector<double> &out) {
for (int i = 1; i < inpt.size(); i = i + 2) {out[i/2] = inpt[i];}
}
void attach(std::vector<double> a, std::vector<double> b, std::vector<double> &out) {
for (int i = 0; i < a.size(); i++) {out[i] = a[i];}
for (int i = a.size(); i < a.size()+b.size(); i++) {out[i] = b[i];}
}
void add_vectors(std::vector<double> &x, std::vector<double> &y, std::vector<double> &z) {for (int i = 0; i < x.size(); i++) {z[i] = x[i] + y[i];}}
void sub_vectors(std::vector<double> &x, std::vector<double> &y, std::vector<double> &z) {for (int i = 0; i < x.size(); i++) {z[i] = y[i] - x[i];}}
//the f_thread function
void f_thread(std::vector<double> in, std::vector<double> &out) {
if (in.size() == 1) {out = in;}
else {
std::vector<double> f0(in.size()/2);
std::vector<double> f1(in.size()/2);
get_odd_elements(in,std::ref(f0)); //get_odd_elements is a function that gets all odd-indexed elements of f
get_even_elements(in,std::ref(f1)); //get_even_elements is a function that gets all even-indexed elements of in
std::vector<double> a(f0.size());
std::vector<double> b(f1.size());
std::mutex mtx1; std::mutex mtx2;
mtx1.lock(); std::thread t0(f_thread,std::ref(f0),std::ref(a)); mtx1.unlock(); //create thread for f_thread on a
mtx2.lock(); std::thread t1(f_thread,std::ref(f1),std::ref(b)); mtx2.unlock(); //create thread for f_thread on b
t0.join(); t1.join(); // join 2 threads
std::vector<double> a_out(f0.size());
std::vector<double> b_out(f1.size());
add_vectors(std::ref(a),std::ref(b),std::ref(a_out)); //call add_vectors function : a + b
sub_vectors(std::ref(a),std::ref(b),std::ref(b_out)); //call sub_vectors function : b - a
std::vector<double> f_out(in.size());
attach(a_out,b_out,std::ref(f_out)); //attach is a function that appends b to the end of a
out = f_out; 
}
}
int main() {
int n_elements = 16;
std::vector<double> sample_input(n_elements);
for (int i = 0; i < n_elements; i++) {sample_input[i] = i;}
std::vector<double> output(n_elements);
std::thread start(f_thread,std::ref(sample_input),std::ref(output));
start.join();
for (int i = 0; i < n_elements; i++) {std::cout << "output element " << i << ": " << output[i] << "\n";}
}

I verified that this code compiles with the standard C++ library on Linux (Ubuntu 18.04) using g++ f_thread.cpp -pthread.

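The code now runs (no more "Aborted (core dumped)" errors), but the output of the threaded version changes on every run, which suggests the synchronization isn't working properly.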

For reference, here is the sequential version of the code. It doesn't use sub-threads and works fine (i.e. the output is the same on every run).

// WORKING sequential version
#include<iostream>
#include<thread>
#include <vector>
#include <mutex>
void get_odd_elements(std::vector<double> inpt, std::vector<double> &out) {
for (int i = 0; i < inpt.size()-1; i = i + 2) {out[i/2] = inpt[i];}
}
void get_even_elements(std::vector<double> inpt, std::vector<double> &out) {
for (int i = 1; i < inpt.size(); i = i + 2) {out[i/2] = inpt[i];}
}
void attach(std::vector<double> a, std::vector<double> b, std::vector<double> &out) {
for (int i = 0; i < a.size(); i++) {out[i] = a[i];}
for (int i = a.size(); i < a.size()+b.size(); i++) {out[i] = b[i];}
}
void add_vectors(std::vector<double> &x, std::vector<double> &y, std::vector<double> &z) {for (int i = 0; i < x.size(); i++) {z[i] = x[i] + y[i];}}
void sub_vectors(std::vector<double> &x, std::vector<double> &y, std::vector<double> &z) {for (int i = 0; i < x.size(); i++) {z[i] = y[i] - x[i];}}
//the f_thread function
void f_thread(std::vector<double> in, std::vector<double> &out) {
if (in.size() == 1) {out = in;}
else {
std::vector<double> f0(in.size()/2);
std::vector<double> f1(in.size()/2);
get_odd_elements(in,std::ref(f0)); //get_odd_elements is a function that gets all odd-indexed elements of f
get_even_elements(in,std::ref(f1)); //get_even_elements is a function that gets all even-indexed elements of in
std::vector<double> a(f0.size());
std::vector<double> b(f1.size());
f_thread(std::ref(f0),std::ref(a)); // no thread, just call recursion 
f_thread(std::ref(f1),std::ref(b)); // no thread, just call recursion 
std::vector<double> a_out(f0.size());
std::vector<double> b_out(f1.size());
add_vectors(std::ref(a),std::ref(b),std::ref(a_out)); //call add_vectors function : a + b
sub_vectors(std::ref(a),std::ref(b),std::ref(b_out)); //call sub_vectors function : b - a
std::vector<double> f_out(in.size());
attach(a_out,b_out,std::ref(f_out)); //attach is a function that appends b to the end of a
out = f_out; 
}
}
int main() {
int n_elements = 16;
std::vector<double> sample_input(n_elements);
for (int i = 0; i < n_elements; i++) {sample_input[i] = i;}
std::vector<double> output(n_elements);
std::thread start(f_thread,std::ref(sample_input),std::ref(output));
start.join();
for (int i = 0; i < n_elements; i++) {std::cout << "output element " << i << ": " << output[i] << "\n";}
}

Every time the code runs, the result should be exactly this output:

output element 0: 120
output element 1: 0
output element 2: 0
output element 3: 7.31217e-322
output element 4: 0
output element 5: 6.46188e-319
output element 6: 56
output element 7: 0
output element 8: 0
output element 9: 4.19956e-322
output element 10: 120
output element 11: 0
output element 12: 0
output element 13: 7.31217e-322
output element 14: 0
output element 15: 6.46188e-319

This isn't a threading bug; it's an out-of-bounds array access in the function attach:

void attach(std::vector<double> a, std::vector<double> b, std::vector<double> &out) {
for (int i = 0; i < a.size(); i++) {out[i] = a[i];}
for (int i = a.size(); i < a.size()+b.size(); i++) {out[i] = b[i];}
}

In the second loop the index starts at a.size() rather than 0, but you use it to access the elements of b as if it started at 0.
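If you prefer to keep the loops, a minimal fix is to offset the index into b by a.size(). This sketch also takes the inputs by const reference and assumes out already holds at least a.size() + b.size() elements:

```cpp
#include <cstddef>
#include <vector>

// Copies a into the front of out and b immediately after it.
// Assumes out.size() >= a.size() + b.size().
void attach(const std::vector<double>& a, const std::vector<double>& b,
            std::vector<double>& out) {
    for (std::size_t i = 0; i < a.size(); i++) { out[i] = a[i]; }
    // i is a position in out; subtracting a.size() gives the matching index into b.
    for (std::size_t i = a.size(); i < a.size() + b.size(); i++) { out[i] = b[i - a.size()]; }
}
```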

Instead of writing loops, you can use std::copy from <algorithm>:

void attach(std::vector<double> a, std::vector<double> b, std::vector<double> &out) {
std::copy(a.begin(), a.end(), out.begin());
std::copy(b.begin(), b.end(), out.begin()+a.size());
}

With that fixed, the recursive threads need nothing more than this:

std::thread t0(f_thread,std::ref(f0),std::ref(a)); //create thread for f_thread on a
std::thread t1(f_thread,std::ref(f1),std::ref(b)); //create thread for f_thread on b
t0.join(); t1.join(); // join 2 threads

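There is no contention, because each thread works on its own input and output arrays (the ones you created on the stack of the "parent" thread). The result is deterministic and identical for the sequential and threaded versions: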

output element 0: 120
output element 1: 64
output element 2: 32
output element 3: 0
output element 4: 16
output element 5: 0
output element 6: 0
output element 7: 0
output element 8: 8
output element 9: 0
output element 10: 0
output element 11: 0
output element 12: 0
output element 13: 0
output element 14: 0
output element 15: 0
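For reference, here is one way the whole threaded function might look with the attach fix applied. This is a sketch, not the asker's code: the helper name split is my own, the add, subtract and attach steps are folded into a single loop, inputs are passed by const reference, and in.size() is assumed to be a power of two. Like the version above, it still spawns threads all the way down to size 1.

```cpp
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Same split as the question's helpers: elements at indices 0, 2, 4, ...
// go to f0 and elements at indices 1, 3, 5, ... go to f1.
void split(const std::vector<double>& in, std::vector<double>& f0,
           std::vector<double>& f1) {
    for (std::size_t i = 0; i < in.size(); ++i) {
        (i % 2 == 0 ? f0 : f1)[i / 2] = in[i];
    }
}

// Threaded f_thread with the attach bug fixed. Assumes in.size() is a power of two.
void f_thread(const std::vector<double>& in, std::vector<double>& out) {
    if (in.size() <= 1) { out = in; return; }
    const std::size_t half = in.size() / 2;
    std::vector<double> f0(half), f1(half), a(half), b(half);
    split(in, f0, f1);
    // Each child reads only its own input and writes only its own output,
    // so the two threads share no data and no mutex is needed.
    std::thread t0(f_thread, std::cref(f0), std::ref(a));
    std::thread t1(f_thread, std::cref(f1), std::ref(b));
    t0.join();
    t1.join();
    out.resize(in.size());
    for (std::size_t i = 0; i < half; ++i) {
        out[i] = a[i] + b[i];         // first half: a + b
        out[half + i] = b[i] - a[i];  // second half: b - a, placed right after
    }
}
```

Called on the inputs 0 to 15, this should reproduce the 16 values listed above on every run.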

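By the way, as you may have guessed, even your serial version was incorrect: the input data are all integers and you only copy, add and subtract them, so there is no reason for floating-point numbers like 7.31217e-322 to appear in the output.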

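Also note Davis Herring's comment: you are copying a lot of data between vectors. At the very least, I would pass vectors to functions by const reference rather than by value (unless those copies are known to be elided).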
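A minimal sketch of two of the question's helpers with const-reference inputs. With these signatures the std::ref wrappers at the call sites are unnecessary, and a final copy such as out = f_out; could become out = std::move(f_out); (from <utility>), which hands over the buffer instead of copying it.

```cpp
#include <cstddef>
#include <vector>

// Inputs are read through const references, so calling this makes no copies;
// only z is written. Assumes z is already sized like x.
void add_vectors(const std::vector<double>& x, const std::vector<double>& y,
                 std::vector<double>& z) {
    for (std::size_t i = 0; i < x.size(); ++i) { z[i] = x[i] + y[i]; }
}

// z = y - x, the same convention as the question's sub_vectors.
void sub_vectors(const std::vector<double>& x, const std::vector<double>& y,
                 std::vector<double>& z) {
    for (std::size_t i = 0; i < x.size(); ++i) { z[i] = y[i] - x[i]; }
}
```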

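Finally, you should stop creating new threads long before the input array gets down to size 1. For realistic problem sizes you probably won't be able to create thousands of threads, and even if you manage to, the overhead of creating and running that many threads will make your code very, very slow. Ideally, you should not create more threads than there are hardware cores on the machine running the code.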
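One common way to do that is to pass a depth budget down the recursion and fall back to plain recursive calls once it runs out. In this sketch (f_limited and pick_depth are hypothetical names), every level that still has budget runs one half on a new thread and the other half on the current thread, so a depth of d means at most 2^d threads working at once. pick_depth chooses the smallest d with 2^d at least std::thread::hardware_concurrency().

```cpp
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Same transform as the threaded f_thread, but a new thread is spawned only while
// depth > 0; below that the recursion continues sequentially on the current thread.
// Assumes in.size() is a power of two.
void f_limited(const std::vector<double>& in, std::vector<double>& out, unsigned depth) {
    if (in.size() <= 1) { out = in; return; }
    const std::size_t half = in.size() / 2;
    std::vector<double> f0(half), f1(half), a(half), b(half);
    for (std::size_t i = 0; i < in.size(); ++i) {
        (i % 2 == 0 ? f0 : f1)[i / 2] = in[i];  // even indices -> f0, odd -> f1
    }
    if (depth > 0) {
        // One half on a new thread, the other on this thread: the thread count
        // doubles per level, so at most 2^depth threads are busy at once.
        std::thread t0(f_limited, std::cref(f0), std::ref(a), depth - 1);
        f_limited(f1, b, depth - 1);
        t0.join();
    } else {
        f_limited(f0, a, 0);
        f_limited(f1, b, 0);
    }
    out.resize(in.size());
    for (std::size_t i = 0; i < half; ++i) {
        out[i] = a[i] + b[i];         // a + b
        out[half + i] = b[i] - a[i];  // b - a
    }
}

// Smallest depth d with 2^d >= the number of hardware threads (treated as 1 if unknown).
unsigned pick_depth() {
    unsigned n = std::thread::hardware_concurrency();  // may return 0
    if (n == 0) n = 1;
    unsigned d = 0;
    while ((1u << d) < n) ++d;
    return d;
}
```

Usage: f_limited(in, out, pick_depth()); gives the same result as the fully threaded or fully sequential version, whatever depth is passed.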

You should handle this by asking how many CPUs there are, then splitting up your work and using a queue to join it back together.

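I don't know the FFT algorithm, but from a quick look at your code, you are basically splitting the data with a finer and finer comb. Unless you start at the finest level and work your way up, that is not a very good way to decompose the problem.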
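To make the "finer and finer comb" concrete, this small sketch (comb is a hypothetical helper) applies the question's even/odd split to a list of indices and reports which original indices each sub-problem holds after a given number of levels.

```cpp
#include <cstddef>
#include <vector>

// Mirrors f_thread's split: after `depth` levels, returns the original indices
// held by each sub-problem, in the order the recursion visits them.
std::vector<std::vector<std::size_t>> comb(const std::vector<std::size_t>& idx,
                                           unsigned depth) {
    if (depth == 0 || idx.size() < 2) return {idx};
    std::vector<std::size_t> even, odd;
    for (std::size_t i = 0; i < idx.size(); ++i) {
        (i % 2 == 0 ? even : odd).push_back(idx[i]);
    }
    auto groups = comb(even, depth - 1);  // the f0 branch comes first
    auto rest = comb(odd, depth - 1);     // then the f1 branch
    groups.insert(groups.end(), rest.begin(), rest.end());
    return groups;
}
```

For the indices 0 to 15 and two levels it yields {0, 4, 8, 12}, {2, 6, 10, 14}, {1, 5, 9, 13} and {3, 7, 11, 15}: every sub-problem touches elements spread across the whole array, four apart.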

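You don't want different CPUs handling every other value, because even on a single-chip multicore CPU there are several L1 caches. Each L1 cache is shared with at most one other core. So you want all the values a single CPU processes to be close to each other, to maximize the chance that the value you need is already in cache.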

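So you should start by splitting into the largest contiguous chunks. Because the FFT algorithm works in powers of two, count your cores using std::thread::hardware_concurrency(), round up to the next power of two, and divide your problem into that many sub-FFTs. Then combine their results in the main thread.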
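A minimal sketch of that decomposition under stated assumptions: next_pow2 and chunked_sum are hypothetical helpers, and the per-chunk work is just a sum standing in for a sub-FFT, since how the FFT stages map onto contiguous chunks is not shown here. It counts hardware threads, rounds up to a power of two, gives each contiguous chunk its own thread, and combines the partial results on the calling thread.

```cpp
#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

// Smallest power of two >= n (returns 1 for n = 0 or 1).
std::size_t next_pow2(std::size_t n) {
    std::size_t p = 1;
    while (p < n) p <<= 1;
    return p;
}

// Splits data into next_pow2(hardware threads) contiguous chunks, processes each chunk
// on its own thread, then combines the per-chunk results on the calling thread.
// The per-chunk work (a plain sum) is a placeholder for a real sub-problem.
double chunked_sum(const std::vector<double>& data) {
    const unsigned hw = std::thread::hardware_concurrency();  // 0 means "unknown"
    const std::size_t chunks = next_pow2(hw == 0 ? 1 : hw);
    const std::size_t n = data.size();
    std::vector<double> partial(chunks, 0.0);  // one result slot per chunk
    std::vector<std::thread> workers;
    for (std::size_t c = 0; c < chunks; ++c) {
        // Chunk c covers [n*c/chunks, n*(c+1)/chunks): contiguous, nearly equal sizes.
        const auto lo = static_cast<std::ptrdiff_t>(n * c / chunks);
        const auto hi = static_cast<std::ptrdiff_t>(n * (c + 1) / chunks);
        workers.emplace_back([&data, &partial, c, lo, hi] {
            partial[c] = std::accumulate(data.begin() + lo, data.begin() + hi, 0.0);
        });
    }
    for (auto& t : workers) t.join();
    // Combine step, done on the calling ("main") thread.
    return std::accumulate(partial.begin(), partial.end(), 0.0);
}
```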

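I wrote a program that does what you're after, though for sorting. It splits a list into chunks and sorts each one. Then it keeps a queue of merges that still need to be done. Each chunk is handled by a separate thread, and each merge is also spawned on its own thread.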

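I divided the number of cores by two because of a feature of modern CPUs that I don't like, called hyperthreading. I could have ignored it and the program would have run fine, but since the main contention is for the integer ALUs, it might be a little slower. (Hyperthreads share resources within a single core.)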
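The program itself isn't included here. As a rough, hypothetical reconstruction of the pattern described above (chunked_merge_sort is my own name, and the merges run level by level rather than from a truly concurrent queue), this sketch sorts contiguous chunks on separate threads, then repeatedly takes pairs of sorted runs off a queue and merges each pair on its own thread. Like the answer, it uses half of std::thread::hardware_concurrency() as the worker count, with a minimum of one.

```cpp
#include <algorithm>
#include <cstddef>
#include <deque>
#include <iterator>
#include <thread>
#include <utility>
#include <vector>

using Run = std::vector<int>;

std::vector<int> chunked_merge_sort(const std::vector<int>& data) {
    // Half the hardware threads (skipping hyperthread siblings), but at least 1.
    const unsigned workers = std::max(1u, std::thread::hardware_concurrency() / 2);
    const std::size_t n = data.size();

    // Stage 1: each contiguous chunk is copied and sorted on its own thread.
    std::vector<Run> runs(workers);
    {
        std::vector<std::thread> threads;
        for (unsigned c = 0; c < workers; ++c) {
            auto first = data.begin() + static_cast<std::ptrdiff_t>(n * c / workers);
            auto last = data.begin() + static_cast<std::ptrdiff_t>(n * (c + 1) / workers);
            threads.emplace_back([&runs, c, first, last] {
                runs[c].assign(first, last);
                std::sort(runs[c].begin(), runs[c].end());
            });
        }
        for (auto& t : threads) t.join();
    }
    std::deque<Run> queue;
    for (auto& r : runs) queue.push_back(std::move(r));

    // Stage 2: pop pairs of sorted runs, merge each pair on its own thread,
    // and push the merged runs back until a single run is left.
    while (queue.size() > 1) {
        std::vector<Run> left, right;
        while (queue.size() >= 2) {
            left.push_back(std::move(queue.front()));
            queue.pop_front();
            right.push_back(std::move(queue.front()));
            queue.pop_front();
        }
        std::vector<Run> merged(left.size());
        std::vector<std::thread> threads;
        for (std::size_t i = 0; i < left.size(); ++i) {
            threads.emplace_back([&left, &right, &merged, i] {
                std::merge(left[i].begin(), left[i].end(), right[i].begin(),
                           right[i].end(), std::back_inserter(merged[i]));
            });
        }
        for (auto& t : threads) t.join();
        for (auto& m : merged) queue.push_back(std::move(m));
    }
    return queue.empty() ? Run{} : std::move(queue.front());
}
```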

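Judging from the other answer, your FFT code seems to have some bugs. I'd suggest getting it working with a single thread first, and then figuring out how to split it up.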