omp并行用于二次筛的无优化

omp parallel for no optimization achieved for quadratic sieve

本文关键字:二次 优化 并行 用于 omp      更新时间:2023-10-16

我正在尝试使用开放mp实现并行二次筛选。在筛选阶段,我使用对数近似来检查可分割性。这是我的密码。

    #pragma omp parallel for schedule (dynamic) num_threads(4)
    for (int i = 0; i < factorBase.size(); ++i) {
        const uint32_t p = factorBase[i];
        const float logp = std::log(factorBase[i]) / std::log(2);
        // Sieve first sequence.
        while (startIndex.first[i] < intervalEnd) {
            logApprox[startIndex.first[i] - intervalStart] -= logp;
            startIndex.first[i] += p;
        }
        if (p == 2)
            continue; // a^2 = N (mod 2) only has one root.
        // Sieve second sequence.
        while (startIndex.second[i] < intervalEnd) {
            logApprox[startIndex.second[i] - intervalStart] -= logp;
            startIndex.second[i] += p;
        }
    }

这里factorbaselogApprox是如下初始化的std::vectors

std::vector<float> logApprox(INTERVAL_LENGTH, 0);
std::vector<uint32_t> factorBase;

每当我运行这段代码并比较运行时间时,顺序运行和并行运行之间没有太大区别。可以做哪些优化?我是openmp的初学者,任何帮助都将不胜感激。感谢

您的任务非常有趣!谢谢

决定用很多优化来实现我自己的实现。

与您的原始代码相比,我实现了20.4x倍的提升(您的代码为17.86秒,我的代码为0.87秒)。此外,我在实现相同目标的同时,使用了比您的算法少2x倍的筛选内存。

为了进行比较,我简化了您的代码,使其仍然可以做几乎相同的事情,运行完全相同的时间,但看起来要简单得多:

#pragma omp parallel for
for (size_t i = 0; i < factorBase.size(); ++i) {
    auto const p = factorBase[i];
    float const logp = std::log(p) / std::log(2);
    
    while (startIndex[i] < logApprox.size()) {
        logApprox[startIndex[i]] += logp;
        startIndex[i] += p;
    }
}

你们可以看到,我只留下了一个筛子循环,第二个做同样的事情,不需要演示,所以我删除了它。我还删除了startInterval,因为它和速度演示无关。为了简单起见,我做了对数的+=,而不是你的-=

关于您的算法,一个重要的注意事项是它不进行任何同步,这意味着CPU的不同内核可能会写入logApprox数组的同一条目,从而给出错误的结果。

正如我所测量的,这个错误的结果在logApprovx数组的一亿个条目中发生一两次。我的优化代码克服了这个限制,除了进行所有速度优化外,还进行了正确的同步。

我做了以下改进以获得20x倍的加速:

  1. 我把整个数组分成块,大小大约是2^13个元素。每组块由单独的线程/CPU核心处理,因此不需要同步线程。除了避免同步之外,非常重要的是2^13块完全适合CPU的L1或L2缓存,因此大大加快了速度。

  2. 每个2^13的块被处理为所有可能的素数。为了跟踪需要哪些素数的偏移量,我创建了一个2^7大小的特殊环形缓冲区,这个环形缓冲区用块号模2^7索引,并跟踪每个块需要哪些素数和哪些偏移量(模2^2)。

  3. 我有和CPU核心一样多的线程。对于每个线程,我预计算了该线程所有素数的起始偏移量,这些起始偏移量是通过基于您在原始代码中提供的startIndex数组的模运算来计算的。

  4. 为了提高速度,我使用了基于uint16_t的整数对数,而不是float对数。该整数对数计算为uint16_t integer_log = uint16_t(std::log2(p) * (1 << 8) + 0.5);。除了提高整数对数的+=的计算速度外,它们还减少了占用内存2x的次数。如果出于某种原因,uint16_t对数对您来说不够,那么请在我的代码中用using ILog2T = u32;替换using ILog2T = u16;,但这将使使用的内存量翻倍。

我的代码输出如下到控制台:

time_simple 17.859 sec, time_optimized 0.874 sec, boost 20.434, correct_ratio 0.999999993

时间简单是筛选大小为2^28的数组的原始代码的时间,时间优化是我对相同数组的代码,提升是我的代码快多少(你可以看到它快20倍)。"正确比率"表示,由于缺乏多核同步,代码中是否存在任何错误(正如您所看到的,有时它小于1.0,因此存在一些错误)。

完整的优化代码如下:

在线试用!

#include <cstdint>
#include <random>
#include <iostream>
#include <iomanip>
#include <chrono>
#include <thread>
#include <type_traits>
#include <vector>
#include <stdexcept>
#include <sstream>
#include <mutex>
#include <omp.h>
#define ASSERT_MSG(cond, msg) { if (!(cond)) throw std::runtime_error("Assertion (" #cond ") failed at line " + std::to_string(__LINE__) + "! Msg: '" + std::string(msg) + "'."); }
#define ASSERT(cond) ASSERT_MSG(cond, "")
#define OSTR(code) ([&]{ std::ostringstream ss; ss code; return ss.str(); }())
#define COUT(code) { std::unique_lock<std::mutex> lock(cout_mux); std::cout code; std::cout << std::flush; }
#define LN { COUT(<< "LN " << __LINE__ << std::endl); }
#define DUMP(var) { COUT(<< #var << " = (" << (var) << ")" << std::endl); }
using u16 = uint16_t;
using u32 = uint32_t;
using u64 = uint64_t;
using ILog2T = u16;
using PrimeT = u32;
std::mutex cout_mux;
template <typename T>
std::vector<T> GenPrimes(size_t end) {
    thread_local std::vector<T> primes = {2, 3};
    while (primes.back() < end) {
        for (T p = primes.back() + 2;; p += 2) {
            bool is_prime = true;
            for (auto d: primes) {
                if (u64(d) * d > p)
                    break;
                if (p % d == 0) {
                    is_prime = false;
                    break;
                }
            }
            if (is_prime) {
                primes.push_back(p);
                break;
            }
        }
    }
    primes.pop_back();
    return primes;
}
void SieveA(std::vector<float> & logApprox, std::vector<PrimeT> const & factorBase, std::vector<PrimeT> startIndex) {
    #pragma omp parallel for
    for (size_t i = 0; i < factorBase.size(); ++i) {
        auto const p = factorBase[i];
        float const logp = std::log(p) / std::log(2);
        
        while (startIndex[i] < logApprox.size()) {
            logApprox[startIndex[i]] += logp;
            startIndex[i] += p;
        }
    }
}
size_t NThreads() {
    //return 1;
    return std::thread::hardware_concurrency();
}
ILog2T LogToI(double x) { return ILog2T(x * (1ULL << (sizeof(ILog2T) * 8 - 8)) + 0.5); }
double IToLog(ILog2T x) { return x / double(1ULL << (sizeof(ILog2T) * 8 - 8)); }
double Time() {
    static auto const gtb = std::chrono::high_resolution_clock::now();
    return std::chrono::duration_cast<std::chrono::duration<double>>(
        std::chrono::high_resolution_clock::now() - gtb).count();
}
std::string FloatToStr(double x, size_t round = 6) {
    return OSTR(<< std::fixed << std::setprecision(round) << x);
}
double SieveB(std::vector<ILog2T> & logs, std::vector<PrimeT> const & primes, std::vector<PrimeT> const & starts0) {
    auto const nthr = NThreads();
    std::vector<std::vector<PrimeT>> starts(nthr, std::vector<PrimeT>(primes.size()));
    std::vector<std::vector<ILog2T>> plogs(nthr, std::vector<ILog2T>(primes.size()));
    std::vector<std::pair<u64, u64>> ranges(nthr);
    size_t constexpr block_log2 = 13, block = 1 << block_log2, ring_log2 = 6, ring_size = 1ULL << ring_log2, ring_mask = ring_size - 1;
    std::vector<std::vector<std::vector<std::pair<u32, u32>>>> ring(nthr, std::vector<std::vector<std::pair<u32, u32>>>(ring_size));
    #pragma omp parallel for
    for (size_t ithr = 0; ithr < nthr; ++ithr) {
        size_t const nblock = ((logs.size() + nthr - 1) / nthr + block - 1) / block * block,
            begin = ithr * nblock, end = std::min<size_t>(logs.size(), (ithr + 1) * nblock);
        ranges[ithr] = {begin, end};
        for (size_t i = 0; i < primes.size(); ++i) {
            PrimeT const p = primes[i];
            size_t const mod0 = begin % p, mod = starts0[i] < mod0 ? p + starts0[i] - mod0 : starts0[i] - mod0;
            starts[ithr][i] = mod;
            plogs[ithr][i] = LogToI(std::log2(p));
            ring[ithr][((begin + starts[ithr][i]) >> block_log2) & ring_mask].push_back({i, begin + starts[ithr][i]});
        }
    }
    auto tim = Time();
    #pragma omp parallel for
    for (size_t ithr = 0; ithr < nthr; ++ithr) {
        auto const [begin, end] = ranges[ithr];
        auto const [bbegin, bend] = std::make_tuple(begin / block, (end - 1) / block + 1);
        auto const & cstarts = starts.at(ithr);
        auto const & cplogs = plogs.at(ithr);
        auto & cring = ring[ithr];
        std::decay_t<decltype(cring[0])> tmp;
        size_t hit_cnt = 0, miss_cnt = 0;
        for (size_t iblock = bbegin; iblock < bend; ++iblock) {
            size_t const cbegin = iblock << block_log2, cend = std::min<size_t>(end, (iblock + 1) << block_log2);
            auto & ring_cur = cring[iblock & ring_mask];
            tmp = ring_cur;
            ring_cur.clear();
            for (auto [ip, off]: tmp)
                if (off >= cend) {
                    //++miss_cnt;
                    ring_cur.push_back({ip, off});
                } else {
                    //++hit_cnt;
                    auto const p = primes[ip];
                    auto const plog = cplogs[ip];
                    for (; off < cend; off += p) {
                        //if (8192 - 10 <= off && off <= 8192 + 10) COUT(<< "logs.size() " << logs.size() << " begin " << begin << " end " << end << " bbegin " << bbegin << " bend " << bend << " cbegin " << cbegin << " cend " << cend << " iblock " << iblock << " off " << off << " p " << p << " plog " << plog << std::endl);
                        logs[off] += plog;
                    }
                    if (off < end)
                        cring[(off >> block_log2) & ring_mask].push_back({ip, off});
                }
        }
        //COUT(<< "hit_ratio " << std::fixed << std::setprecision(6) << double(hit_cnt) / (hit_cnt + miss_cnt) << std::endl);
    }
    return Time() - tim;
}
void Test() {
    size_t constexpr len = 1ULL << 28;
    std::mt19937_64 rng{123};
    auto const primes = GenPrimes<PrimeT>(1 << 12);
    std::vector<PrimeT> starts;
    for (auto p: primes)
        starts.push_back(rng() % p);
    ASSERT(primes.size() == starts.size());
    double tA = 0, tB = 0;
    std::vector<float> logsA(len);
    std::vector<ILog2T> logsB(len);
    {
        tA = Time();
        SieveA(logsA, primes, starts);
        tA = Time() - tA;
    }
    {
        tB = SieveB(logsB, primes, starts);
    }
    size_t correct = 0;
    for (size_t i = 0; i < len; ++i) {
        //ASSERT_MSG(std::abs(logsA[i] - IToLog(logsB[i])) < 0.1, "i " + std::to_string(i) + " logA " + FloatToStr(logsA[i], 3) + " logB " + FloatToStr(IToLog(logsB[i]), 3));
        if (std::abs(logsA[i] - IToLog(logsB[i])) < 0.1)
            ++correct;
    }
    std::cout << std::fixed << std::setprecision(3) << "time_simple " << tA << " sec, time_optimized " << tB << " sec, boost " << (tA / tB) << ", correct_ratio " << std::setprecision(9) << double(correct) / len << std::endl;
}
int main() {
    try {
        omp_set_num_threads(NThreads());
        Test();
        return 0;
    } catch (std::exception const & ex) {
        std::cout << "Exception: " << ex.what() << std::endl;
        return -1;
    }
}

输出:

time_simple 17.859 sec, time_optimized 0.874 sec, boost 20.434, correct_ratio 0.999999993

在我看来,你应该把时间表变成静态的,并给它块大小(https://software.intel.com/en-us/articles/openmp-loop-scheduling)。

一个小的优化应该是:

在大的FOR循环之外,声明一个常量并将其初始化为1/std::log(2),然后在FOR循环内部,不是用std::log(2)除法,而是用前一个常量相乘,除法在CPU周期中非常昂贵。