我们如何并行运行算法的 n 个实例并以有效的方式计算结果函数的平均值?

How can we run n instances of an algorithm in parallel and compute the mean of a function of the results in an efficient way?

本文关键字:有效 方式 计算 平均值 函数 结果 实例 何并行 并行 我们 算法      更新时间:2023-10-16

我想并行运行算法的n实例,并计算函数f结果的平均值。如果我没有大错特错,下面的代码可以实现这个目标:

struct X {};
int f(X) { return /* ... */; }
int main()
{
std::size_t const n = /* ... */;
std::vector<std::future<X>> results;
results.reserve(n);
for (std::size_t i = 0; i < n; ++i)
results.push_back(std::async([]() -> X { /* ... */ }));
int mean = 0;
for (std::size_t i = 0; i < n; ++i)
mean += f(results[i].get());
mean /= n;
}

但是,有没有更好的方法可以做到这一点?上面代码的明显问题如下:行mean += f(results[i].get());中的求和顺序无关紧要。因此,最好在结果可用后立即将其添加到mean中。如果在上面的代码中,第i个任务的结果尚不可用,则程序将等待该结果,而任务n - 1i + 1的所有结果可能已经可用。

那么,我们如何才能以更好的方式做到这一点呢?

你正在阻止未来,这是一个太早的操作。

为什么不更新异步线程中的累积总和,然后在所有线程完成时阻止?

#include <condition_variable>
#include <thread>
#include <mutex>
struct X {};
int f(X);
X make_x(int);
struct algo_state
{
std::mutex m;
std::condition_variable cv;
int remaining_tasks;
int accumulator;
};
void task(X x, algo_state& state)
{
auto part = f(x);
auto lock = std::unique_lock(state.m);
state.accumulator += part;
if (--state.remaining_tasks == 0)
{
lock.unlock();
state.cv.notify_one();
}
}
int main()
{
int get_n();
auto n = get_n();
algo_state state = {
{},
{},
n,
0
};
for(int i = 0 ; i < n ; ++i)
std::thread([&] { task(make_x(i), state); }).detach();
auto lock = std::unique_lock(state.m);
state.cv.wait(lock, [&] { return state.remaining_tasks == 0; });
auto mean = state.accumulator / n;
return mean;
}

无法将其放入评论中:

无需将 N 个函数传递给 N个数据点 (X( 的 M 线程,您可以拥有:

  • 每个元素的数据元素的 N/K 个队列
  • 池中的 M 线程(生产者,准备好具有相同的功能(
  • 1 个使用者(加法器(线程(主?

并在线程之间仅传递 N 个数据点。传递函数并执行它们可能比数据有更多的开销。

此外,这些函数可以添加到共享变量中,而无需在外部进行任何额外的求和,然后只有M生产者可以使用合适的同步,例如原子或锁防护。

该结构的大小是多少?

最简单的方法

如何让 lambda 返回f(x)而不是 x:

for (std::size_t i = 0; i < n; ++i)
results.push_back(std::async([]() -> int { /* ... */ }));

在这种情况下,可以尽快执行f(),而无需等待。平均计算仍需要按顺序等待。但这是一个错误的问题,因为没有什么比汇总整数更快了,无论如何,在对每个部分求和之前,您将无法完成平均值的计算。

简单的替代方案

还有一种方法可能是使用atomic<int> mean;并在 lambda 中捕获它并更新总和。 因此,最后,您只需要确保在进行除法之前所有的未来都已交付。 但如前所述,考虑到整数加法的成本,这可能有点矫枉过正。

std::vector<std::future<void>> results;
...
atomic<int> mean{0};
for (std::size_t i = 0; i < n; ++i)
results.push_back(std::async([&mean]() -> void 
{ X x = ...; int i=f(x); mean+=i; return; }));    
for (std::size_t i = 0; i < n; ++i)
results[i].get();
mean = mean/n;   // attention not an atomic operation, but all concurent things are done