如何在一次遍历中近似数组中不同值的计数

How to approximate the count of distinct values in an array in a single pass through it

本文关键字:数组 遍历 一次      更新时间:2023-10-16

我有几个巨大的数组(数百万++成员)。所有这些都是数字数组,它们没有排序(我不能这样做)。有些是uint8_t,有些则是uint16_t/32/64。我想近似计算这些数组中不同值的计数。条件如下:

  1. 速度非常重要,我需要在一次遍历数组时完成这一操作,并且必须按顺序遍历(不能来回跳跃)(如果这很重要的话,我在C++中执行)
  2. 我不需要精确的计数。我想知道的是,如果它是uint32_t数组,是否有10或20个不同的数字,或者是否有数千或数百万
  3. 我有相当多的记忆力可以用,但用得越少越好
  4. 数组数据类型越小,我就需要越准确
  5. 我不介意STL,但如果我能在没有它的情况下做到这一点,那就太好了(抱歉,没有BOOST)
  6. 如果该方法可以很容易地并行化,那将很酷(但这不是强制性条件)

完美输出示例:

ArrayA [uint32_t, 3M members]: ~128 distinct values
ArrayB [uint32_t, 9M members]: 100000+ distinct values
ArrayC [uint8_t, 50K members]: 2-5 distinct values
ArrayD [uint8_t, 700K members]: 64+ distinct values

我知道有些约束可能看起来不合逻辑,但事实就是这样。顺便说一句,我还想要最常用和最不常用的前X(3或10)值,但这要容易得多,我可以自己做。然而,如果有人对此也有想法,请随意分享!

编辑:关于STL的一些澄清。如果你有使用它的解决方案,请发布它。不使用STL对我们来说只是一个奖励,我们不太喜欢它。然而,如果它是一个好的解决方案,它将被使用!

对于8位和16位值,您只需制作每个值的计数表;每次写入以前为零的表条目时,都会发现不同的值。

对于较大的值,如果您对100000以上的计数不感兴趣,如果std::map足够快,则它是合适的。如果这对你来说太慢了,你可以自己编程B树。

我很确定你可以通过以下方式完成:

  1. 创建布隆过滤器
  2. 运行数组,将每个元素插入到过滤器中(这是一个"缓慢"的O(n),因为它需要计算每个值的几个独立的适当散列)
  3. 计算Bloom Filter中设置的位数
  4. 从滤波器的密度计算回不同值的数量的估计。我不知道这个计算是怎么回事,但对Bloom滤波器理论的任何处理都会涉及到这一点,因为它对滤波器在查找中给出假阳性的概率至关重要

假设你同时计算前10个最频繁的值,那么如果不到10个不同的值,你就会确切地知道它们是什么,而不需要估计。

我认为"最常用"的问题是困难的(嗯,消耗内存)。假设您只想要前1个最常用的值。进一步假设数组中有1000万个条目,在前990万个条目之后,到目前为止,您看到的数字中没有一个出现次数超过10万次。那么到目前为止你看到的任何值都可能是最常用的值,因为它们中的任何一个最终都可能有100k个值。更糟糕的是,他们中的任何两个在比赛结束时都可能各跑5万,在这种情况下,前990万个参赛作品的数量是他们之间的平局。因此,为了在一次通行证中计算出最常用的通行证,我认为你需要知道990万中出现的每个值的确切计数。你必须为过去10万中两个值几乎相等的反常情况做好准备,因为如果发生这种情况,你就不允许再次倒带和检查两个相关值。最终,您可以开始剔除值——如果有一个计数为5000的值,只剩下4000个条目需要检查,那么您可以剔除任何计数为1000或更少的值。但这并没有多大帮助。

所以我可能遗漏了一些内容,但我认为在最坏的情况下,"最频繁使用"的问题要求您对所看到的每个值都进行计数,直到接近数组的末尾。因此,您不妨使用该计数集合来计算有多少不同的值。

一种可行的方法是将它们分散到延迟分配的存储桶中,即使对于大值也是如此。

假设您使用的是32位整数,创建一个2**32位的数组是相对不切实际的(2**29字节,嗯)。然而,我们可以假设2**16指针仍然是合理的(2**19字节:500kB),因此我们创建2**16桶(空指针)。

因此,大的想法是采用"稀疏"的方法进行计数,并希望整数不会分散,因此许多桶指针将保持null

typedef std::pair<int32_t, int32_t> Pair;
typedef std::vector<Pair> Bucket;
typedef std::vector<Bucket*> Vector;
struct Comparator {
  bool operator()(Pair const& left, Pair const& right) const {
    return left.first < right.first;
  }
};
void add(Bucket& v, int32_t value) {
  Pair const pair(value, 1);
  Vector::iterator it = std::lower_bound(v.begin(), v.end(), pair, Compare());
  if (it == v.end() or it->first > value) {
    v.insert(it, pair);
    return;
  }
  it->second += 1;
}
void gather(Vector& v, int32_t const* begin, int32_t const* end) {
  for (; begin != end; ++begin) {
    uint16_t const index = *begin >> 16;
    Bucket*& bucket = v[index];
    if (bucket == 0) { bucket = new Bucket(); }
    add(*bucket, *begin);
  }
}

一旦你收集了数据,你就可以很容易地计算不同值的数量,或者找到顶部或底部。

几个注意事项:

  • bucket的数量是完全可自定义的(因此可以控制原始内存的数量)
  • 重新分区的策略也是可定制的(这只是我在这里制作的一个便宜的哈希表)
  • 可以监控分配的铲斗数量,如果开始爆炸,可以放弃或切换档位
  • 如果每个值都不同,那么它就是不起作用,但当你意识到这一点时,你已经收集了很多计数,所以你至少可以给出不同值的数量的下限,a你也有一个顶部/底部的起点

如果你设法收集了这些统计数据,那么这项工作就不适合你了。

对于8位和16位,很明显,您可以在每次迭代中跟踪每种可能性。

当你得到32位和64位整数时,你并没有足够的内存来跟踪每一种可能性。

以下是一些可能超出您限制范围的自然建议。

我真的不明白你为什么不能对数组进行排序。RadixSort是O(n),一旦排序,就需要再通过一次才能获得准确的区别性和前X信息。事实上,如果使用1字节基数(1次计数+1×4次每个字节+1次获取值),那么32位总共需要6次通过。

在与上述相同的情况下,为什么不直接使用SQL呢。您可以创建一个存储过程,将数组作为表值参数,并一次性返回不同值的数量和前x个值。这个存储过程也可以并行调用

-- number of distinct
SELECT COUNT(DISTINCT(n)) FROM @tmp
-- top x
SELECT TOP 10 n, COUNT(n) FROM @tmp GROUP BY n ORDER BY COUNT(n) DESC

我刚刚想到了一个有趣的解决方案。它基于布尔代数定律,称为乘法的幂等性,该定律指出:

X*X=X

由此,利用布尔乘法的交换性质,我们可以推导出:

X*Y*X=X*Y=X*Y

现在,你知道我要去哪里了吗?这就是算法的工作方式(我对伪代码很糟糕):

  1. 使c=element1&element2(整数的二进制表示之间的二进制AND)

  2. 对于i=3,直到i==size_of_array使b=c&元素[i];如果b!=c则different_values++;c=b;

在第一次迭代中,我们生成(element1*element2)*element3。我们可以将其表示为:

(X*Y)*Z

如果Z(元素3)等于X(元素1),则:

(X*Y)*Z=X*Y*X=X*Y

如果Z等于Y(element2),则:

(X*Y)*Z=X*Y*Y=X*Y

因此,如果Z与X或Y没有不同,那么当我们将其乘以Z 时,X*Y不会改变

这对于大型表达式仍然有效,例如:

(X*A*Z*G*T*p*S)*S=X*A*Z*G*T*p*S

如果我们收到一个值,它是我们的大被乘数的因子(这意味着它已经被计算过了),那么当我们将其乘以收到的输入时,大被乘数不会改变,所以没有新的不同值。

这就是它的发展方向。每次计算不同的值时,我们的大被乘数和该不同值的乘积将与大操作数不同。所以,对于b = c & element[i],如果b!= c,我们只增加不同值计数器。

我想我还不够清楚。如果是这样的话,请告诉我。