将执行循环拆分为多个线程 (1-N-1-N-1..)
Cyclic splitting of execution into several threads (1-N-1-N-1...)
考虑这种情况:
for (...)
{
const size_t count = ...
for (size_t i = 0; i < count; ++i)
{
calculate(i); // thread-safe function
}
}
使用 C++17 和/或提升最大限度地提高性能的最优雅的解决方案是什么?
循环的"创建 + 连接"线程毫无意义,因为开销巨大(在我的情况下,这完全等于可能的收益)。
所以我只需要创建 N 个线程一次,并使它们与主线程保持同步(使用:互斥、shared_mutex、condition_variable、原子等)。对于这种常见和清晰的情况(为了使一切真正安全和快速),这似乎是相当艰巨的任务。白天坚持下去,我有一种"发明自行车"的感觉......
- 更新 1:calculate(x) 和 calculate(y) 可以(并且应该)运行 平行
- 更新 2:std::atomic::fetch_add(或 smth.)更可取 比队列(或 SMTH)。
- 更新3:极端计算(即数百万个"外部"调用和数百个"内部"调用)
- 更新 4:calculate() 更改内部对象的数据而不返回值
中间溶液
出于某种原因,"异步 + 等待"比"创建 + 加入"线程快得多。所以这两个例子使速度提高了100%:
例 1
for (...)
{
const size_t count = ...
future<void> execution[cpu_cores];
for (size_t x = 0; x < cpu_cores; ++x)
{
execution[x] = async(launch::async, ref(*this), x, count);
}
for (size_t x = 0; x < cpu_cores; ++x)
{
execution[x].wait();
}
}
void operator()(const size_t x, const size_t count)
{
for (size_t i = x; i < count; i += cpu_cores)
{
calculate(i);
}
}
例 2
for (...)
{
index = 0;
const size_t count = ...
future<void> execution[cpu_cores];
for (size_t x = 0; x < cpu_cores; ++x)
{
execution[x] = async(launch::async, ref(*this), count);
}
for (size_t x = 0; x < cpu_cores; ++x)
{
execution[x].wait();
}
}
atomic<size_t> index;
void operator()(const size_t count)
{
for (size_t i = index.fetch_add(1); i < count; i = index.fetch_add(1))
{
calculate(i);
}
}
是否可以通过仅创建一次线程,然后以较小的开销同步它们来使其更快?
最终解决方案
与标准::异步相比,速度提高了+20%!
for (size_t i = 0; i < _countof(index); ++i) { index[i] = i; }
for_each_n(par_unseq, index, count, [&](const size_t i) { calculate(i); });
是否可以避免冗余阵列"索引"?
是的:
for_each_n(par_unseq, counting_iterator<size_t>(0), count,
[&](const size_t i)
{
calculate(i);
});
过去,您会使用 OpenMP、GNU Parallel、Intel TBB。
如果你有 c++17²,我建议将执行策略与标准算法一起使用。
这真的比你自己做事要好,尽管它
- 需要一些深思熟虑来选择适合标准算法的类型
- 如果您知道引擎盖下会发生什么,仍然会有所帮助
这是一个简单的例子,事不宜迟:
实时编译器资源管理器
#include <thread>
#include <algorithm>
#include <random>
#include <execution>
#include <iostream>
using namespace std::chrono_literals;
static size_t s_random_seed = std::random_device{}();
static auto generate_param() {
static std::mt19937 prng {s_random_seed};
static std::uniform_int_distribution<> dist;
return dist(prng);
}
struct Task {
Task(int p = generate_param()) : param(p), output(0) {}
int param;
int output;
struct ByParam { bool operator()(Task const& a, Task const& b) const { return a.param < b.param; } };
struct ByOutput { bool operator()(Task const& a, Task const& b) const { return a.output < b.output; } };
};
static void calculate(Task& task) {
//std::this_thread::sleep_for(1us);
task.output = task.param ^ 0xf0f0f0f0;
}
int main(int argc, char** argv) {
if (argc>1) {
s_random_seed = std::stoull(argv[1]);
}
std::vector<Task> jobs;
auto now = std::chrono::high_resolution_clock::now;
auto start = now();
std::generate_n(
std::execution::par_unseq,
back_inserter(jobs),
1ull << 28, // reduce for small RAM!
generate_param);
auto laptime = [&](auto caption) {
std::cout << caption << " in " << (now() - start)/1.0s << "s" << std::endl;
start = now();
};
laptime("generate randum input");
std::sort(
std::execution::par_unseq,
begin(jobs), end(jobs),
Task::ByParam{});
laptime("sort by param");
std::for_each(
std::execution::par_unseq,
begin(jobs), end(jobs),
calculate);
laptime("calculate");
std::sort(
std::execution::par_unseq,
begin(jobs), end(jobs),
Task::ByOutput{});
laptime("sort by output");
auto const checksum = std::transform_reduce(
std::execution::par_unseq,
begin(jobs), end(jobs),
0, std::bit_xor<>{},
std::mem_fn(&Task::output)
);
laptime("reduce");
std::cout << "Checksum: " << checksum << "n";
}
当使用种子42
运行时,打印:
generate randum input in 10.8819s
sort by param in 8.29467s
calculate in 0.22513s
sort by output in 5.64708s
reduce in 0.108768s
Checksum: 683872090
除第一个(随机生成)步骤外,所有内核的 CPU 利用率均为 100%。
¹(我想我在这个网站上演示了所有这些答案)。
² 参见 C++17 并行算法是否已经实现?
相关文章:
- 从不同线程使用int64的不同字节安全吗
- 删除一个线程上有数百万个字符串的大型哈希映射会影响另一个线程的性能
- 在C++中使用cURL和多线程
- 为什么我的C#代码在调用回C++COM直到Task时会暂停.等待/线程.加入
- 在cuda线程之间共享大量常量数据
- 如何将元素添加到数组的线程安全函数?
- 线程,如果else语句,都是错误的上下文切换后,会发生什么
- C++Boost Asio Pool线程,带有lambda函数和传递引用变量
- Qt C++静态thread_local QNetworkAccessManager是线程应用程序的好选择吗
- 异常属于C++中的线程还是进程
- C++中的线程安全删除
- C++使用params创建线程函数会导致转换错误
- 类与私有变量的其他类之间的线程安全性
- CoInitialize()在单独的线程上崩溃而不返回
- c++中的线程池
- 线程之间的布尔停止信号
- 为什么std::async使用同一个线程运行函数
- 用于矢量处理的多个线程
- C++为线程工作动态地分割例程
- 为什么我不能在 while 循环中创建线程?