转换和积累

Transform-and-Accumulate

本文关键字:转换      更新时间:2023-10-16

有没有人编写过一个符合 C++ STL 的算法,将std::transformstd::accumulate组合成一个单一的传递算法,支持一元、二进制甚至(n-ary!(变体,比如说std::transformed_accumulate?我想要这个,因为我发现这种模式在例如线性代数中高度可重用,例如在 (l1-(范数计算中。l1范数计算元素绝对值的总和。

嗯...我敢打赌,你可以通过将转换嵌入到二进制谓词中来做到这一点,转换元素并在转换后累积。

struct times2accumulator {
   int operator()( int oldvalue, int newvalue ) const {
      return oldvalue + 2*newvalue;
   }
};
int r = std::accumulate( v.begin(), v.end(), 2, times2accumulator() );

该函子等效于:

struct times2 {
   int operator()( int x ) {
      return 2*x;
   }
};
std::vector<int> tmp; tmp.reserve( v.size() );
std::transform( v.begin(), v.end(), std::back_inserter(tmp), times2 );
int r = std::accumulate( tmp.begin(), tmp.end(), 0 );

当然,这可以成为泛型的,只需将转换函子传递给泛型基函子:

template <typename Transform>
struct transform_accumulator_t {
    Transform t;
    transform_accumulator_t( Transform t ) : t(t) {}
    int operator()( int oldvalue, int newvalue ) const {
        return oldvalue + t(newvalue);
    }
};
// syntactic sugar:
template <typename T>
transform_accumulator_t<T> transform_accumulator( T t ) {
    return transform_accumulator_t<T>(t);
}
int r = std::accumulate(v.begin(), v.end(), 0, transform_accumulator(times2));

您还可以概括容器中的类型...或者甚至创建一个更通用的transform_accumulator,它同时采用累加器和转换函子并按顺序应用它们。实际实施留给读者作为练习。

虽然它可能不完全符合原始意图,但std::inner_product基本上是您的二进制版本。你给它传递一个初始值、两个范围和两个函子,它应用它们为:

T acc = initial_value;
while (begin1 != end1) {
    acc = binary_op1(acc, binary_op2(begin1, begin2);
    ++begin1;
    ++begin2;
return acc;

因此,对于您的 L1,您将按照以下一般顺序执行一些操作:

norm = std::inner_product(input1.begin(), input1.end(), 
                          input2.begin(), input2.end(), 
                          std::plus<int>(), std::abs);

只是这不太有效 - 现在,它试图传递std::abs你真正需要一个组合两个输入的二进制函数,但我不确定这两个输入应该如何组合。

std::partial_sum相当接近你的一元版本,除了在累积结果的同时,它(尝试(写出每个中间结果,而不仅仅是最终结果。为了得到最终结果,你必须编写(并传递一个实例(一种只保存单个值的无所事事的迭代器:

template<class T, class Dist=size_t, class Ptr = T*, class Ref = T&>
class unique_it : public std::iterator<std::random_access_iterator_tag, T, Dist, Ptr, Ref> { 
   T &value;
public:
   unique_it(T &v) : value(v) {}
   T &operator*() { return value; }
   unique_it &operator++() { return *this; }
   unique_it &operator+(size_t) { return *this; }
   unique_it &operator++(int) { return *this; }
};
template <class T>
unique_it<T> make_res(T &v) { return unique_it<T>(v); }

这样,您的 L1 规范化将如下所示:

int main(){ 
    double result=0.0;
    double inputs[] = {1, -2, 3, -4, 5, -6};
    std::partial_sum(
        inputs, inputs+6, 
        make_res(result),
        [](double acc, double v) {return acc + std::abs(v);});
    std::cout << result << "t";
    return 0;
}

截至 C++17 也有 std::transform_reduce ,这也具有可并行化的好处。

https://en.cppreference.com/w/cpp/algorithm/transform_reduce

如果你想使用一些并行性,我用OpenMP做了一个快速版本:

template <class T, 
          class InputIterator, 
          class MapFunction, 
          class ReductionFunction>
T MapReduce_n(InputIterator in, 
              unsigned int size, 
              T baseval, 
              MapFunction mapper, 
              ReductionFunction reducer)
{
    T val = baseval;
    #pragma omp parallel
    {
        T map_val = baseval;
        #pragma omp for nowait
        for (auto i = 0U; i < size; ++i)
        {
            map_val = reducer(map_val, mapper(*(in + i)));
        }
        #pragma omp critical
        val = reducer(val, map_val);
    }
    return val;
}

它很快,但肯定有优化的空间,尤其是在我认为for (auto i = 0U; i < size; ++i)左右。(但是我不知道如何使用 OpenMP 制作仅迭代器版本,任何帮助将不胜感激!

在对 1000000 个元素数组的快速测试中,计算迭代了 1000 次以获得平均值,我做了一些比较。

版本1 :

for (auto i = 0U; i < size; ++i)
    val += std::pow(in[i][0], 2) + std::pow(in[i][1], 2);

编译时的分数:

  • g++ : 30 秒
  • g++ -O3 : 2.6 秒

版本2 :

我认为这个版本是针对这种计算优化最多的版本。(它给出了最好的结果(。

#pragma omp parallel reduction( + : val )
{
    double map_val = 0.0;
    #pragma omp for
    for (int i=0; i < size; ++i)
    {
        map_val += std::pow(in[i][0], 2) + std::pow(in[i][1], 2);
    }
    val += map_val;
}
  • g++ -O3:0.2秒(这是最好的一个(

版本 3

此版本使用我之前展示的MapReduce_n函数模板:

double val = MapReduce_n(in, size, 0.0, [] (fftw_complex val)
    {
        return std::pow(val[0], 2.0) + std::pow(val[1], 2.0);
    }, std::plus<double>());
  • g++ -O3 :0.4秒,因此不直接使用OMP直接减少会产生轻微的开销。但是,它不允许自定义运算符,因此在某一时刻,您(可悲地(不得不以速度换取通用性。
我很

惊讶没有人说如何使用Boost.Range做到这一点:

accumulate(v | transformed((int(*)(int))&std::abs), 0);

其中 v 是单通道范围(即任何 STL 容器(。必须指定 abs 过载,否则这将像 Haskell 一样优雅。