找到一种有效的方法,在 2 个巨大的缓冲区上执行 MAX,每字节字节
Find an efficient way to perform a MAX, byte per byte, on 2 huge buffers
我需要非常快速地比较9数百万个字节,以保持每个字节的最大值。这是我的工作:
int bufSize = 9000000;
byte_t *buf = /* ... */;
byte_t *maxBuf = /* ... */;
for (int i = 0; i < bufSize; ++i) {
if (buf[i] > maxBuf[i]) {
maxBuf[i] = buf[i];
}
}
它可以工作,但我需要将处理时间缩短 3。
特别是,有没有办法使用 CPU 的 64 位?
你知道 numpy 数组是否有帮助吗?
编辑:处理器是四核ARM Cortex-A57,操作系统是Tegra的Linux。对不起,我早该写的。
指出显而易见的一点。您的代码正在有选择地修改 maxBuf 中的数据,这会导致矢量化器失败。只需将代码更改为使用 std::max 代替....
for (int i = 0; i < bufSize; ++i) {
maxBuf[i] = std::max(maxBuf[i], buf[i]);
}
。代码现在将矢量化。
证明:https://godbolt.org/z/rviiKF
内部循环已经展开,现在使用 AVX2:
.LBB0_12: # =>This Inner Loop Header: Depth=1
vmovdqu ymm0, ymmword ptr [rsi + rax]
vmovdqu ymm1, ymmword ptr [rsi + rax + 32]
vmovdqu ymm2, ymmword ptr [rsi + rax + 64]
vmovdqu ymm3, ymmword ptr [rsi + rax + 96]
vpmaxub ymm0, ymm0, ymmword ptr [rdi + rax]
vpmaxub ymm1, ymm1, ymmword ptr [rdi + rax + 32]
vmovdqu ymmword ptr [rsi + rax], ymm0
vmovdqu ymmword ptr [rsi + rax + 32], ymm1
vpmaxub ymm0, ymm2, ymmword ptr [rdi + rax + 64]
vpmaxub ymm1, ymm3, ymmword ptr [rdi + rax + 96]
vmovdqu ymmword ptr [rsi + rax + 64], ymm0
vmovdqu ymmword ptr [rsi + rax + 96], ymm1
vmovdqu ymm0, ymmword ptr [rsi + rax + 128]
vmovdqu ymm1, ymmword ptr [rsi + rax + 160]
vpmaxub ymm0, ymm0, ymmword ptr [rdi + rax + 128]
vpmaxub ymm1, ymm1, ymmword ptr [rdi + rax + 160]
vmovdqu ymmword ptr [rsi + rax + 128], ymm0
vmovdqu ymmword ptr [rsi + rax + 160], ymm1
vmovdqu ymm0, ymmword ptr [rsi + rax + 192]
vmovdqu ymm1, ymmword ptr [rsi + rax + 224]
vpmaxub ymm0, ymm0, ymmword ptr [rdi + rax + 192]
vpmaxub ymm1, ymm1, ymmword ptr [rdi + rax + 224]
vmovdqu ymmword ptr [rsi + rax + 192], ymm0
vmovdqu ymmword ptr [rsi + rax + 224], ymm1
add rax, 256
add rdx, 4
jne .LBB0_12
如果您拥有支持 AVX2 的 CPU 并使用英特尔 SIMD 内部函数一次处理 32 字节(英特尔固有指南 - max(,则可以获得高效的解决方案(在我的系统 [英特尔 i5-8250U] ~45ms 与 ~1ms
(因为 9000000 可以被 32 整除,所以你甚至不需要额外的循环来完成。
// #include <immintrin.h>, also for g++ add `-mavx2`-flag
int bufSize = 9000000;
byte *buf = static_cast<byte *>(_mm_malloc(sizeof(*buf) * bufSize, 32));
byte *maxBuf = static_cast<byte *>(_mm_malloc(sizeof(*maxBuf) * bufSize, 32));
for (int i = 0; i < bufSize; ++i)
{
buf[i] = (byte) rand();
maxBuf[i] = (byte) rand();
}
for (int i = 0; i < bufSize; i += 32)
{
__m256i *buf_simd = (__m256i *) &buf[i];
__m256i *maxBuf_simd = (__m256i *) &maxBuf[i];
*maxBuf_simd = _mm256_max_epu8(*maxBuf_simd, *buf_simd);
}
_mm_free(buf);
_mm_free(maxBuf);
因为我没有你的数据,所以我用随机数据创建了两个数组。在这里,它们与 32 字节对齐非常重要。
之后,在 for 循环的每次迭代中,我将 32Byte 加载到向量寄存器中并执行_mm256_max_epu8
,基本上将 256 位划分为 32 字节"数据包"(所谓的打包向量(并选择每个字节的最大值(更详细的解释可以在上面的链接中找到(。
如果只有一个支持 SSE2 的 CPU,则可以将_mm_max_epu8
与 128 位矢量一起使用。
就您拥有的东西而言,没有更快的方法可以做到这一点。使用 python 的 numpy 实际上只会改进 python 给你类似 C 的行为。
我认为你最好的选择是使用OpenMP。这是一个关于它的简单教程。由于每次迭代都是相互独立的,我认为您的代码应如下所示:
#pragma omp parallel for
for (int i = 0; i < bufSize; ++i) {
#pragma omp simd
if (buf[i] > maxBuf[i]) {
maxBuf[i] = buf[i];
}
}
然后你使用 -fopenmp 进行编译。我不确定#pragma omp simd
线是否会有很大帮助。
您还可以添加编译器优化。这是一个列表。另请参阅手册页。不过,这些并不总是提高速度,这取决于很多因素。只需尝试一下,它就可以严重优化您的代码。
例如,我有一个算法需要几个小时。在进行编译器优化和 OpenMP 后,我能够将其缩短到大约 30 秒。但是编程的这个领域可能会变得非常困难,并且需要考虑很多因素。
多亏了@Frederik我们才找到了如何在ARM上使用NEON执行这些操作。
这是代码:
#include <arm_neon.h>
int bufSize = 9000000;
byte_t *buf = static_cast<byte_t *>(aligned_alloc(8, bufSize));
byte_t *maxBuf = static_cast<byte_t *>(aligned_alloc(8, bufSize));
// Optimized MAX using NEON, it works on packets of 8 bytes.
byte_t *maxPtr = maxBuf;
const byte_t *newPtr = buf;
int iterCount = bufSize / 8;
for (int i = 0; i < iterCount; ++i) {
// load 8 bytes
uint8x8_t v1 = vld1_u8(maxPtr);
uint8x8_t v2 = vld1_u8(newPtr);
// max on 8 bytes
uint8x8_t result1 = vmax_u8(v1, v2);
// store the result
vst1_u8(maxPtr, result1);
// move 8 bytes
maxPtr += 8;
newPtr += 8;
}
// Less optimized MAX for the remaining bytes (if 'bufSize' is not a multiple of 8).
for (int i = iterCount * 8; i < bufSize; ++i) {
maxBuf[i] = std::max(maxBuf[i], buf[i]);
}
free(buf);
free(maxBuf);
我们的编译器选项:-O3 -ffast-math -march=armv8-a+simd
.
处理时间现在落后于 6 毫秒。按照@robthebloke的建议,初始if
为 17 毫秒,std::max
为 12 毫秒。非常感谢,伙计们!
一些文档:
vld1_u8
:http://infocenter.arm.com/help/index.jsp?topic=/com.arm.doc.dui0472j/chr1360928371756.htmlvmax_u8
:http://infocenter.arm.com/help/index.jsp?topic=/com.arm.doc.dui0472k/chr1360928366062.htmlvst1_u8
:http://infocenter.arm.com/help/index.jsp?topic=/com.arm.doc.dui0472k/chr1359125040827.html- 另请参阅:使用 NEON 减去两个图像
- struct.error:解压缩 C++ 结构时,解包需要 288 字节的缓冲区
- 找到一种有效的方法,在 2 个巨大的缓冲区上执行 MAX,每字节字节
- 从原始字节解码协议缓冲区(以 C++为单位)
- 如果我向一个12字节的缓冲区写入的字节数少于12,会发生什么情况
- 将constexpr字节数组与缓冲区的一部分(指向数据的指针)进行比较
- 我该如何循环遍历我的数组(缓冲区——包含一个文本文件),并将其打印成30字节的块
- Fread C++ in C#.我知道我需要字节缓冲区,但并不完全在那里
- 如何通过1024字节缓冲区和强力冲洗在插座连接上发送6个字符的C弦
- 将缓冲区与 N 字节边界对齐,而不是 2N 字节边界?
- std::字符串与字节缓冲区(C++的差异)
- 删除字节数组 : Qt 缓冲区之间的空字符 (\x00)
- 将数组与传入的字节*缓冲区进行比较的最快方法
- 在内存(C 和/或 C++)中创建和管理字节缓冲区,该缓冲区可以根据需要自动调整大小
- 高效生成字节缓冲区,而不会破坏严格的混叠
- 如何从java传递给jni的字节缓冲区中进行写和读
- 解析来自固定大小字节缓冲区的连续块的protobuf消息序列
- 从字节缓冲区强制转换结构
- 以十六进制表示法(0xABCDEF)将字节缓冲区的字节输出到控制台输出流
- JNI-向C++发送一个大字节[]缓冲区
- 带有附加字节缓冲区的Allocate_shared