在指向 2D 数据的指针上C++"std::sort"而不复制
C++ `std::sort` over pointer to 2D data without copying
我在 C 风格的 2D 数据数组中有大量数据(GiB 到 TiB)。它不是数组的数组,而是指向解释为 2D 数据的指针。它非常大,所以我不想将其复制到std::vector
s 或类似内容。我无法控制数据源,它来自外部库。
我需要根据列中的数据std::sort
数据的行(不完全是lex排序,但类似的概念)。
我已经弄清楚了如何使用编译时已知数量的列来做到这一点。例如:
#define COLUMNS 4
struct Row {
double values[COLUMNS];
};
double* data = ...;
size_t n_rows = ...;
size_t n_cols = COLUMNS;
std::sort((Row*)data, ((Row*)data)+n_rows, comp);
我知道我可以为COLUMNS
模板结构而不是使用宏,而不是使用comp
可以定义比在Row
结构中operator<
,但这不会改变列数的编译时性质*。
我能想到的唯一解决方案是使用知道每行步幅的自定义随机访问迭代器。但是在我制作自己的迭代器之前(这对我来说总是有点令人生畏),我想确保没有其他方法。
*做出这些设计选择是因为我实际上是在 Cython 而不是 C++ 中编写的,但这无关紧要,我不知道如何在没有自定义迭代器的情况下使用 C++ 做到这一点。我愿意用C++编写解决方案,但更喜欢可以编写的选项 Cython(我可以转换)。
示例代码,显示以下 O(n) 时间内的重新排序。您需要更改pa[i]-a,它将指针转换为索引以处理 a[] 的实际结构。
#include <algorithm>
#include <iostream>
bool compare(const double *p0, const double *p1)
{
return *p0 < *p1;
}
int main()
{
double a[8] = {8.0,6.0,1.0,7.0,5.0,3.0,4.0,2.0};
double *pa[8];
size_t i, j, k;
double ta;
// create array of pointers to a[]
for(i = 0; i < sizeof(a)/sizeof(a[0]); i++)
pa[i] = &a[i];
// sort array of pointers to a[]
std::sort(pa, pa+sizeof(a)/sizeof(a[0]), compare);
// reorder a[] and pa[] according to pa[] in O(n) time
for(i = 0; i < sizeof(a)/sizeof(a[0]); i++){
if(i != pa[i]-a){
ta = a[i];
k = i;
while(i != (j = pa[k]-a)){
a[k] = a[j];
pa[k] = &a[k];
k = j;
}
a[k] = ta;
pa[k] = &a[k];
}
}
for(i = 0; i < sizeof(a)/sizeof(a[0]); i++)
std::cout << a[i] << ' ';
std::cout << std::endl;
return 0;
}
就地重新排序的工作原理是撤消根据 a[] 排序的 pa[] 中的"循环"。对于此示例代码,索引列表 0 到 7 后跟 i = 0 到 7 的 pa[i]-a 列表,结果为:
0 1 2 3 4 5 6 7 (i)
2 7 5 6 4 1 3 0 (pa[i] - a)
这显示了 pa[] 中根据 a[] 排序的"循环"。从 (i) 行中的 0 开始,它下面的索引是 2。查看 i 行中的 2,它下面的数字是 5。低于 5 是 1。低于 1 是 7。低于 7 是 0,完成该循环。使用 -> 记下下一个索引,在这种情况下有 3 个周期:
{0->2->5->1->7->0} {3->6->3} {4->4}
就地重新排序的作用是撤消 a[] 和 pa[] 的循环。它找到 pa[0] 处的第一个循环 (i != pa[i]-a)。查看 a[],你有 ta=a[0], a[0]=a[2], a[2] = a[5], a[5]=a[1], a[1]=a[7],此时 i == 0 == pa[7]-a,循环的最后一部分,它设置 a[7] = ta。PA[] 以相同的方式更新。下一个循环是 ta=a[3], a[3]=a[6], a[6] = ta。最后一个循环,4->4指向自身,因此被跳过(i == pa[i]-a)。其时间复杂度为 O(n)。
有一个关于排列和循环符号的YouTube视频(在这种情况下,它将是(0,2,5,1,7)(3,6)((4)被忽略,因为它已经到位)。您可以在网上搜索其他文章的"排列周期"。
https://www.youtube.com/watch?v=MpKG6FmcIHk
这可能会解决问题。将Row
定义为指向行首的指针,如下所示:
struct Row {
double* start;
static int columns;
Row(const Row& row) = default;
// Overload operator= to copy your data.
Row& operator=(const Row& rhs) {
memcpy(start, rhs.start, columns*sizeof(double));
}
Row operator<(const Row& rhs) const {
// your comparison function
}
};
像这样使用:
double* data = ...;
size_t n_rows = ...;
size_t n_cols = COLUMNS;
Row::columns = n_cols;
std::vector<Row> rows(n_rows);
for(int i=0;i<n_rows;++i) {
rows[i].start = data + i*n_cols;
}
std::sort(rows.begin(), rows.end());
您需要创建一个std::vector<Row>
。希望您没有那么多行,所以这是一个性能问题。
- 使用std::multimap迭代器创建std::list
- C++中std::resize(n)和std::shrink_to_fit之间的区别
- 来自 std::list 的迭代器 .end() 按预期返回"0xcdcdcdcdcdcdcdcd"但 .begin()
- C++17复制构造函数,在std::unordereded_map上进行深度复制
- 如何导出包含具有"std::unique_ptr"值的"std::map"属性的
- 从持续时间构造std::chrono::system_clock::time_point
- std::具有相同基类的类的变体
- std::向量与传递值的动态数组
- 使用std::vector的OpenCL矩阵乘法
- std::map<struct,struct>::find 找不到匹配项,但是如果我循环通过 begin() 到 end(),我在那里看到匹配项
- std::condition_variable::wait()如何评估给定的谓词
- 如何获取std::result_of函数的返回类型
- std::原子加载和存储都需要吗
- 将对象移动到std::shared_ptr
- POCO::PostgreSQL:如何将std::vector支持添加到`Binder::bind`
- 使用一个考虑到std::map中键值的滚动或换行的键
- 如何从 std::atomic 中提取指针 T<T>?
- 为什么 std::unique 不调用 std::sort?
- 使用std::函数映射对象方法
- 如何检测我何时向可变参数函数传递"std::string"而不是"c_str()"