并行循环中的懒惰矢量访问
Lazy vector access in parallel loops
在一段性能关键的并行代码中，我有一个向量，其元素具有以下特点：
- 计算成本非常高，且结果是确定的（给定位置的元素值仅取决于该位置）
- 随机访问（访问次数通常大于甚至远大于向量的大小）
- 聚集访问（许多访问请求的是同一个值）
- 向量由不同的线程共享（是否存在竞争条件？）
- 为了避免堆碎片化，对象永远不应被重新创建，而应尽可能重置并回收利用
- 放入向量的值由一个多态对象提供
目前，我会预先计算向量的所有可能值，因此竞争条件应该不是问题。为了提高性能，我正在考虑创建一个惰性向量，使代码只在某个元素被请求时才执行相应的计算。在并行区域中，可能会出现多个线程同时请求、并可能同时计算同一个元素的情况。我该如何处理这种潜在的竞争条件？
下面是我想要实现的目标的一个例子。它在Windows 10、Visual Studio 17下正确编译和运行。我使用C++17。
// Lazy.cpp : Defines the entry point for the console application.
#include "stdafx.h"
#include <math.h>
#include <stdlib.h>
#include <chrono>
#include <cmath>
#include <ctime>
#include <iostream>
#include <limits>
#include <vector>
// Loop bounds of the artificial workload in the expensiveFunction() implementations.
// NOTE(review): declared as double although they initialise and bound an int loop
// counter, so every use involves an implicit double <-> int conversion.
const double START_SUM = 1;
const double END_SUM = 1000;
// Abstract base of the objects responsible for providing the values.
// Implementations supply expensiveFunction(); callers hold them through
// `const Evaluator&` / `const Evaluator*`.
class Evaluator
{
public:
    Evaluator() = default;

    // Virtual so that a derived evaluator is destroyed correctly when it is
    // deleted through a pointer to this base class.
    virtual ~Evaluator() = default;

    // Function with deterministic output, depending on the position.
    //   pos - position whose value is requested
    // Returns the value associated with pos.
    virtual double expensiveFunction(int pos) const = 0;
};
//
class EvaluatorA: public Evaluator
{
public:
//expensive evaluation
virtual double expensiveFunction(int pos) const override {
double t = 0;
for (int j = START_SUM; j++ < END_SUM; j++)
t += log(exp(log(exp(log(j + pos)))));
return t;
}
EvaluatorA() {};
~EvaluatorA() {};
};
class EvaluatorB : public Evaluator
{
public:
//even more expensive evaluation
virtual double expensiveFunction(int pos) const override {
double t = 0;
for (int j = START_SUM; j++ < 10*END_SUM; j++)
t += log(exp(log(exp(log(j + pos)))));
return t;
}
EvaluatorB() {};
~EvaluatorB() {};
};
// Vector that contains N possible results, each computed on first access and
// cached until the next reset().
//
// Not thread-safe: operator[] reads and writes the cache slot and its flag
// without any synchronisation, so concurrent calls race.
class LazyVectorTest
{
public:
    //   N    - number of slots; constant for the lifetime of the object
    //   eval - provider of the values; only its address is stored, so it must
    //          stay alive for as long as this object uses it
    LazyVectorTest(int N, const Evaluator & eval)
        : N(N), eval_ptr(&eval), innerContainer(N, 0.0), isThatComputed(N, '\0')
    {}

    // Reset, to generate a new table of values from `eval`.
    // The size of the vector stays constant and no storage is reallocated.
    void reset(const Evaluator & eval) {
        eval_ptr = &eval;
        isThatComputed.assign(isThatComputed.size(), '\0');
    }

    int size() const { return N; }

    // Accessing the same position yields the same result unless the object is
    // reset in between. The value is computed by the evaluator on first access.
    //   pos - index of the requested slot; not range-checked
    // Returns a reference into the cache; its content changes when the slot is
    // recomputed after a reset().
    const double& operator[](int pos) {
        if (!isThatComputed[pos]) {
            innerContainer[pos] = eval_ptr->expensiveFunction(pos);
            isThatComputed[pos] = 1;
        }
        return innerContainer[pos];
    }

private:
    const int N;
    const Evaluator* eval_ptr;
    std::vector<double> innerContainer;
    // One byte per flag instead of std::vector<bool>: the bool specialisation
    // packs several flags into one storage unit, so writing one flag also
    // rewrites its neighbours.
    std::vector<char> isThatComputed;
};
// The parallel access will take place here.
// Sums A[idx] over every idx in elementsToAccess, in order.
//   A                - any container indexable with an int through operator[]
//   elementsToAccess - positions to read; may contain repeated positions
// Returns the sum of the accessed values (0 for an empty index list).
template <typename T>
double accessingFunction(T& A, const std::vector<int>& elementsToAccess) {
    double tsum = 0;
    // Plain int index loop kept so the loop stays in the canonical form OpenMP
    // expects for a parallel for.
    const int size = static_cast<int>(elementsToAccess.size());
    // If the loop is parallelised, tsum must be a reduction variable; a bare
    // "parallel for" would race on it.
    //#pragma omp parallel for reduction(+:tsum)
    for (int i = 0; i < size; i++)
        tsum += A[elementsToAccess[i]];
    return tsum;
}
// Builds a list of random positions drawn with rand().
//   sizePos - number of positions to generate; values <= 0 give an empty list
//   N       - modulus applied to rand(); for N > 0 every position is in [0, N)
// Returns the generated positions. An empty list is returned when N == 0,
// because no position can be formed (rand() % 0 is undefined).
std::vector<int> randomPos(int sizePos, int N) {
    std::vector<int> elementsToAccess;
    if (sizePos <= 0 || N == 0)
        return elementsToAccess;
    // The final size is known up front, so allocate once.
    elementsToAccess.reserve(static_cast<std::vector<int>::size_type>(sizePos));
    for (int i = 0; i < sizePos; i++)
        elementsToAccess.push_back(rand() % N);
    return elementsToAccess;
}
// Benchmark driver.
// 1. Measures the average time of one direct expensiveFunction() call.
// 2. For each number of accesses in [minAccessNumber, maxAccessNumber), measures
//    the average time to reset the lazy container and sum that many randomly
//    chosen elements, and prints
//    "<accesses>,<avg ms>, <avg ms relative to one direct call>".
int main()
{
    // Seed the generator used by randomPos().
    srand(static_cast<unsigned int>(std::time(nullptr)));

    const int minAccessNumber = 1;
    const int maxAccessNumber = 100;  // exclusive upper bound of the sweep
    const int sizeVector = 50;
    const int numberTest = 100;       // repetitions averaged per measurement
    using container = LazyVectorTest;

    // Baseline: average cost of a single evaluation.
    EvaluatorA eval;
    double res = 0;
    auto start = std::chrono::steady_clock::now();
    for (int i = 0; i < numberTest; i++) {
        res = eval.expensiveFunction(i);
    }
    auto end = std::chrono::steady_clock::now();
    std::chrono::duration<double, std::milli>diff(end - start);
    double benchmark = diff.count() / numberTest;
    std::cout <<"Average time to compute expensive function:" <<benchmark<<" ms"<<std::endl;
    std::cout << "Value of the function:" << res<< std::endl;

    // One index list per repetition; the storage is reused across the sweep.
    std::vector<std::vector<int>> indexs;
    indexs.reserve(static_cast<std::vector<std::vector<int>>::size_type>(numberTest));
    container A(sizeVector, eval);
    for (int accessNumber = minAccessNumber; accessNumber < maxAccessNumber; accessNumber++) {
        // The index lists are generated outside the timed region.
        indexs.clear();
        for (int i = 0; i < numberTest; i++) {
            indexs.emplace_back(randomPos(accessNumber, sizeVector));
        }
        auto start_lazy = std::chrono::steady_clock::now();
        for (int i = 0; i < numberTest; i++) {
            A.reset(eval);
            const double res_lazy = accessingFunction(A, indexs[i]);
            (void)res_lazy;  // only the elapsed time is reported
        }
        auto end_lazy = std::chrono::steady_clock::now();
        std::chrono::duration<double, std::milli>diff_lazy(end_lazy - start_lazy);
        std::cout << accessNumber << "," << diff_lazy.count() / numberTest << ", " << diff_lazy.count() / (numberTest* benchmark) << std::endl;
    }
    return 0;
}
与其自己实现一套锁机制，不如先看看 std::call_once
能否提供可接受的性能。
// Vector that contains N possible results, each filled in on first access.
// Every slot is paired with a std::once_flag and the slot is filled through
// std::call_once on that flag.
class LazyVectorTest
{
    // Function with deterministic output, depending on the position.
    // Stores the computed sum in values[pos].
    // NOTE(review): the counter is advanced twice per pass, so only every
    // second value between START_SUM and END_SUM contributes.
    void expensiveFunction(int pos) {
        double total = 0;
        int j = START_SUM;
        while (j < END_SUM) {
            ++j;  // the term is evaluated with the already-advanced counter
            total += log(exp(log(exp(log(j+pos)))));
            ++j;
        }
        values[pos] = total;
    }

public:
    //   N - number of slots
    LazyVectorTest(int N) : values(N), flags(N)
    {}

    int size() { return values.size(); }

    // Accessing the same position should yield the same result.
    //   pos - index of the requested slot; not range-checked
    // Returns a copy of the slot after the call_once on its flag has returned.
    double operator[](int pos) {
        std::call_once(flags[pos], [this, pos] { expensiveFunction(pos); });
        return values[pos];
    }

private:
    std::vector<double> values;
    std::vector<std::once_flag> flags;
};
call_once
的语义相当清晰：它只允许一个线程把函数完整地执行完。唯一的潜在缺点是，第二个线程会被阻塞以等待第一个线程可能抛出的异常，而不是立即返回、什么都不做。在这种情况下，这恰恰是我们想要的，因为您希望写入操作 values[pos] = t;
先于读取操作 return values[pos];
完成。
您当前的代码有问题，主要是因为 std::vector<bool>
很糟糕，同时也缺少原子性和内存一致性保证。以下是一个完全基于 OpenMP 的解决方案的草图。我建议为尚未计算的条目使用一个特殊的标记值，而不是单独的 vector<bool>
——这会让一切变得更简单：
// Vector that contains N possible results, computed on first access.
// A slot that has not been computed yet holds the NaN marker `invalid`
// instead of using a separate flag vector. Element reads and writes go
// through `omp atomic`.
class LazyVectorTest
{
public:
    //   N    - number of slots; constant for the lifetime of the object
    //   eval - provider of the values; only its address is stored, so it must
    //          stay alive for as long as this object uses it
    LazyVectorTest(int N, const Evaluator & eval)
        : N(N), eval_ptr(&eval), innerContainer(N, invalid)
    {}

    // Reset, to generate a new table of values from `eval`.
    // The size of the vector stays constant.
    void reset(const Evaluator & eval) {
        this->eval_ptr = &eval;
        for (int i = 0; i<N; i++) {
            // Use atomic if that could possibly be done in parallel;
            // omit that for performance if you don't ever run it in parallel.
            #pragma omp atomic write
            innerContainer[i] = invalid;
        }
        // Flush to make sure invalidation is visible to all threads
        #pragma omp flush
    }

    int size() const { return N; }

    // Returns the value at pos, computing and storing it first if the slot
    // still holds the marker.
    //   pos - index of the requested slot; not range-checked
    // Don't return a reference here: the value is returned by copy.
    double operator[] (int pos) {
        double value;
        #pragma omp atomic read
        value = innerContainer[pos];
        // NaN never compares equal to anything, itself included, so the marker
        // has to be detected with isnan() rather than operator==.
        if (std::isnan(value)) {
            value = eval_ptr->expensiveFunction(pos);
            #pragma omp atomic write
            innerContainer[pos] = value;
        }
        return value;
    }

private:
    // Marker for "not computed yet". If an evaluator ever returns NaN as a
    // real result, that slot is simply recomputed on every access.
    static constexpr double invalid = std::numeric_limits<double>::quiet_NaN();
    const int N;
    const Evaluator* eval_ptr;
    std::vector<double> innerContainer;
};
在发生冲突的情况下，另一个线程只会冗余地再计算一次该值——这正是利用了结果的确定性。在读取和写入元素时都使用 omp atomic
，可以确保不会读到不一致的"半写"值。
这种解决方案可能会在罕见的坏情况下带来一些额外的延迟。反过来，好的情况是最优的，只需要一次原子读取。您甚至不需要任何内存 flush
或 seq_cst
——最坏的情况也只是冗余计算。如果标志和值是分开写入的，则需要这些（顺序一致性），以确保各项更改变为可见的顺序是正确的。
- 如何循环访问常量字符**?
- 如何在 c++ 中仅循环访问特定列?
- 从嵌套循环中的 std::list 中删除将返回访问冲突
- C++ - 循环访问指针数组会导致错误
- 视觉 如何循环访问C++中遵循类似格式的多个.txt文件?
- 如何在C++中循环访问未知对象方法?
- 如何循环访问 cpp 中的函数返回的字符指针数组
- 在C++中循环访问自定义结构列表的小问题
- 有没有办法在C++中循环访问对象的不同数据成员
- 循环访问还包含未使用元素的字符串数组
- Pybind11:使用 for 循环使用 OpenMP 访问 python 对象
- 循环访问资源字符串表
- 在C++中循环访问类继承
- c++ 循环访问对象列表并删除对象
- 循环访问自定义双链表
- 循环访问对象列表 c++
- C++ 访问 if 语句(如果它们在 for 循环中)
- 从文件读取,并循环访问文件以写入数组
- 当我用"ñ"字符循环访问字符串时出现奇怪的结果
- 如何访问循环中结构的成员