对两个向量中的一对向量进行空间有效解包的优雅方法

Elegant way for space efficient unpacking of a vector of pairs in two vectors

本文关键字:向量 有效 空间 方法 两个      更新时间:2023-10-16

我有一个std::pair<std::string,size_t>元素的大向量,我想用小的额外内存开销(我不希望内存空间占用翻倍,即在解包后擦除对向量)将其解包为两个向量,并且尽可能快。下面的解决方案慢得令人无法接受:

std::vector<std::pair<std::string, size_t>> string_weight;
get_from_file("mybigfile.txt", string_weight); //it just fills the string_weight vector
//... do stuff...
std::vector<std::string> strings;
std::vector<size_t> weights;
for (auto it = string_weight.begin(); it != string_weight.end() ; it = string_weight.erase(it)) {
     strings.push_back(std::move(it->first));
     weights.push_back(std::move(it->second));
}

因此,我尝试修改之前的解决方案,只是以以下方式更改for循环:

for (auto it = string_weight.begin(), it2 = it; it != string_weight.end() ; it = string_weight.erase(it, it2)) {
        size_t delta = 100000;
        for ( it2 = it ; it2 != string_weight.end() && it2 != it+delta; it2++ ) {
            strings.push_back(std::move(it2->first));
            weights.push_back(std::move(it2->second));
        }
    }

这样更快,但完成时间与我为delta选择的值成正比,我不喜欢它。你能帮我给出一个解决方案或指出一些有用的技术吗?

提前谢谢你。

试试这个:

std::vector<std::string> strings;
std::vector<std::size_t> weights;
strings.reserve(string_weight.size());
weights.reserve(string_weight.size());
for (auto & p : string_weights)
{
     strings.push_back(std::move(p.first));
     weights.push_back(p.second);
}

一些变化:

  • 预构造权重向量:

    std::vector<std::size_t> weights(string_weight.size());
    // ...
    weights[i] = string_weights[i].second;
    

    这可能更好,因为它避免了重复的大小检查,但要花费初始的归零。(这可以通过使用原始动态数组或非构造分配器来避免。)

  • 预构造string vector:

    std::vector<std::string> strings(string_weight.size());
    // ...
    strings[i] = std::move(string_weights[i].first);
    // or
    strings[i].swap(string_weights[i].first);
    

    同样,这避免了重复的范围检查。

std::vector中删除元素不会释放任何内存(直接由容器本身拥有)。即使它释放了内存(这可以通过在erase()之后调用shrink_to_fit()来实现),仍然需要暂时(大约)翻倍的内存使用,因为重新分配需要发生类似于std::vector在增长时调整大小的方式-需要分配一个新的(稍微小一些的)内存块,元素复制到新区域,然后旧的分配将被释放。

因此(除非您可以用std::deque替换源std::vector),您应该忘记在此转换期间降低高水位内存使用。

擦除std::vector开头的元素是开销较大的操作。你可以使用3种可能来加速它:

  1. 使用std::deque代替std::vector对,它有0(1)删除元素在前面
  2. 清除循环后对向量

  3. 提前调整目标矢量大小并向后复制元素

的例子:

std::vector<std::string> strings( string_weight.size() );
auto tit1 = strings.rbegin();
std::vector<size_t> weights( string_weight.size() );
auto tit2 = weigths.rbegin();
for (auto it = string_weight.rbegin(); string_weight.size(); it++, string_weight.pop_back()) {
     *tit1++ = std::move(it->first);
     *tit2++ = std::move(it->second);
}

您应该首先尝试移动您的内容。通常std::string使用的大部分空间不在字符串本身。

所以你只需:

template<template<class...>class Tuple, class...Ts>
using vec_of_tup = std::vector<Tuple<Ts...>>;
template<template<class...>class Tuple, class...Ts>
using tup_of_vec = Tuple< std::vector<Ts>... >;
vec_of_tup<std::pair, std::string, std::size_t> in;
tup_of_vec<std::pair, std::string, std::size_t> out;
out.first.reserve(in.size());
out.second.reserve(in.size());
for(auto&& e:in) {
  out.first.push_back( std::move(e.first) );
  out.second.push_back( std::move(e.second) );
}
decltype(in){}.swap(in); // forced clear

这会使用更多的峰值内存,因为两个向量同时存在。但是用于字符串(超过一定短长度)的数据不会被双重分配,因为我们将它从一个容器移动到另一个容器。

只有字符串的"簿记"数据被保存了两次。

避免这种情况几乎是不可能的。压缩源向量使用的内存需要重新分配新大小的缓冲区。如果在将K个元素复制到目标向量之后执行此操作,则直接在这些向量中使用的内存是之前的N+K。您必须创建一个大小为(N-K)的新缓冲区,以便将元素复制到其中。所以你使用N+K+N-K = 2N内存。

如果您使用2N内存,则可以使用上述解决方案,避免不必要的副本。

你的代码似乎在字符串的"簿记"部分使用了大约2.8N内存,做了N^2/K个元素拷贝。简直糟透了。

可能你的问题是你使用std::vector来表示大得离谱的n。当你使用接近系统可用内存的向量大小时,std::vector的好处开始消失。

一种方法可能是实现一个具有控制块大小的deque,而不是默认的小块大小。假设每个块有10页内存。

现在,从前/后擦除是有效的,并且您的内存是相当连续的,每读取几千个元素就有一个页面错误,而不是纯基于节点的容器,每次读取元素都有一个页面错误。您可以移动容器的一部分而不会浪费内存,因为当您完成移动块时,它将被释放。

你的优化路线很复杂,找另一条。

这是一个初始峰值。

template<class T, std::size_t block_size_guess = 10*4096>
struct block_vector {
  template<class...Args>
  void emplace_back( Args&&...args ) {
    if (!last_block_used) {
      blocks.emplace_back();
    }
    new( (void*)get_ptr( size() ) ) T(std::forward<Args>(args)...);
    last_block_used = (last_block_used+1)%block_size;
  }
  template<class...Args>
  void emplace_front( Args&&...args ) {
    if (!first_block_unused) {
      blocks.emplace_front();
      first_block_unused = block_size;
    }
    --first_block_unused;
    new( (void*)get_ptr( 0 ) ) T(std::forward<Args>(args)...);
  }
  std::size_t size() const {
    if (last_block_used) // if zero, it means the last block is full
      return blocks.size() * block_size - first_block_unused + last_block_used - block_size;
    else
      return blocks.size() * block_size - first_block_unused;
  }
  T& operator[]( std::size_t i ) { return *get_ptr(i); }
  T const& operator[]( std::size_t i ) const { return *get_ptr(i); }
  // todo: iterators, front(), back(), erase( it, it ), erase( it ), etc.
private:
  enum {
    block_calc = block_size_guess/sizeof(T),
    block_size = block_calc?block_calc:1,
  };
  using raw_block = std::array< std::array<unsigned char, sizeof(T)>, block_size >;
  std::deque<raw_block> blocks;
  std::size_t first_block_unused = 0;
  std::size_t last_block_used = 0;
  using block = std::array< T, block_size >;
  block& get_block( std::size_t b ) {
    return reinterpret_cast<block&>(blocks[b]);
  }
  block const& get_block( std::size_t b ) const {
    return reinterpret_cast<block const&>(blocks[b]);
  }
  static std::size_t outer( std::size_t i ) { return (i+first_block_unused)/block_size; }
  static std::size_t inner( std::size_t i ) { return (i+first_block_unused)%block-size; }
  T* get_ptr( std::size_t i ) {
    return std::addressof( get_block( outer(i) )[ inner(i) ] );
  }
  T const* get_ptr( std::size_t i ) const {
    return std::addressof( get_block( outer(i) )[ inner(i) ] );
  }
};

或者创建新的矢量,您可以创建视图。对于range-v3,你可以这样做:

const auto strings = string_weights | ranges::view::keys;
const auto weights = string_weights | ranges::view::values;
演示

很抱歉在这里做一个不祥的预兆,但是,

如果内存有限,您需要:

  1. 批量处理文件,或
  2. 如果可能,将文件视为随机访问的数据存储。

除非您在目标向量中reserve内存,否则它们可能会在填充时调整几次大小。这不仅会使过度分配内存,而且还会导致碎片化——连续内存分配的敌人(std::vector依赖于此,由标准强制规定)。

此外,一旦初始向量被填充(即使您通过保留适当的空间有效地做到了这一点),它将永远不会在不调用shrink_to_fit的情况下收缩。即便如此,也不能保证它会萎缩。更糟糕的是,如果这样做,它将在操作期间消耗更多内存。

如果不是c++中的所有抽象级别,您可以就地完成

当然,对于c++中std::vector所拥有的内存来说,这些都是不可能的。即使在汇编中,您也需要一种方法来告诉内存分配器,您的一个大分配现在是两个小分配。如果你想让能够在c++中做到这一点,你可能会使用数组的联合或其他东西,或者放置new来重新利用从分配器获得的原始内存。

我发这篇文章只是为了说明抽象层次让高效代码变得不可能是多么糟糕。(例如,c++的new/delete甚至没有try_realloc,可以让vector 尝试原地增长或收缩;所有主流的c++实现都有一个std::vector,它总是只是分配和复制,甚至没有尝试查看在增长时是否有与现有分配相邻的空闲地址空间。)IDK为什么ISO c++仍然没有添加任何可以允许有效内存管理的东西(除了std::malloc/std::realloc不兼容new/delete)。

如果你想在不写入新元素的情况下增长std::vector(因为你想把read()写入内存或其他什么),你需要一个使用自定义分配器的vector来不构造元素。


std::string可以是std::move,这可以归结为复制对象表示。size_t是可复制的。

给定这对对象的大小比例,计算第一个输出数组末尾的字节偏移量。例如,假设32字节的std::string(3个指针加上内联小字符串的额外空间,如libstdc++在x86-64使用)和8字节的size_t,比例为4:1。假设对齐不需要在每对中额外填充,这意味着源向量有40字节的元素。

所以对于10M元素的源数组(400M字节),字符串数组将是10M元素,320M字节,size_t数组将是80M字节。

完成后,字符串数组将是pair数组的第一个320MB, size_t数组将是下一个80MB。我们可以提前计算出指向该目标的指针。位置也是在覆盖该内存之前需要读取的一对的开始

我将首先解释一对对象的1:1大小比例

  • 加载2对,一个从整体开始,另一个从第二个数组开始的偏移量。(即tmp0 = vec[pos0]; tmp1 = vec[pos1])
  • 将每个tmp对的第一个成员存储到第一个tmp对所在的空间。
  • 将每个tmp对的第二个成员存储到第二个成员所在的空间中。

重复直到完成,增加位置。为此分配或接触任何新内存都是对CPU周期的完全浪费,并且会造成不必要的内存压力。在具有SIMD的cpu上,这些加载和迁移可以很好地矢量化,并且有望以接近内存的速度运行,并且完全受益于目标在缓存中的热状态,而不是复制到新的冷内存。

对于尺寸不均匀的双,您需要从开始加载4双,每从中间加载1双。4 × 40 = 5 × 32,所以我们可以把新数据放回去,而不覆盖我们还没有读到的内容。在高的部分,1x40b = 5x8b。

在c++中,你可以使用本地tmp数组,编译器仍然可以合理地将其优化为寄存器,以适应较小的数组大小。


或者,首先不要创建一个巨大的对向量。从中等大小的块中读取文件(如8kiB是一个很好的大小,适合L1d缓存),并将行解析为2个向量。