并行计算一个大矢量的和

Calculating the sum of a large vector in parallel

本文关键字:一个 并行计算      更新时间:2023-10-16

问题背景

我有一个程序,目前使用std::accumulate来汇总约1亿个元素的大型std::vector s需要很长时间,这是一个瓶颈。

我希望它更快,并且是异步计算,这样GUI/Server就不会阻塞。计算也应该使用多线程,这样我就可以减少求和向量所需的时间。

我想把总和分开,这样每个线程都会对向量的一部分求和,然后当计算出所有的部分和时,每个线程的部分和应该加在一起,得到总和。

Boost,Asio

我想知道如何在Boost.Asio中进行此操作?理想情况下,我的程序需要重用线程(如线程组(,不确定如何存储和检索部分和,并最终检索部分和的和。

我想创建一个调用boost::asio::io_service::run的线程组,传递一个处理程序来计算部分和,但我不知道如何将部分和传递给另一个处理函数,并将所有部分和相加。

如果有人向我展示一些关于如何做到这一点的基本代码,那就太好了。

Boost.Asio适合这个问题吗

Boost.Asio的主要目的是为网络I/O编程提供异步模型,而您描述的问题似乎与网络和I/O没有太大关系。

我认为最简单的解决方案是使用Boost或C++标准库提供的线程原语

一种并行算法

以下是仅使用标准库创建的并行版本accumulate的示例。

/* Minimum number of elements for multithreaded algorithm.
   Less than this and the algorithm is executed on single thread. */
static const int MT_MIN_SIZE = 10000;
template <typename InputIt, typename T>
auto parallel_accumulate(InputIt first, InputIt last, T init) {
    // Determine total size.
    const auto size = std::distance(first, last);
    // Determine how many parts the work shall be split into.
    const auto parts = (size < MT_MIN_SIZE)? 1 : std::thread::hardware_concurrency();
    std::vector<std::future<T>> futures;
    // For each part, calculate size and run accumulate on a separate thread.
    for (std::size_t i = 0; i != parts; ++i) {
        const auto part_size = (size * i + size) / parts - (size * i) / parts;
        futures.emplace_back(std::async(std::launch::async,
            [=] { return std::accumulate(first, std::next(first, part_size), T{}); }));
        std::advance(first, part_size);
    }
    // Wait for all threads to finish execution and accumulate results.
    return std::accumulate(std::begin(futures), std::end(futures), init,
        [] (const T prev, auto& future) { return prev + future.get(); });
}

实时示例(并行版本的性能与Coliru上的顺序版本大致相同,可能只有1个内核可用(

计时

在我的机器上(使用8个线程(,并行版本的性能平均提高了约120%。

顺序和:
所用时间:46毫秒
500000050000000
--------------------------------
并行和:
所用时间:21毫秒
500000050000000

然而,100000000个元素的绝对增益只是边际的(25ms(。尽管如此,当累积不同于int的元素类型时,性能增益可能更大。

OpenMP

正如@sehe在评论中提到的,值得一提的是OpenMP可能会为这个问题提供一个简单的解决方案,例如

template <typename T, typename U>
auto omp_accumulate(const std::vector<T>& v, U init) {
    U sum = init;
    #pragma omp parallel for reduction(+:sum)
    for(std::size_t i = 0; i < v.size(); i++) {
        sum += v[i];
    }
    return sum;
}

在我的机器上,这种方法与使用标准线程基元的并行方法相同。

顺序和:
所用时间:46毫秒
500000050000000
--------------------------------
并行和:
所用时间:21毫秒
总额:500000050000000
--------------------------------
OpenMP总和:
所用时间:21毫秒
金额:500000050000000

可以使用Boost Asio作为线程池。但除非你有。。。异步IO操作进行协调。

在对"带有阻塞的c++工作队列"的回答中,我展示了两个thread_pool实现:

  • 解决方案#1:基于boost::asio::io_service的解决方案
  • 解决方案#2:另一个基于boost::thread原语

两者都接受任何与void()签名兼容的任务。这意味着,您可以将返回重要结果的函数封装在packaged_task<...>中,并从中获取future<RetVal>