需要更快地计算(近似)方差

Faster computation of (approximate) variance needed

本文关键字:近似 方差 计算      更新时间:2023-10-16

我可以通过 CPU 分析器看到,compute_variances()是我项目的瓶颈。

  %   cumulative   self              self     total           
 time   seconds   seconds    calls  ms/call  ms/call  name    
 75.63      5.43     5.43       40   135.75   135.75  compute_variances(unsigned int, std::vector<Point, std::allocator<Point> > const&, float*, float*, unsigned int*)
 19.08      6.80     1.37                             readDivisionSpace(Division_Euclidean_space&, char*)
 ...

下面是函数的主体:

void compute_variances(size_t t, const std::vector<Point>& points, float* avg,
                       float* var, size_t* split_dims) {
  for (size_t d = 0; d < points[0].dim(); d++) {
    avg[d] = 0.0;
    var[d] = 0.0;
  }
  float delta, n;
  for (size_t i = 0; i < points.size(); ++i) {
    n = 1.0 + i;
    for (size_t d = 0; d < points[0].dim(); ++d) {
      delta = (points[i][d]) - avg[d];
      avg[d] += delta / n;
      var[d] += delta * ((points[i][d]) - avg[d]);
    }
  }
  /* Find t dimensions with largest scaled variance. */
  kthLargest(var, points[0].dim(), t, split_dims);
}

kthLargest()似乎不是问题,因为我看到:

0.00 7.18 0.00 40 0.00 0.00 kthLargest(float*, int, int, unsigned int*)

compute_variances()采用浮点数向量的向量(即Points向量,其中Points是我实现的类(,并计算它们在每个维度上的方差(关于高德纳的算法(。

以下是我调用函数的方式:

float avg[(*points)[0].dim()];
float var[(*points)[0].dim()];
size_t split_dims[t];
compute_variances(t, *points, avg, var, split_dims);

问题是,我能做得更好吗?我真的很乐意在速度和方差的近似计算之间进行权衡。或者,也许我可以使代码对缓存更友好之类的?

我是这样编译的:

g++ main_noTime.cpp -std=c++0x -p -pg -O3 -o eg

请注意,在编辑之前,我使用了-o3,而不是大写的"o"。感谢 ypnos,我现在使用优化标志 -O3 进行编译。我确信它们之间存在差异,因为我在我的伪站点中使用这些方法之一进行了时间测量。

请注意,现在,compute_variances主导了整个项目的时间!

[编辑]

copute_variances()被调用 40 次。

每 10 次调用,以下情况成立:

points.size() = 1000   and points[0].dim = 10000
points.size() = 10000  and points[0].dim = 100
points.size() = 10000  and points[0].dim = 10000
points.size() = 100000 and points[0].dim = 100

每个调用处理不同的数据。

问:访问points[i][d]的速度有多快?

答:point[i]只是 std::vector 的第 i 个元素,其中第二个[]Point 类中实现为这样。

const FT& operator [](const int i) const {
  if (i < (int) coords.size() && i >= 0)
     return coords.at(i);
  else {
     std::cout << "Error at Point::[]" << std::endl;
     exit(1);
  }
  return coords[0];  // Clear -Wall warning 
}

其中coordsfloat值的std::vector。这似乎有点沉重,但编译器不应该足够聪明来正确预测分支总是正确的吗?(我的意思是冷启动后(。此外,std::vector.at()应该是恒定时间(如参考中所述(。我将其更改为在函数主体中仅具有.at(),并且时间测量值几乎保持不变。

compute_variances()分裂肯定是沉重的!然而,高德纳的算法是一个数值稳定的算法,我找不到另一种算法,它既能数值稳定又不除法。

请注意,我现在对并行性感兴趣。

[编辑2]

Point类的最小示例(我想我没有忘记展示一些东西(:

class Point {
 public:
  typedef float FT;
  ...
  /**
   * Get dimension of point.
   */
  size_t dim() const {
    return coords.size();
  }
  /**
   * Operator that returns the coordinate at the given index.
   * @param i - index of the coordinate
   * @return the coordinate at index i
   */
  FT& operator [](const int i) {
    return coords.at(i);
    //it's the same if I have the commented code below
    /*if (i < (int) coords.size() && i >= 0)
      return coords.at(i);
    else {
      std::cout << "Error at Point::[]" << std::endl;
      exit(1);
    }
    return coords[0];  // Clear -Wall warning*/
  }
  /**
   * Operator that returns the coordinate at the given index. (constant)
   * @param i - index of the coordinate
   * @return the coordinate at index i
   */
  const FT& operator [](const int i) const {
        return coords.at(i);
    /*if (i < (int) coords.size() && i >= 0)
      return coords.at(i);
    else {
      std::cout << "Error at Point::[]" << std::endl;
      exit(1);
    }
    return coords[0];  // Clear -Wall warning*/
  }
 private:
  std::vector<FT> coords;
};

1.单程

一个简单的加速是使用矢量指令 (SIMD( 进行计算。在x86上,这意味着SSE,AVX指令。根据您的字长和处理器,您可以获得大约 x4 甚至更高的加速。此代码在此处:

for (size_t d = 0; d < points[0].dim(); ++d) {
    delta = (points[i][d]) - avg[d];
    avg[d] += delta / n;
    var[d] += delta * ((points[i][d]) - avg[d]);
}

可以通过使用 SSE 同时计算四个元素来加快速度。由于您的代码实际上只在每次循环迭代中处理一个元素,因此没有瓶颈。如果你下降到16位短而不是32位浮点数(那么近似值(,你可以在一条指令中容纳八个元素。使用AVX,它会更多,但是您需要一个最新的处理器。

它不是性能问题的解决方案,而只是其中之一,也可以与其他问题结合使用。

2. 微并行

当你有这么多循环时,第二个简单的加速是使用并行处理。我通常使用英特尔 TBB,其他人可能会建议使用 OpenMP。为此,您可能需要更改循环顺序。因此,在外循环中并行化 d,而不是在 i 上并行。

您可以结合这两种技术,如果您做得对,在具有HT的四核上,您可能会获得25-30的组合速度,而不会损失任何准确性。

3. 编译器优化

首先,也许它只是SO上的一个错字,但它需要-O3,而不是-o3!作为一般说明,如果在实际使用它们的范围内声明变量 delta,n 可能会更容易,编译器可以更轻松地优化代码。您还应该尝试-funroll-loops编译器选项以及-march 。后者的选项取决于您的CPU,但现在通常-march core2很好(也适用于最近的AMD(,并且包括SSE优化(但我不相信编译器尚未为您的循环执行此操作(。

数据结构的最大问题是它本质上是一个vector<vector<float> >。 这是一个指向float数组的指针数组的指针,并附加了一些花里胡哨的东西。 特别是,访问vector中的连续Point并不对应于访问连续的内存位置。 我敢打赌,当您分析此代码时,您会看到大量缓存未命中。

在进行其他任何操作之前,请先解决此问题。

低阶问题包括内部循环中的浮点除法(计算1/n在外部循环中(和作为内部循环的大型负载存储链。 例如,您可以使用 SIMD 计算阵列切片的均值和方差,并在最后将它们组合在一起。

每次访问一次边界检查可能也无济于事。 也摆脱它,或者至少把它从内循环中吊起来;不要假设编译器知道如何自行修复它。

以下是我会做的,按猜测的重要性顺序排列:

  1. 值(而不是按引用(从Point::operator[]返回浮点数。
  2. 使用 coords[i] 而不是 coords.at(i) ,因为您已经断言它在范围内。at成员检查边界。您只需要检查一次。
  3. Point::operator[]中的自制错误指示/检查替换为断言。这就是断言的目的。它们在发布模式下名义上是无操作的 - 我怀疑您是否需要在发布代码中检查它。
  4. 将重复除法替换为单个除法和重复乘法。
  5. 通过展开外部循环的前两次迭代来消除浪费初始化的需要。
  6. 为了减少缓存未命中的影响,请交替运行内部循环,先向前然后向后运行。这至少让您有机会使用一些缓存的avgvar。实际上,它可以删除avg上的所有缓存未命中,并var预取是否以相反的迭代顺序工作,因为它也应该这样做。
  7. 在现代C++编译器上,std::fillstd::copy可以利用类型对齐,并有机会比 C 库memsetmemcpy更快。

Point::operator[]将有机会在发布版本中内联,并且可以减少到两个机器指令(有效地址计算和浮点加载(。这就是你想要的。当然,它必须在头文件中定义,否则只有在启用链接时代码生成(也称为 LTO(时才会执行内联。

注意Point::operator[]的主体只相当于单行 return coords.at(i)在调试版本中。在发布版本中,整个主体相当于return coords[i]而不是return coords.at(i)

FT Point::operator[](int i) const {
  assert(i >= 0 && i < (int)coords.size());
  return coords[i];
}
const FT * Point::constData() const {
  return &coords[0];
}
void compute_variances(size_t t, const std::vector<Point>& points, float* avg,
                       float* var, size_t* split_dims)
{
  assert(points.size() > 0);
  const int D = points[0].dim();
  // i = 0, i_n = 1
  assert(D > 0);
#if __cplusplus >= 201103L
  std::copy_n(points[0].constData(), D, avg);
#else
  std::copy(points[0].constData(), points[0].constData() + D, avg);
#endif
  // i = 1, i_n = 0.5
  if (points.size() >= 2) {
    assert(points[1].dim() == D);
    for (int d = D - 1; d >= 0; --d) {
      float const delta = points[1][d] - avg[d];
      avg[d] += delta * 0.5f;
      var[d] = delta * (points[1][d] - avg[d]);
    }
  } else {
    std::fill_n(var, D, 0.0f);
  }
  // i = 2, ...
  for (size_t i = 2; i < points.size(); ) {
    {
      const float i_n = 1.0f / (1.0f + i);
      assert(points[i].dim() == D);
      for (int d = 0; d < D; ++d) {
        float const delta = points[i][d] - avg[d];
        avg[d] += delta * i_n;
        var[d] += delta * (points[i][d] - avg[d]);
      }
    }
    ++ i;
    if (i >= points.size()) break;
    {
      const float i_n = 1.0f / (1.0f + i);
      assert(points[i].dim() == D);      
      for (int d = D - 1; d >= 0; --d) {
        float const delta = points[i][d] - avg[d];
        avg[d] += delta * i_n;
        var[d] += delta * (points[i][d] - avg[d]);
      }
    }
    ++ i;
  }
  /* Find t dimensions with largest scaled variance. */
  kthLargest(var, D, t, split_dims);
}
for (size_t d = 0; d < points[0].dim(); d++) {
  avg[d] = 0.0;
  var[d] = 0.0;
}

此代码可以通过简单地使用 memset 进行优化。0.0 在 32 位中的IEEE754表示形式是0x00000000。如果尺寸很大,那就值得了。像这样:

memset((void*)avg, 0, points[0].dim() * sizeof(float));

在你的代码中,你有很多对points[0].dim((的调用。最好在函数开头调用一次并存储在变量中。很可能,编译器已经这样做了(因为你使用的是 -O3(。

除法运算(从时钟周期 POV 开始(比其他运算(加法、减法(昂贵得多。

avg[d] += delta / n;

尝试减少除法数是有意义的:使用部分非累积平均计算,这将导致 N 个元素的 Dim 除法操作(而不是 N x Dim(;N <points.size((>

使用

Cuda 或 OpenCL 可以实现巨大的加速,因为每个维度可以同时计算 avg 和 var(考虑使用 GPU(。

另一个优化是缓存优化,包括数据缓存指令缓存

高级优化技术
数据缓存优化

数据缓存优化和展开示例

for (size_t d = 0; d < points[0].dim(); d += 4)
{
  // Perform loading all at once.
  register const float p1 = points[i][d + 0];
  register const float p2 = points[i][d + 1];
  register const float p3 = points[i][d + 2];
  register const float p4 = points[i][d + 3];
  register const float delta1 = p1 - avg[d+0];
  register const float delta2 = p2 - avg[d+1];
  register const float delta3 = p3 - avg[d+2];
  register const float delta4 = p4 - avg[d+3];
  // Perform calculations
  avg[d + 0] += delta1 / n;
  var[d + 0] += delta1 * ((p1) - avg[d + 0]);
  avg[d + 1] += delta2 / n;
  var[d + 1] += delta2 * ((p2) - avg[d + 1]);
  avg[d + 2] += delta3 / n;
  var[d + 2] += delta3 * ((p3) - avg[d + 2]);
  avg[d + 3] += delta4 / n;
  var[d + 3] += delta4 * ((p4) - avg[d + 3]);
}

这与经典循环展开的不同之处在于,从矩阵加载是在循环顶部作为一组执行的。

编辑 1:
微妙的数据优化是将avgvar放入结构中。 这将确保两个数组在内存中彼此相邻,没有填充。 处理器中的数据获取机制,例如彼此非常接近的数据。 数据缓存未命中的可能性更小,将所有数据加载到缓存中的机会更大。

您可以使用定点数学而不是浮点数学作为优化。

通过定点
优化处理器喜欢操作整数(有符号或无符号(。 浮点可能需要额外的计算能力,因为提取零件,执行数学运算,然后重新组装零件。 一种缓解措施是使用定点数学。

简单示例:仪表
给定米的单位,可以使用浮点数表示小于一米的长度,例如 3.14159 m。 但是,相同的长度可以用毫米等更精细的细节单位表示,例如 3141.59 毫米。 为了获得更精细的分辨率,请选择较小的单位并将值相乘,例如 3,141,590 um(微米(。 点是选择一个足够小的单位来将浮点精度表示为整数。

浮点值在输入时转换为定点。 所有数据处理均在定点进行。 定点值在输出之前转换为浮点。

2 个固定点基座
的功率与从浮点米转换为定点毫米一样,使用 1000,可以使用 2 的幂而不是 1000。 选择 2 的幂允许处理器使用位移而不是乘法或除法。 位移 2 的幂通常比乘法或除法快。

为了保持毫米的主题和精度,我们可以使用 1024 而不是 1000 作为基础。 同样,为了获得更高的精度,请使用 65536 或 131072。

总结
将设计或实现更改为使用的定点数学允许处理器使用比浮点运算更完整的数据处理指令。 浮点运算比除专用处理器之外的所有处理器中的积分运算消耗更多的处理能力。 使用 2 的幂作为基数(或分母(允许代码使用位移而不是乘法或除法。 除法和乘法比移位需要更多的操作,因此移位更快。 因此,与其优化执行代码(例如循环展开(,不如尝试使用定点表示法而不是浮点数。

第 1 点。您同时计算平均值和方差。是吗?你不是要先计算平均值,然后一旦你知道了,就计算平均值的平方差之和吗?除了正确之外,它更有可能帮助性能而不是伤害它。尝试在一个循环中做两件事不一定比两个连续的简单循环快。

第2点。您是否知道有一种方法可以同时计算平均值和方差,如下所示:

double sumsq = 0, sum = 0;
for (i = 0; i < n; i++){
  double xi = x[i];
  sum += xi;
  sumsq += xi * xi;
}
double avg = sum / n;
double avgsq = sumsq / n
double variance = avgsq - avg*avg;

第3点。内部循环正在执行重复索引。编译器也许能够将其优化到最小值,但我不会把我的袜子押在上面。

第4点。您正在使用gprof或类似的东西。唯一合理可靠的数字是按功能划分的自时。它不会很好地告诉您如何在函数中花费时间。我和许多其他人都依赖这种方法,它直接带您进入需要时间的核心。