当对的数量非常巨大时,如何替换(或加速)并行比较

How to replace (or speed up) the parallel comparison when the number of pairs are extremely huge?

本文关键字:替换 比较 并行 加速 何替换 非常 巨大      更新时间:2023-10-16

下面的代码,并行比较,运行时间永远,当映射中的键的数量是巨大的(e。g100000)和它的每一个第二元素都有一个巨大的元素(e)(10万英镑)。

有没有可能加快比较的速度?我的cpu是Xeon E5450 3.00G 4核。Ram就够了。

// There is a map with long as its key and vector<long> as second element, 
//     the vector's elements are increasing sorted.
map<long, vector<long> > = aMap() ;
map<long, vector<long> >::iterator it1 = aMap.begin() ;
map<long, vector<long> >::iterator it2; 
// the code need compare each key's second elements 
for( ; it1 != aMap.end(); it1++ ) {
  it2 = it1; 
  it2++;
  // Parallel comparsion: THE MOST TIME CONSUMING PART
  for( ; it2 != aMap.end(); it2++ ) {
    unsigned long i = 0, j = 0, _union = 0, _inter = 0 ;
    while( i < it1->second.size() && j < it2->second.size() ) {
      if( it1->second[i] < it2->second[j] ) {
        i++; 
      } else if( it1->second[i] > it2->second[j] ) {
        j++; 
      } else {
        i++; j++; _inter++;
      }
    }
    _union = it1->second.size() + it2->second.size() - _inter;
    if ( (double) _inter / _union > THRESH )
      cout << it1->first << " might appears frequently with " << it2->first << endl;
  }
}

(这不是一个完整的答案。它只能解决你的部分问题;关于位操作的部分

这是一个类,你可以用它来计算两个集合之间的交集数量(交集的基数)。

它使用一个位向量来存储集合,这意味着可能的集合成员的全域必须很小。

#include <cassert>
#include <vector>
class BitVector
{
    // IMPORTANT: U must be unsigned
    // IMPORTANT: use unsigned long long in 64-bit builds.
    typedef unsigned long U;
    static const unsigned UBits = 8 * sizeof(U);
public:
    BitVector (unsigned size)
        : m_bits ((size + UBits - 1) / UBits, 0)
        , m_size (size)
    {
    }
    void set (unsigned bit)
    {
        assert (bit < m_size);
        m_bits[bit / UBits] |= (U)1 << (bit % UBits);
    }
    void clear (unsigned bit)
    {
        assert (bit < m_size);
        m_bits[bit / UBits] &= ~((U)1 << (bit % UBits));
    }
    unsigned countIntersect (BitVector const & that) const
    {
        assert (m_size == that.m_size);
        unsigned ret = 0;
        for (std::vector<U>::const_iterator i = m_bits.cbegin(),
             j = that.m_bits.cbegin(), e = m_bits.cend(), f = that.m_bits.cend();
             i != e && j != f; ++i, ++j)
        {
            U x = *i & *j;
            // Count the number of 1 bits in x and add it to ret
            // There are much better ways than this,
            // e.g. using the POPCNT instruction or intrinsic
            while (x != 0)
            {
                ret += x & 1;
                x >>= 1;
            }
        }
        return ret;
    }
    unsigned countUnion (BitVector const & that) const
    {
        assert (m_size == that.m_size);
        unsigned ret = 0;
        for (std::vector<U>::const_iterator i = m_bits.cbegin(),
             j = that.m_bits.cbegin(), e = m_bits.cend(), f = that.m_bits.cend();
             i != e && j != f; ++i, ++j)
        {
            U x = *i | *j;
            while (x != 0)
            {
                ret += x & 1;
                x >>= 1;
            }
        }
        return ret;
    }
private:
    std::vector<U> m_bits;
    unsigned m_size;
};

下面是一个非常小的测试程序,看看如何使用上面的类。它生成两个集合(每个集合有100K个最大元素),向其中添加一些元素(使用set()成员函数),然后计算它们的交集10亿次。它在我的机器上运行不到两秒钟。

#include <iostream>
using namespace std;
int main ()
{
    unsigned const SetSize = 100000;
    BitVector a (SetSize), b (SetSize);
    for (unsigned i = 0; i < SetSize; i += 2) a.set (i);
    for (unsigned i = 0; i < SetSize; i += 3) b.set (i);
    
    unsigned x = a.countIntersect (b);
    cout << x << endl;
    return 0;
}

不要忘记在编译时启用了优化!否则,它的性能非常差。

POPCNT

现代处理器有一条指令来计算一个字中设置位的数量,称为POPCNT。这比上面写的简单的方法要快得多(顺便说一句,在软件中也有更快的方法,但我不想污染代码。)

无论如何,在C/c++代码中使用POPCNT的方法是使用编译器固有的内置的。在MSVC中,您可以使用在32位整数上工作的__popcount()。在GCC中,您可以将__builtin_popcountl()用于32位整数,将__builtin_popcountll()用于64位整数。请注意,由于编译器版本、目标体系结构和/或编译开关的原因,这些函数可能不可用。

也许你想试试PPL。或者它的一些类似物。我真的不明白你的代码应该做什么,因为它似乎没有任何输出。但是没有副作用的代码可以通过Parallel Patterns Library等工具有效地并行化。