如何有效地生成Zipf分布数

How to generate Zipf distributed numbers efficiently?

本文关键字:Zipf 分布 有效地      更新时间:2023-10-16

我目前正在对C++中的一些数据结构进行基准测试,我想在处理Zipf分布式数字时对它们进行测试。

我正在使用此网站上提供的生成器:http://www.cse.usf.edu/~christen/tools/toolpage.html

我调整了实现以使用Mersenne Twister生成器。

它工作得很好,但速度真的很慢。在我的情况下,范围可能很大(大约一百万),生成的随机数可能有几百万。

alpha参数不会随时间变化,它是固定的。

我试着预先计算所有的sum_prob。它要快得多,但在大范围内仍然会变慢。

有没有更快的方法生成齐普夫分布数?即使是不那么精确的东西也会受到欢迎。

感谢

仅预计算并没有多大帮助。但很明显,sum_prob是累积的,并且具有升序。因此,如果我们使用二进制搜索来找到zipf_value,我们将把生成zipf分布数的顺序从O(n)降低到O(log(n))。这在效率上有了很大的提高

在这里,只需将genzipf.c中的zipf()函数替换为以下函数:

int zipf(double alpha, int n)
{
  static int first = TRUE;      // Static first time flag
  static double c = 0;          // Normalization constant
  static double *sum_probs;     // Pre-calculated sum of probabilities
  double z;                     // Uniform random number (0 < z < 1)
  int zipf_value;               // Computed exponential value to be returned
  int    i;                     // Loop counter
  int low, high, mid;           // Binary-search bounds
  // Compute normalization constant on first call only
  if (first == TRUE)
  {
    for (i=1; i<=n; i++)
      c = c + (1.0 / pow((double) i, alpha));
    c = 1.0 / c;
    sum_probs = malloc((n+1)*sizeof(*sum_probs));
    sum_probs[0] = 0;
    for (i=1; i<=n; i++) {
      sum_probs[i] = sum_probs[i-1] + c / pow((double) i, alpha);
    }
    first = FALSE;
  }
  // Pull a uniform random number (0 < z < 1)
  do
  {
    z = rand_val(0);
  }
  while ((z == 0) || (z == 1));
  // Map z to the value
  low = 1, high = n, mid;
  do {
    mid = floor((low+high)/2);
    if (sum_probs[mid] >= z && sum_probs[mid-1] < z) {
      zipf_value = mid;
      break;
    } else if (sum_probs[mid] >= z) {
      high = mid-1;
    } else {
      low = mid+1;
    }
  } while (low <= high);
  // Assert that zipf_value is between 1 and N
  assert((zipf_value >=1) && (zipf_value <= n));
  return(zipf_value);
}

我能找到的唯一一个C++11-Zipf随机生成器明确计算了概率,并使用了std::discrete_distribution。这适用于小范围,但如果您需要生成范围非常宽的Zipf值(在我的情况下用于数据库测试),则没有用处,因为这会耗尽内存。因此,我在C++中实现了下面提到的算法。

我还没有严格测试过这段代码,可能会进行一些优化,但它只需要恒定的空间,而且似乎运行良好。

#include <algorithm>
#include <cmath>
#include <random>
/** Zipf-like random distribution.
 *
 * "Rejection-inversion to generate variates from monotone discrete
 * distributions", Wolfgang Hörmann and Gerhard Derflinger
 * ACM TOMACS 6.3 (1996): 169-184
 */
template<class IntType = unsigned long, class RealType = double>
class zipf_distribution
{
public:
    typedef RealType input_type;
    typedef IntType result_type;
    static_assert(std::numeric_limits<IntType>::is_integer, "");
    static_assert(!std::numeric_limits<RealType>::is_integer, "");
    zipf_distribution(const IntType n=std::numeric_limits<IntType>::max(),
                      const RealType q=1.0)
        : n(n)
        , q(q)
        , H_x1(H(1.5) - 1.0)
        , H_n(H(n + 0.5))
        , dist(H_x1, H_n)
    {}
    IntType operator()(std::mt19937& rng)
    {
        while (true) {
            const RealType u = dist(rng);
            const RealType x = H_inv(u);
            const IntType  k = clamp<IntType>(std::round(x), 1, n);
            if (u >= H(k + 0.5) - h(k)) {
                return k;
            }
        }
    }
private:
    /** Clamp x to [min, max]. */
    template<typename T>
    static constexpr T clamp(const T x, const T min, const T max)
    {
        return std::max(min, std::min(max, x));
    }
    /** exp(x) - 1 / x */
    static double
    expxm1bx(const double x)
    {
        return (std::abs(x) > epsilon)
            ? std::expm1(x) / x
            : (1.0 + x/2.0 * (1.0 + x/3.0 * (1.0 + x/4.0)));
    }
    /** H(x) = log(x) if q == 1, (x^(1-q) - 1)/(1 - q) otherwise.
     * H(x) is an integral of h(x).
     *
     * Note the numerator is one less than in the paper order to work with all
     * positive q.
     */
    const RealType H(const RealType x)
    {
        const RealType log_x = std::log(x);
        return expxm1bx((1.0 - q) * log_x) * log_x;
    }
    /** log(1 + x) / x */
    static RealType
    log1pxbx(const RealType x)
    {
        return (std::abs(x) > epsilon)
            ? std::log1p(x) / x
            : 1.0 - x * ((1/2.0) - x * ((1/3.0) - x * (1/4.0)));
    }
    /** The inverse function of H(x) */
    const RealType H_inv(const RealType x)
    {
        const RealType t = std::max(-1.0, x * (1.0 - q));
        return std::exp(log1pxbx(t) * x);
    }
    /** That hat function h(x) = 1 / (x ^ q) */
    const RealType h(const RealType x)
    {
        return std::exp(-q * std::log(x));
    }
    static constexpr RealType epsilon = 1e-8;
    IntType                                  n;     ///< Number of elements
    RealType                                 q;     ///< Exponent
    RealType                                 H_x1;  ///< H(x_1)
    RealType                                 H_n;   ///< H(n)
    std::uniform_real_distribution<RealType> dist;  ///< [H(x_1), H(n)]
};

代码中的以下行对于每次对zipf()的调用执行n次:

sum_prob = sum_prob + c / pow((double) i, alpha);

令人遗憾的是,有必要调用pow()函数,因为在内部,该函数求和的不是一个而是两个泰勒级数[考虑到pow(x, alpha) == exp(alpha*log(x))]。当然,如果alpha是一个整数,那么用简单的乘法代替pow()可以大大加快代码的速度。如果alpha是一个有理数,那么你可以通过编码Newton-Raphson迭代来代替两个Taylor级数,从而将代码的速度提高到较小的程度。如果最后一个条件成立,请告知。

幸运的是,您已经表示alpha不会更改。你能不通过准备一张pow((double) i, alpha)的表,然后让zipf()在表中查找数字来加快代码的速度吗?这样,zipf()就根本不需要调用pow()。我怀疑这将节省大量时间。

然而,进一步的改进是可能的。如果将函数zipf()分解为函数sumprob(),会怎样?你能不能准备一个更激进的查找表供sumprob()使用?

也许其中的一些想法会让你朝着正确的方向前进。看看你不能用它们做什么。

更新:我发现您的问题现在修改后可能无法使用此答案。从现在的观点来看,你的问题可能会变成复变量理论中的一个问题。正如你所知,这些问题往往并不容易。可能是一个足够聪明的数学家发现了一个相关的递推关系或一些技巧,比如正态分布的Box-Muller技术,但如果是这样,我对该技术并不熟悉。祝你好运(这可能对你来说并不重要,但如果重要的话,已故的N.N。列别捷夫1972年出版的优秀著作《特殊函数及其应用》有俄文的英译本,价格低廉。如果你真的,真的想解决这个问题,你接下来可能会读列别捷夫——但当然,这是一个绝望的措施,不是吗?)

作为对上面给出的非常好的拒绝版本实现的补充,这里有一个C++类,具有相同的API,对于少数bin来说,它更简单、更快,只有。在我的机器上,N=300的速度大约快2.3倍。它更快,因为它执行直接的表查找,而不是计算日志和功率。不过,表占用了缓存。。。根据我的CPU d缓存的大小进行猜测,我想上面给出的正确的拒绝反转算法可能会在N=35K左右变得更快。此外,初始化该表需要对每个bin调用std::pow(),因此只有当您从中提取N个以上的值时,这才能赢得性能。否则,拒绝反转会更快。明智地选择。

(我已经建立了API,所以它看起来很像std::c++标准委员会可能提出的。)

/**
 * Example usage:
 *
 *    std::random_device rd;
 *    std::mt19937 gen(rd());
 *    zipf_table_distribution<> zipf(300);
 *
 *    for (int i = 0; i < 100; i++)
 *        printf("draw %d %dn", i, zipf(gen));
 */
template<class IntType = unsigned long, class RealType = double>
class zipf_table_distribution
{
   public:
      typedef IntType result_type;
      static_assert(std::numeric_limits<IntType>::is_integer, "");
      static_assert(!std::numeric_limits<RealType>::is_integer, "");
      /// zipf_table_distribution(N, s)
      /// Zipf distribution for `N` items, in the range `[1,N]` inclusive.
      /// The distribution follows the power-law 1/n^s with exponent `s`.
      /// This uses a table-lookup, and thus provides values more
      /// quickly than zipf_distribution. However, the table can take
      /// up a considerable amount of RAM, and initializing this table
      /// can consume significant time.
      zipf_table_distribution(const IntType n,
                              const RealType q=1.0) :
         _n(init(n,q)),
         _q(q),
         _dist(_pdf.begin(), _pdf.end())
      {}
      void reset() {}
      IntType operator()(std::mt19937& rng)
      {
         return _dist(rng);
      }
      /// Returns the parameter the distribution was constructed with.
      RealType s() const { return _q; }
      /// Returns the minimum value potentially generated by the distribution.
      result_type min() const { return 1; }
      /// Returns the maximum value potentially generated by the distribution.
      result_type max() const { return _n; }
   private:
      std::vector<RealType>               _pdf;  ///< Prob. distribution
      IntType                             _n;    ///< Number of elements
      RealType                            _q;    ///< Exponent
      std::discrete_distribution<IntType> _dist; ///< Draw generator
      /** Initialize the probability mass function */
      IntType init(const IntType n, const RealType q)
      {
         _pdf.reserve(n+1);
         _pdf.emplace_back(0.0);
         for (IntType i=1; i<=n; i++)
            _pdf.emplace_back(std::pow((double) i, -q));
         return n;
      }
};

这是一个比drobilla原始帖子快2倍的版本,此外它还支持非零变形参数q(又名Hurwicz q、q系列q或量子群变形q),并更改符号以符合数论教科书中的标准用法。经过严格测试;请参阅上的单元测试https://github.com/opencog/cogutil/blob/master/tests/util/zipfUTest.cxxtest

双重许可MIT许可证,或Gnu Affero,请根据需要复制到C++标准中。

/**
 * Zipf (Zeta) random distribution.
 *
 * Implementation taken from drobilla's May 24, 2017 answer to
 * https://stackoverflow.com/questions/9983239/how-to-generate-zipf-distributed-numbers-efficiently
 *
 * That code is referenced with this:
 * "Rejection-inversion to generate variates from monotone discrete
 * distributions", Wolfgang Hörmann and Gerhard Derflinger
 * ACM TOMACS 6.3 (1996): 169-184
 *
 * Note that the Hörmann & Derflinger paper, and the stackoverflow
 * code base incorrectly names the paramater as `q`, when they mean `s`.
 * Thier `q` has nothing to do with the q-series. The names in the code
 * below conform to conventions.
 *
 * Example usage:
 *
 *    std::random_device rd;
 *    std::mt19937 gen(rd());
 *    zipf_distribution<> zipf(300);
 *
 *    for (int i = 0; i < 100; i++)
 *        printf("draw %d %dn", i, zipf(gen));
 */
template<class IntType = unsigned long, class RealType = double>
class zipf_distribution
{
   public:
      typedef IntType result_type;
      static_assert(std::numeric_limits<IntType>::is_integer, "");
      static_assert(!std::numeric_limits<RealType>::is_integer, "");
      /// zipf_distribution(N, s, q)
      /// Zipf distribution for `N` items, in the range `[1,N]` inclusive.
      /// The distribution follows the power-law 1/(n+q)^s with exponent
      /// `s` and Hurwicz q-deformation `q`.
      zipf_distribution(const IntType n=std::numeric_limits<IntType>::max(),
                        const RealType s=1.0,
                        const RealType q=0.0)
         : n(n)
         , _s(s)
         , _q(q)
         , oms(1.0-s)
         , spole(abs(oms) < epsilon)
         , rvs(spole ? 0.0 : 1.0/oms)
         , H_x1(H(1.5) - h(1.0))
         , H_n(H(n + 0.5))
         , cut(1.0 - H_inv(H(1.5) - h(1.0)))
         , dist(H_x1, H_n)
      {
         if (-0.5 >= q)
            throw std::runtime_error("Range error: Parameter q must be greater than -0.5!");
      }
      void reset() {}
      IntType operator()(std::mt19937& rng)
      {
         while (true)
         {
            const RealType u = dist(rng);
            const RealType x = H_inv(u);
            const IntType  k = std::round(x);
            if (k - x <= cut) return k;
            if (u >= H(k + 0.5) - h(k))
               return k;
         }
      }
      /// Returns the parameter the distribution was constructed with.
      RealType s() const { return _s; }
      /// Returns the Hurwicz q-deformation parameter.
      RealType q() const { return _q; }
      /// Returns the minimum value potentially generated by the distribution.
      result_type min() const { return 1; }
      /// Returns the maximum value potentially generated by the distribution.
      result_type max() const { return n; }

   private:
      IntType    n;     ///< Number of elements
      RealType   _s;    ///< Exponent
      RealType   _q;    ///< Deformation
      RealType   oms;   ///< 1-s
      bool       spole; ///< true if s near 1.0
      RealType   rvs;   ///< 1/(1-s)
      RealType   H_x1;  ///< H(x_1)
      RealType   H_n;   ///< H(n)
      RealType   cut;   ///< rejection cut
      std::uniform_real_distribution<RealType> dist;  ///< [H(x_1), H(n)]
      // This provides 16 decimal places of precision,
      // i.e. good to (epsilon)^4 / 24 per expanions log, exp below.
      static constexpr RealType epsilon = 2e-5;
      /** (exp(x) - 1) / x */
      static double
      expxm1bx(const double x)
      {
         if (std::abs(x) > epsilon)
            return std::expm1(x) / x;
         return (1.0 + x/2.0 * (1.0 + x/3.0 * (1.0 + x/4.0)));
      }
      /** log(1 + x) / x */
      static RealType
      log1pxbx(const RealType x)
      {
         if (std::abs(x) > epsilon)
            return std::log1p(x) / x;
         return 1.0 - x * ((1/2.0) - x * ((1/3.0) - x * (1/4.0)));
      }
      /**
       * The hat function h(x) = 1/(x+q)^s
       */
      const RealType h(const RealType x)
      {
         return std::pow(x + _q, -_s);
      }
      /**
       * H(x) is an integral of h(x).
       *     H(x) = [(x+q)^(1-s) - (1+q)^(1-s)] / (1-s)
       * and if s==1 then
       *     H(x) = log(x+q) - log(1+q)
       *
       * Note that the numerator is one less than in the paper
       * order to work with all s. Unfortunately, the naive
       * implementation of the above hits numerical underflow
       * when q is larger than 10 or so, so we split into
       * different regimes.
       *
       * When q != 0, we shift back to what the paper defined:
       *    H(x) = (x+q)^{1-s} / (1-s)
       * and for q != 0 and also s==1, use
       *    H(x) = [exp{(1-s) log(x+q)} - 1] / (1-s)
       */
      const RealType H(const RealType x)
      {
         if (not spole)
            return std::pow(x + _q, oms) / oms;
         const RealType log_xpq = std::log(x + _q);
         return log_xpq * expxm1bx(oms * log_xpq);
      }
      /**
       * The inverse function of H(x).
       *    H^{-1}(y) = [(1-s)y + (1+q)^{1-s}]^{1/(1-s)} - q
       * Same convergence issues as above; two regimes.
       *
       * For s far away from 1.0 use the paper version
       *    H^{-1}(y) = -q + (y(1-s))^{1/(1-s)}
       */
      const RealType H_inv(const RealType y)
      {
         if (not spole)
            return std::pow(y * oms, rvs) - _q;
         return std::exp(y * log1pxbx(oms * y)) - _q;
      }
};

同时,还有一种基于拒绝反转采样的更快方法,请参阅此处的代码。