omp并行用于二次筛的无优化
omp parallel for no optimization achieved for quadratic sieve
我正在尝试使用开放mp实现并行二次筛选。在筛选阶段,我使用对数近似来检查可分割性。这是我的密码。
#pragma omp parallel for schedule (dynamic) num_threads(4)
for (int i = 0; i < factorBase.size(); ++i) {
const uint32_t p = factorBase[i];
const float logp = std::log(factorBase[i]) / std::log(2);
// Sieve first sequence.
while (startIndex.first[i] < intervalEnd) {
logApprox[startIndex.first[i] - intervalStart] -= logp;
startIndex.first[i] += p;
}
if (p == 2)
continue; // a^2 = N (mod 2) only has one root.
// Sieve second sequence.
while (startIndex.second[i] < intervalEnd) {
logApprox[startIndex.second[i] - intervalStart] -= logp;
startIndex.second[i] += p;
}
}
这里factorbase
和logApprox
是如下初始化的std::vectors
std::vector<float> logApprox(INTERVAL_LENGTH, 0);
std::vector<uint32_t> factorBase;
每当我运行这段代码并比较运行时间时,顺序运行和并行运行之间没有太大区别。可以做哪些优化?我是openmp的初学者,任何帮助都将不胜感激。感谢
您的任务非常有趣!谢谢
决定用很多优化来实现我自己的实现。
与您的原始代码相比,我实现了20.4x倍的提升(您的代码为17.86秒,我的代码为0.87秒)。此外,我在实现相同目标的同时,使用了比您的算法少2x倍的筛选内存。
为了进行比较,我简化了您的代码,使其仍然可以做几乎相同的事情,运行完全相同的时间,但看起来要简单得多:
#pragma omp parallel for
for (size_t i = 0; i < factorBase.size(); ++i) {
auto const p = factorBase[i];
float const logp = std::log(p) / std::log(2);
while (startIndex[i] < logApprox.size()) {
logApprox[startIndex[i]] += logp;
startIndex[i] += p;
}
}
你们可以看到,我只留下了一个筛子循环,第二个做同样的事情,不需要演示,所以我删除了它。我还删除了startInterval
,因为它和速度演示无关。为了简单起见,我做了对数的+=
,而不是你的-=
。
关于您的算法,一个重要的注意事项是它不进行任何同步,这意味着CPU的不同内核可能会写入logApprox
数组的同一条目,从而给出错误的结果。
正如我所测量的,这个错误的结果在logApprovx数组的一亿个条目中发生一两次。我的优化代码克服了这个限制,除了进行所有速度优化外,还进行了正确的同步。
我做了以下改进以获得20x倍的加速:
我把整个数组分成块,大小大约是2^13个元素。每组块由单独的线程/CPU核心处理,因此不需要同步线程。除了避免同步之外,非常重要的是2^13块完全适合CPU的L1或L2缓存,因此大大加快了速度。
每个2^13的块被处理为所有可能的素数。为了跟踪需要哪些素数的偏移量,我创建了一个2^7大小的特殊环形缓冲区,这个环形缓冲区用块号模2^7索引,并跟踪每个块需要哪些素数和哪些偏移量(模2^2)。
我有和CPU核心一样多的线程。对于每个线程,我预计算了该线程所有素数的起始偏移量,这些起始偏移量是通过基于您在原始代码中提供的
startIndex
数组的模运算来计算的。为了提高速度,我使用了基于
uint16_t
的整数对数,而不是float
对数。该整数对数计算为uint16_t integer_log = uint16_t(std::log2(p) * (1 << 8) + 0.5);
。除了提高整数对数的+=
的计算速度外,它们还减少了占用内存2x的次数。如果出于某种原因,uint16_t对数对您来说不够,那么请在我的代码中用using ILog2T = u32;
替换using ILog2T = u16;
,但这将使使用的内存量翻倍。
我的代码输出如下到控制台:
time_simple 17.859 sec, time_optimized 0.874 sec, boost 20.434, correct_ratio 0.999999993
时间简单是筛选大小为2^28的数组的原始代码的时间,时间优化是我对相同数组的代码,提升是我的代码快多少(你可以看到它快20倍)。"正确比率"表示,由于缺乏多核同步,代码中是否存在任何错误(正如您所看到的,有时它小于1.0,因此存在一些错误)。
完整的优化代码如下:
在线试用!
#include <cstdint>
#include <random>
#include <iostream>
#include <iomanip>
#include <chrono>
#include <thread>
#include <type_traits>
#include <vector>
#include <stdexcept>
#include <sstream>
#include <mutex>
#include <omp.h>
#define ASSERT_MSG(cond, msg) { if (!(cond)) throw std::runtime_error("Assertion (" #cond ") failed at line " + std::to_string(__LINE__) + "! Msg: '" + std::string(msg) + "'."); }
#define ASSERT(cond) ASSERT_MSG(cond, "")
#define OSTR(code) ([&]{ std::ostringstream ss; ss code; return ss.str(); }())
#define COUT(code) { std::unique_lock<std::mutex> lock(cout_mux); std::cout code; std::cout << std::flush; }
#define LN { COUT(<< "LN " << __LINE__ << std::endl); }
#define DUMP(var) { COUT(<< #var << " = (" << (var) << ")" << std::endl); }
using u16 = uint16_t;
using u32 = uint32_t;
using u64 = uint64_t;
using ILog2T = u16;
using PrimeT = u32;
std::mutex cout_mux;
template <typename T>
std::vector<T> GenPrimes(size_t end) {
thread_local std::vector<T> primes = {2, 3};
while (primes.back() < end) {
for (T p = primes.back() + 2;; p += 2) {
bool is_prime = true;
for (auto d: primes) {
if (u64(d) * d > p)
break;
if (p % d == 0) {
is_prime = false;
break;
}
}
if (is_prime) {
primes.push_back(p);
break;
}
}
}
primes.pop_back();
return primes;
}
void SieveA(std::vector<float> & logApprox, std::vector<PrimeT> const & factorBase, std::vector<PrimeT> startIndex) {
#pragma omp parallel for
for (size_t i = 0; i < factorBase.size(); ++i) {
auto const p = factorBase[i];
float const logp = std::log(p) / std::log(2);
while (startIndex[i] < logApprox.size()) {
logApprox[startIndex[i]] += logp;
startIndex[i] += p;
}
}
}
size_t NThreads() {
//return 1;
return std::thread::hardware_concurrency();
}
ILog2T LogToI(double x) { return ILog2T(x * (1ULL << (sizeof(ILog2T) * 8 - 8)) + 0.5); }
double IToLog(ILog2T x) { return x / double(1ULL << (sizeof(ILog2T) * 8 - 8)); }
double Time() {
static auto const gtb = std::chrono::high_resolution_clock::now();
return std::chrono::duration_cast<std::chrono::duration<double>>(
std::chrono::high_resolution_clock::now() - gtb).count();
}
std::string FloatToStr(double x, size_t round = 6) {
return OSTR(<< std::fixed << std::setprecision(round) << x);
}
double SieveB(std::vector<ILog2T> & logs, std::vector<PrimeT> const & primes, std::vector<PrimeT> const & starts0) {
auto const nthr = NThreads();
std::vector<std::vector<PrimeT>> starts(nthr, std::vector<PrimeT>(primes.size()));
std::vector<std::vector<ILog2T>> plogs(nthr, std::vector<ILog2T>(primes.size()));
std::vector<std::pair<u64, u64>> ranges(nthr);
size_t constexpr block_log2 = 13, block = 1 << block_log2, ring_log2 = 6, ring_size = 1ULL << ring_log2, ring_mask = ring_size - 1;
std::vector<std::vector<std::vector<std::pair<u32, u32>>>> ring(nthr, std::vector<std::vector<std::pair<u32, u32>>>(ring_size));
#pragma omp parallel for
for (size_t ithr = 0; ithr < nthr; ++ithr) {
size_t const nblock = ((logs.size() + nthr - 1) / nthr + block - 1) / block * block,
begin = ithr * nblock, end = std::min<size_t>(logs.size(), (ithr + 1) * nblock);
ranges[ithr] = {begin, end};
for (size_t i = 0; i < primes.size(); ++i) {
PrimeT const p = primes[i];
size_t const mod0 = begin % p, mod = starts0[i] < mod0 ? p + starts0[i] - mod0 : starts0[i] - mod0;
starts[ithr][i] = mod;
plogs[ithr][i] = LogToI(std::log2(p));
ring[ithr][((begin + starts[ithr][i]) >> block_log2) & ring_mask].push_back({i, begin + starts[ithr][i]});
}
}
auto tim = Time();
#pragma omp parallel for
for (size_t ithr = 0; ithr < nthr; ++ithr) {
auto const [begin, end] = ranges[ithr];
auto const [bbegin, bend] = std::make_tuple(begin / block, (end - 1) / block + 1);
auto const & cstarts = starts.at(ithr);
auto const & cplogs = plogs.at(ithr);
auto & cring = ring[ithr];
std::decay_t<decltype(cring[0])> tmp;
size_t hit_cnt = 0, miss_cnt = 0;
for (size_t iblock = bbegin; iblock < bend; ++iblock) {
size_t const cbegin = iblock << block_log2, cend = std::min<size_t>(end, (iblock + 1) << block_log2);
auto & ring_cur = cring[iblock & ring_mask];
tmp = ring_cur;
ring_cur.clear();
for (auto [ip, off]: tmp)
if (off >= cend) {
//++miss_cnt;
ring_cur.push_back({ip, off});
} else {
//++hit_cnt;
auto const p = primes[ip];
auto const plog = cplogs[ip];
for (; off < cend; off += p) {
//if (8192 - 10 <= off && off <= 8192 + 10) COUT(<< "logs.size() " << logs.size() << " begin " << begin << " end " << end << " bbegin " << bbegin << " bend " << bend << " cbegin " << cbegin << " cend " << cend << " iblock " << iblock << " off " << off << " p " << p << " plog " << plog << std::endl);
logs[off] += plog;
}
if (off < end)
cring[(off >> block_log2) & ring_mask].push_back({ip, off});
}
}
//COUT(<< "hit_ratio " << std::fixed << std::setprecision(6) << double(hit_cnt) / (hit_cnt + miss_cnt) << std::endl);
}
return Time() - tim;
}
void Test() {
size_t constexpr len = 1ULL << 28;
std::mt19937_64 rng{123};
auto const primes = GenPrimes<PrimeT>(1 << 12);
std::vector<PrimeT> starts;
for (auto p: primes)
starts.push_back(rng() % p);
ASSERT(primes.size() == starts.size());
double tA = 0, tB = 0;
std::vector<float> logsA(len);
std::vector<ILog2T> logsB(len);
{
tA = Time();
SieveA(logsA, primes, starts);
tA = Time() - tA;
}
{
tB = SieveB(logsB, primes, starts);
}
size_t correct = 0;
for (size_t i = 0; i < len; ++i) {
//ASSERT_MSG(std::abs(logsA[i] - IToLog(logsB[i])) < 0.1, "i " + std::to_string(i) + " logA " + FloatToStr(logsA[i], 3) + " logB " + FloatToStr(IToLog(logsB[i]), 3));
if (std::abs(logsA[i] - IToLog(logsB[i])) < 0.1)
++correct;
}
std::cout << std::fixed << std::setprecision(3) << "time_simple " << tA << " sec, time_optimized " << tB << " sec, boost " << (tA / tB) << ", correct_ratio " << std::setprecision(9) << double(correct) / len << std::endl;
}
int main() {
try {
omp_set_num_threads(NThreads());
Test();
return 0;
} catch (std::exception const & ex) {
std::cout << "Exception: " << ex.what() << std::endl;
return -1;
}
}
输出:
time_simple 17.859 sec, time_optimized 0.874 sec, boost 20.434, correct_ratio 0.999999993
在我看来,你应该把时间表变成静态的,并给它块大小(https://software.intel.com/en-us/articles/openmp-loop-scheduling)。
一个小的优化应该是:
在大的FOR循环之外,声明一个常量并将其初始化为1/std::log(2),然后在FOR循环内部,不是用std::log(2)除法,而是用前一个常量相乘,除法在CPU周期中非常昂贵。
- 输出 0 和 -0 的二次公式,与给定的项无关
- C++二和.优化内存使用
- 处理所有二次公式结果
- 两个嵌套循环的运行时间复杂性:二次型还是线性
- Cgal二次规划目标函数
- 二次代数建议数组等返回功能
- 无法从数组二次表达式中检索数据值
- 创建二次公式求解器-范围中未声明的变量
- C++:你如何确定一个解是微不足道的还是不存在的二次函数?
- 如何将二次贝塞尔曲线代码转换为三次贝塞尔曲线
- 为什么使用STL排序()进行二次时间排序
- omp并行用于二次筛的无优化
- 分段故障:SET C++11中的二次排序
- C++识别以像素为单位的二次和三次显示
- 在 Vulkan 中加载非二次幂纹理
- C++二次码错误
- sqrt() 使用 <math.h) 在二次公式中的意外结果>
- 如何顺时针交换二次矩阵的四分之一(从左上角开始)
- 如何在C++中进行二维二次拟合
- 编写二次公式程序时出现编译错误