在指向 2D 数据的指针上C++"std::sort"而不复制

C++ `std::sort` over pointer to 2D data without copying

本文关键字:quot std sort 复制 C++ 2D 数据 指针      更新时间:2023-10-16

我在 C 风格的 2D 数据数组中有大量数据(GiB 到 TiB)。它不是数组的数组,而是指向解释为 2D 数据的指针。它非常大,所以我不想将其复制到std::vectors 或类似内容。我无法控制数据源,它来自外部库。

我需要根据列中的数据std::sort数据的行(不完全是lex排序,但类似的概念)。

我已经弄清楚了如何使用编译时已知数量的列来做到这一点。例如:

#define COLUMNS 4
struct Row {
double values[COLUMNS];
};
double* data = ...;
size_t n_rows = ...;
size_t n_cols = COLUMNS;
std::sort((Row*)data, ((Row*)data)+n_rows, comp);

我知道我可以为COLUMNS模板结构而不是使用宏,而不是使用comp可以定义比在Row结构中operator<,但这不会改变列数的编译时性质*。

我能想到的唯一解决方案是使用知道每行步幅的自定义随机访问迭代器。但是在我制作自己的迭代器之前(这对我来说总是有点令人生畏),我想确保没有其他方法。

*做出这些设计选择是因为我实际上是在 Cython 而不是 C++ 中编写的,但这无关紧要,我不知道如何在没有自定义迭代器的情况下使用 C++ 做到这一点。我愿意用C++编写解决方案,但更喜欢可以编写的选项 Cython(我可以转换)。

示例代码,显示以下 O(n) 时间内的重新排序。您需要更改pa[i]-a,它将指针转换为索引以处理 a[] 的实际结构。

#include <algorithm>
#include <iostream>
bool compare(const double *p0, const double *p1)
{
return *p0 < *p1;
}
int main()
{
double a[8] = {8.0,6.0,1.0,7.0,5.0,3.0,4.0,2.0};
double *pa[8];
size_t i, j, k;
double ta;
// create array of pointers to a[]
for(i = 0; i < sizeof(a)/sizeof(a[0]); i++)
pa[i] = &a[i];
// sort array of pointers to a[]
std::sort(pa, pa+sizeof(a)/sizeof(a[0]), compare);
// reorder a[] and pa[] according to pa[] in O(n) time
for(i = 0; i < sizeof(a)/sizeof(a[0]); i++){
if(i != pa[i]-a){
ta = a[i];
k = i;
while(i != (j = pa[k]-a)){
a[k] = a[j];
pa[k] = &a[k];
k = j;
}
a[k] = ta;
pa[k] = &a[k];
}
}
for(i = 0; i < sizeof(a)/sizeof(a[0]); i++)
std::cout << a[i] << ' ';
std::cout << std::endl;
return 0;
}

就地重新排序的工作原理是撤消根据 a[] 排序的 pa[] 中的"循环"。对于此示例代码,索引列表 0 到 7 后跟 i = 0 到 7 的 pa[i]-a 列表,结果为:

0 1 2 3 4 5 6 7    (i)
2 7 5 6 4 1 3 0    (pa[i] - a)

这显示了 pa[] 中根据 a[] 排序的"循环"。从 (i) 行中的 0 开始,它下面的索引是 2。查看 i 行中的 2,它下面的数字是 5。低于 5 是 1。低于 1 是 7。低于 7 是 0,完成该循环。使用 -> 记下下一个索引,在这种情况下有 3 个周期:

{0->2->5->1->7->0} {3->6->3} {4->4}

就地重新排序的作用是撤消 a[] 和 pa[] 的循环。它找到 pa[0] 处的第一个循环 (i != pa[i]-a)。查看 a[],你有 ta=a[0], a[0]=a[2], a[2] = a[5], a[5]=a[1], a[1]=a[7],此时 i == 0 == pa[7]-a,循环的最后一部分,它设置 a[7] = ta。PA[] 以相同的方式更新。下一个循环是 ta=a[3], a[3]=a[6], a[6] = ta。最后一个循环,4->4指向自身,因此被跳过(i == pa[i]-a)。其时间复杂度为 O(n)。

有一个关于排列和循环符号的YouTube视频(在这种情况下,它将是(0,2,5,1,7)(3,6)((4)被忽略,因为它已经到位)。您可以在网上搜索其他文章的"排列周期"。

https://www.youtube.com/watch?v=MpKG6FmcIHk

这可能会解决问题。将Row定义为指向行首的指针,如下所示:

struct Row {
double* start;
static int columns;
Row(const Row& row) = default;
// Overload operator= to copy your data.
Row& operator=(const Row& rhs) {
memcpy(start, rhs.start, columns*sizeof(double));
}
Row operator<(const Row& rhs) const {
// your comparison function
}
};

像这样使用:

double* data = ...;
size_t n_rows = ...;
size_t n_cols = COLUMNS;
Row::columns = n_cols;
std::vector<Row> rows(n_rows);
for(int i=0;i<n_rows;++i) {
rows[i].start = data + i*n_cols;
}
std::sort(rows.begin(), rows.end());

您需要创建一个std::vector<Row>。希望您没有那么多行,所以这是一个性能问题。