用于批处理分配的库

Library for batching allocation

本文关键字:分配 批处理 用于      更新时间:2023-10-16

所以我目前正在重构一个巨大的函数:

int giant_function(size_t n, size_t m, /*... other parameters */) {
int x[n]{};
float y[n]{};
int z[m]{};
/* ... more array definitions */

当我找到一组具有离散功能的相关定义时,将它们分组到一个类定义中:

class V0 {
std::unique_ptr<int[]> x;
std::unique_ptr<float[]> y;
std::unique_ptr<int[]> z;
public:
V0(size_t n, size_t m)
: x{new int[n]{}}
, y{new float[n]{}}
, z{new int[m]{}}
{}
// methods...
}

重构后的版本不可避免地更具可读性,但我觉得不太令人满意的一件事是分配数量的增加。

在堆栈上分配所有这些(可能非常大的)数组可以说是一个等待在未重构版本中发生的问题,但是我们没有理由不能只通过一个更大的分配:

class V1 {
int* x;
float* y;
int* z;
public:
V1(size_t n, size_t m) {
char *buf = new char[n*sizeof(int)+n*sizeof(float)+m*sizeof(int)];
x = (int*) buf;
buf += n*sizeof(int);
y = (float*) buf;
buf += n*sizeof(float);
z = (int*) buf;
}
// methods...
~V0() { delete[] ((char *) x); }
}

这种方法不仅涉及大量的手动(阅读:容易出错)簿记,而且它更大的罪过是它不可组合。

如果我想在堆栈上有一个V1值和一个W1值,那就是 每人分配一个用于其幕后资源。 更简单的是,我希望能够在单个分配中分配V1及其指向的资源,而这种方法无法做到这一点。

这最初导致我采用双传递方法 - 一次传递计算需要多少空间,然后进行一次巨大的分配,然后进行另一次传递以分配并初始化数据结构。

class V2 {
int* x;
float* y;
int* z;
public:
static size_t size(size_t n, size_t m) {
return sizeof(V2) + n*sizeof(int) + n*sizeof(float) + m*sizeof(int);
}
V2(size_t n, size_t m, char** buf) {
x = (int*) *buf;
*buf += n*sizeof(int);
y = (float*) *buf;
*buf += n*sizeof(float);
z = (int*) *buf;
*buf += m*sizeof(int);
}
}  
// ...
size_t total = ... + V2::size(n,m) + ...
char* buf = new char[total];
// ...
void*  here = buf;
buf += sizeof(V2);
V2* v2 = new (here) V2{n, m, &buf};

然而,这种方法在远处有很多重复,从长远来看是自找麻烦。返回工厂摆脱了它:

class V3 {
int* const x;
float* const y;
int* const z;
V3(int* x, float* y, int* z) : x{x}, y{y}, z{z} {}
public:
class V3Factory {
size_t const n;
size_t const m;
public:
Factory(size_t n, size_t m) : n{n}, m{m};
size_t size() {
return sizeof(V3) + sizeof(int)*n + sizeof(float)*n + sizeof(int)*m;
}
V3* build(char** buf) {
void * here = *buf;
*buf += sizeof(V3);
x = (int*) *buf;
*buf += n*sizeof(int);
y = (float*) *buf;
*buf += n*sizeof(float);
z = (int*) *buf;
*buf += m*sizeof(int);
return new (here) V3{x,y,z};
}
}
}
// ...
V3::Factory v3factory{n,m};
// ...
size_t total = ... + v3factory.size() + ...
char* buf = new char[total];
// ..
V3* v3 = v3factory.build(&buf);

仍然有一些重复,但参数只得到一次输入。而且仍然有很多手动簿记。如果我能用较小的工厂建造这个工厂,那就太好了......

然后我的哈斯克尔大脑击中了我。我正在实现一个应用函子。 这完全可以更好!

我需要做的就是编写一些工具来自动对大小求和并并行运行构建函数:

namespace plan {
template <typename A, typename B>
struct Apply {
A const a;
B const b;
Apply(A const a, B const b) : a{a}, b{b} {};
template<typename ... Args>
auto build(char* buf, Args ... args) const {
return a.build(buf, b.build(buf + a.size()), args...);
}
size_t size() const {
return a.size() + b.size();
}
Apply(Apply<A,B> const & plan) : a{plan.a}, b{plan.b} {}
Apply(Apply<A,B> const && plan) : a{plan.a}, b{plan.b} {}
template<typename U, typename ... Vs>
auto operator()(U const u, Vs const ... vs) const {
return Apply<decltype(*this),U>{*this,u}(vs...);
}
auto operator()() const {
return *this;
}
};
template<typename T>
struct Lift {
template<typename ... Args>
T* build(char* buf, Args ... args) const {
return new (buf) T{args...};
}
size_t size() const {
return sizeof(T);
}
Lift() {}
Lift(Lift<T> const &) {}
Lift(Lift<T> const &&) {}
template<typename U, typename ... Vs>
auto operator()(U const u, Vs const ... vs) const {
return Apply<decltype(*this),U>{*this,u}(vs...);
}
auto operator()() const {
return *this;
}
}; 
template<typename T>
struct Array {
size_t const length;
Array(size_t length) : length{length} {}
T* build(char* buf) const {
return new (buf) T[length]{};
}
size_t size() const {
return sizeof(T[length]);
}
};
template <typename P>
auto heap_allocate(P plan) {
return plan.build(new char[plan.size()]);
}
}

现在我可以非常简单地陈述我的类:

class V4 {
int* const x;
float* const y;
int* const z;
public:
V4(int* x, float* y, int* z) : x{x}, y{y}, z{z} {}
static auto plan(size_t n, size_t m) {
return plan::Lift<V4>{}(
plan::Array<int>{n},
plan::Array<float>{n},
plan::Array<int>{m}
);
}
};

并在一次通过中使用它:

V4* v4;
W4* w4;
std::tie{ ..., v4, w4, .... } = *plan::heap_allocate(
plan::Lift<std::tie>{}(
// ...
V4::plan(n,m),
W4::plan(m,p,2*m+1),
// ...
)
);

它并不完美(除其他问题外,我需要添加代码来跟踪析构函数,并heap_allocate返回一个调用所有析构函数的std::unique_ptr),但在我进一步深入兔子洞之前,我认为我应该检查一下预先存在的艺术。

据我所知,现代编译器可能足够聪明,可以识别V0中的内存总是一起分配/解除分配,并为我批量分配。

如果没有,是否有这个想法的预先实现(或其变体)来使用应用函子批处理分配?

首先,我想就您的解决方案的问题提供反馈:

  1. 您忽略对齐。依靠假设intfloat在您的系统上共享相同的对齐方式,您的特定用例可能"很好"。但是尝试在组合中添加一些double,就会有 UB。您可能会发现您的程序由于未对齐的访问而在 ARM 芯片上崩溃。

  2. 不幸的是,new (buf) T[length]{};是糟糕且不可移植的。简而言之:标准允许编译器保留给定存储的初始y字节供内部使用。您的程序无法在y > 0的系统上分配这y个字节(是的,这些系统显然存在;据称VC++这样做了)。

    必须为y分配是不好的,但是使数组放置-new不可用的原因是,在实际调用放置new之前,无法找出y有多大。在这种情况下,真的没有办法使用它。

  3. 已经意识到这一点,但为了完整起见:您不会破坏子缓冲区,因此如果您使用非平凡可破坏的类型,那么就会有 UB。

>解决方案:
  1. 为每个缓冲区分配额外的alignof(T) - 1字节。将每个缓冲区的开头与std::align对齐。

  2. 您需要循环并使用非数组放置新。从技术上讲,执行非数组放置新意味着在这些对象上使用指针算术具有 UB,但标准在这方面只是愚蠢的,我选择忽略它。这是关于这个问题的语言律师讨论。据我了解,p0593r2提案包括针对此技术性的解决方案。

  3. 添加与放置新调用对应的析构函数调用(或仅应使用简单可破坏类型的static_assert)。请注意,对非平凡销毁的支持提出了对异常安全的需求。如果构造一个缓冲区引发异常,则需要销毁之前构造的子缓冲区。当单个元素的构造函数在已经构造了一些元素之后抛出时,也需要同样小心。


我不知道现有技术,但是一些后续技术呢?我决定从一个稍微不同的角度尝试一下。但请注意,这缺乏测试并且可能包含错误。

buffer_clump模板,用于将对象构造/销毁到外部原始存储中,并计算每个子缓冲区的对齐边界:

#include <cstddef>
#include <memory>
#include <vector>
#include <tuple>
#include <cassert>
#include <type_traits>
#include <utility>
// recursion base
template <class... Args>
class buffer_clump {
protected:
constexpr std::size_t buffer_size() const noexcept { return 0; }
constexpr std::tuple<> buffers(char*) const noexcept { return {}; }
constexpr void construct(char*) const noexcept { }
constexpr void destroy(const char*) const noexcept {}
};
template<class Head, class... Tail>
class buffer_clump<Head, Tail...> : buffer_clump<Tail...> {
using tail = buffer_clump<Tail...>;
const std::size_t length;

constexpr std::size_t size() const noexcept
{
return sizeof(Head) * length + alignof(Head) - 1;
}

constexpr Head* align(char* buf) const noexcept
{
void* aligned = buf;
std::size_t space = size();
assert(std::align(
alignof(Head),
sizeof(Head) * length,
aligned,
space
));
return (Head*)aligned;
}

constexpr char* next(char* buf) const noexcept
{
return buf + size();
}

static constexpr void
destroy_head(Head* head_ptr, std::size_t last)
noexcept(std::is_nothrow_destructible<Head>::value)
{
if constexpr (!std::is_trivially_destructible<Head>::value)
while (last--)
head_ptr[last].~Head();
}

public:
template<class... Size_t>
constexpr buffer_clump(std::size_t length, Size_t... tail_lengths) noexcept
: tail(tail_lengths...), length(length) {}

constexpr std::size_t
buffer_size() const noexcept
{
return size() + tail::buffer_size();
}

constexpr auto
buffers(char* buf) const noexcept
{
return std::tuple_cat(
std::make_tuple(align(buf)), 
tail::buffers(next(buf))
);
}

void
construct(char* buf) const
noexcept(std::is_nothrow_default_constructible<Head, Tail...>::value)
{
Head* aligned = align(buf);
std::size_t i;
try {
for (i = 0; i < length; i++)
new (&aligned[i]) Head;
tail::construct(next(buf));
} catch (...) {
destroy_head(aligned, i);
throw;
}
}

constexpr void
destroy(char* buf) const
noexcept(std::is_nothrow_destructible<Head, Tail...>::value)
{
tail::destroy(next(buf));
destroy_head(align(buf), length);
}
};

一个buffer_clump_storage模板,利用buffer_clump将子缓冲区构造到 RAII 容器中。

template <class... Args>
class buffer_clump_storage {
const buffer_clump<Args...> clump;
std::vector<char> storage;

public:
constexpr auto buffers() noexcept {
return clump.buffers(storage.data());
}

template<class... Size_t>
buffer_clump_storage(Size_t... lengths)
: clump(lengths...), storage(clump.buffer_size())
{
clump.construct(storage.data());
}

~buffer_clump_storage()
noexcept(noexcept(clump.destroy(nullptr)))
{
if (storage.size())
clump.destroy(storage.data());
}
buffer_clump_storage(buffer_clump_storage&& other) noexcept
: clump(other.clump), storage(std::move(other.storage))
{
other.storage.clear();
}
};

最后,一个可以分配为自动变量并提供指向buffer_clump_storage子缓冲区的命名指针的类:

class V5 {
// macro tricks or boost mpl magic could be used to avoid repetitive boilerplate
buffer_clump_storage<int, float, int> storage;

public:
int* x;
float* y;
int* z;
V5(std::size_t xs, std::size_t  ys, std::size_t zs)
: storage(xs, ys, zs)
{
std::tie(x, y, z) = storage.buffers();
}
};
<小时 />

和用法:

int giant_function(size_t n, size_t m, /*... other parameters */) {
V5 v(n, n, m);
for(std::size_t i = 0; i < n; i++)
v.x[i] = i;

如果您只需要聚集的分配,而不需要命名组的功能,则这种直接用法几乎避免了所有样板:

int giant_function(size_t n, size_t m, /*... other parameters */) {
buffer_clump_storage<int, float, int> v(n, n, m);
auto [x, y, z] = v.buffers();

对我自己工作的批评:

  • 我懒得让V5成员const可以说是很好的,但我发现它涉及的样板比我想要的要多。
  • 编译器将警告函数中存在一个throw,当构造函数无法抛出时,该被声明为noexcept。g++ 和 clang++ 都不够聪明,无法理解当函数noexcept时永远不会发生抛出。我想这可以通过使用部分专用化来解决,或者我可以添加(非标准)指令来禁用警告。
  • buffer_clump_storage可以复制和分配。这涉及加载更多代码,我不希望需要它们。移动构造函数也可能是多余的,但至少它是高效和简洁的实现。