如何在C++中提高标准::set_intersection性能

How to improve std::set_intersection performance in C++?

本文关键字:set intersection 性能 高标准 C++      更新时间:2023-10-16

在尝试 std::set in C++ 和 set () in Python 时,我遇到了无法解释的性能问题。在C++中设置交集至少比在 Python 中慢 3 倍。

那么有人可以指出我可以做的优化来C++代码和/或解释Python如何更快地做到这一点吗?

我希望它们都使用类似的算法,在集合排序时具有 O(n) 复杂性。但可能Python会做一些优化,所以它达到更小的系数。

set_bench.cc

#include <iostream>
#include <set>
#include <algorithm>
#include <iterator>
#include <chrono>
#include <functional>
#include <thread>
void elapsed(std::function<void()> f, const std::string& s)
{
auto start = std::chrono::steady_clock::now();
f();
std::chrono::duration<double> elapsed = std::chrono::steady_clock::now() - start;
std::cout << s << " " << elapsed.count() << " seconds" << std::endl;
}
template <typename T>
void fill_set(std::set<T>& s, T start, T end, T step)
{
for (T i = start; i < end; i += step) {
s.emplace(i);
}
}
template <typename T>
void intersect(const std::set<T>& s1, const std::set<T>& s2, std::set<T>& result)
{
std::set_intersection(s1.begin(), s1.end(),
s2.begin(), s2.end(),
std::inserter(result, result.begin()));
}
int main()
{
std::set<int64_t> s1;
std::set<int64_t> s2;
std::set<int64_t> s3;
elapsed(std::bind(fill_set<int64_t>, std::ref(s1), 8, 1000*1000*100, 13), "fill s1 took");
elapsed(std::bind(fill_set<int64_t>, std::ref(s2), 0, 1000*1000*100, 7), "fill s2 took");
std::cout << "s1 length = " << s1.size() << ", s2 length = " << s2.size() << std::endl;
elapsed(std::bind(intersect<int64_t>, std::ref(s1), std::ref(s2), std::ref(s3)), "intersect s1 and s2 took");
std::cout << "s3 length = " << s3.size() << std::endl;
// sleep to let check memory consumption
// while (true) std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

set_bench.py

#!/usr/bin/env python3
import time
def elapsed(f, s):
start = time.monotonic()
f()
elapsed = time.monotonic() - start
print(f'{s} {elapsed} seconds')
def fill_set(s, start, end, step=1):
for i in range(start, end, step):
s.add(i)
def intersect(s1, s2, result):
result.update(s1 & s2)
s1 = set()
s2 = set()
elapsed(lambda : fill_set(s1, 8, 1000*1000*100, 13), 'fill s1 took')
elapsed(lambda : fill_set(s2, 0, 1000*1000*100, 7), 'fill s2 took')
print(f's1 length = {len(s1)}, s2 length = {len(s2)}')

s3 = set()
elapsed(lambda: intersect(s1, s2, s3), 'intersect s1 and s2 took')
print(f's3 length = {len(s3)}')
# sleep to let check memory consumption
# while True: time.sleep(1)

以下是在下一个环境中运行此程序的结果:

  • CLANG 版本 7.0.1
  • 海湾合作委员会 8.2.0
  • 蟒蛇 3.7.2
  • i7-7700 处理器 @ 3.60GHz
$ clang -lstdc++ -O0 set_bench.cc -o set_bench && ./set_bench
fill s1 took 5.38646 seconds
fill s2 took 10.5762 seconds
s1 length = 7692308, s2 length = 14285715
intersect s1 and s2 took 1.48387 seconds
s3 length = 1098901
$ clang -lstdc++ -O1 set_bench.cc -o set_bench && ./set_bench
fill s1 took 3.31435 seconds
fill s2 took 6.41415 seconds
s1 length = 7692308, s2 length = 14285715
intersect s1 and s2 took 1.01276 seconds
s3 length = 1098901
$ clang -lstdc++ -O2 set_bench.cc -o set_bench && ./set_bench
fill s1 took 1.90269 seconds
fill s2 took 3.85651 seconds
s1 length = 7692308, s2 length = 14285715
intersect s1 and s2 took 0.512727 seconds
s3 length = 1098901
$ clang -lstdc++ -O3 set_bench.cc -o set_bench && ./set_bench
fill s1 took 1.92473 seconds
fill s2 took 3.72621 seconds
s1 length = 7692308, s2 length = 14285715
intersect s1 and s2 took 0.523683 seconds
s3 length = 1098901
$ gcc -lstdc++ -O3 set_bench.cc -o set_bench && time ./set_bench
fill s1 took 1.72481 seconds
fill s2 took 3.3846 seconds
s1 length = 7692308, s2 length = 14285715
intersect s1 and s2 took 0.516702 seconds
s3 length = 1098901
$ python3.7 ./set_bench.py 
fill s1 took 0.9404696229612455 seconds
fill s2 took 1.082577683031559 seconds
s1 length = 7692308, s2 length = 14285715
intersect s1 and s2 took 0.17995300807524472 seconds
s3 length = 1098901

如您所见,结果相等,因此我假设两个程序执行相同的计算。

顺便说一下 - C++程序的RSS是1084896 kB,Python 是1590400 kB。

这篇文章有两个问题:

问:如何提高C++std::set_intersection性能?

使用排序std::vector而不是集合,这样对缓存更友好。由于交叉点在一次通过中按顺序完成,因此它将尽可能快。 在我的系统上,我的运行时间为0.04 秒。 如果这就是您所需要的,请停在这里。

问:...Python是如何做得这么快的?

或者换句话说,">为什么Python的设置比C++集快?"。我将在文章的其余部分重点讨论这个问题。

首先,Python的set是一个哈希表,std::set是一个二叉树。因此,使用std::unordered_set将苹果与苹果进行比较(此时我们根据二叉树的 O(logN) 查找复杂性拒绝二叉树)。

另请注意,std::set_intersection只是一个双指针算法;它遍历两个排序集,只保留匹配的值。 除了它的名字,与Python的set_intersection没有任何共同之处,它本身只是一个简单的循环:

  • 迭代较小的哈希表
  • 对于每个元素,如果它存在于另一个哈希表中,请将其添加到结果中

所以我们不能对未排序的数据使用std::set_intersection,需要实现循环:

for (auto& v : set1) {
if (set2.find(v) != set2.end()) {
result.insert(v);
}
}

这里没有什么花哨的。不幸的是,尽管在std::unordered_set上直接应用此算法仍然了 3 倍。这怎么可能?

  1. 我们观察到输入数据集的大小> 100MB。这不适合 i7-7700 的 8MB 缓存,这意味着您在 8MB 的范围内可以容纳的工作越多,您的程序执行速度就越快。

  2. Python使用一种特殊形式的"密集哈希表",类似于PHP哈希表(通常是一类开放寻址哈希表),而C++std::unordered_set通常是朴素或列表向量哈希表。密集结构对缓存更友好,因此速度更快。有关实现的详细信息,请参阅 dictobject.c 和 setobject.c。

  3. 内置C++std::hash<long>对于您正在生成的已经唯一的输入数据集来说过于复杂。另一方面,Python 对最大 230的整数使用身份(无操作)哈希函数(见long_hash)。碰撞由内置于其哈希表实现中的 LCG 摊销。您无法将其与C++标准库功能相提并论;不幸的是,此处的身份哈希将再次导致过于稀疏的哈希表。

  4. Python 使用自定义内存分配器 pymalloc,它类似于 jemalloc,并针对数据局部性进行了优化。它通常优于内置的Linux tcmalloc,这是C++程序通常使用的。

有了这些知识,我们可以设计一个性能类似的C++版本,以证明技术可行性:

#include <iostream>
#include <unordered_set>
#include <algorithm>
#include <iterator>
#include <chrono>
#include <functional>
#include <thread>
#include <tuple>
#include <string>
using namespace std::chrono_literals;
void elapsed(std::function<void()> f, const std::string& s)
{
auto start = std::chrono::steady_clock::now();
f();
auto end = std::chrono::steady_clock::now();
std::cout << s << " " << (end - start) / 1.0s << " seconds" << std::endl;
}
template <typename T>
struct myhash {
size_t operator()(T x) const {
return x / 5; // cheating to improve data locality
}
};
template <typename T>
using myset = std::unordered_set<T, myhash<T>>;
template <typename T>
void fill_set(myset<T>& s, T start, T end, T step)
{
s.reserve((end - start) / step + 1);
for (T i = start; i < end; i += step) {
s.emplace(i);
}
}
template <typename T>
void intersect(const myset<T>& s1, const myset<T>& s2, myset<T>& result)
{
result.reserve(s1.size() / 4); // cheating to compete with a better memory allocator
for (auto& v : s1)
{
if (s2.find(v) != s2.end())
result.insert(v);
}
}
int main()
{
myset<int64_t> s1;
myset<int64_t> s2;
myset<int64_t> s3;
elapsed(std::bind(fill_set<int64_t>, std::ref(s1), 8, 1000 * 1000 * 100, 13), "fill s1 took");
elapsed(std::bind(fill_set<int64_t>, std::ref(s2), 0, 1000 * 1000 * 100, 7), "fill s2 took");
std::cout << "s1 length = " << s1.size() << ", s2 length = " << s2.size() << std::endl;
elapsed(std::bind(intersect<int64_t>, std::ref(s1), std::ref(s2), std::ref(s3)), "intersect s1 and s2 took");
std::cout << "s3 length = " << s3.size() << std::endl;
}

有了这段代码,我在C++和Python版本中都得到了0.28秒的运行时间。

现在,如果我们想击败Python 的设定性能,我们可以删除所有作弊并使用 Google 的dense_hash_set,它通过二次探测实现开放寻址,作为直接替代品(只需要调用set_empty_object(0))。

使用google::dense_hash_set和无操作哈希函数,我们得到:

fill s1 took 0.321397 seconds
fill s2 took 0.529518 seconds
s1 length = 7692308, s2 length = 14285714
intersect s1 and s2 took 0.0974416 seconds
s3 length = 1098901

或者比 Python 快 2.8 倍,同时保持哈希集功能!


附言有人会想 - 为什么C++标准库实现如此缓慢的哈希表? 无免费午餐定理在这里也适用:基于探测的解决方案并不总是很快;作为一种机会主义的解决方案,它有时会遭受"聚集"(无休止地探索占用的空间)。 当这种情况发生时,性能会呈指数级下降。 标准库实现背后的想法是保证所有可能的输入的可预测性能。不幸的是,尽管对现代硬件的缓存影响太大而不容忽视,正如钱德勒·卡鲁斯在他的演讲中解释的那样。

你不是在比较喜欢和喜欢。

Python 集是无序(哈希)集。std::set<>是一个有序集合(二叉树)。

来自 python 文档:

5.4. 集合 Python 还包括集合的数据类型。集合是没有重复元素的无序集合。基本用途包括成员资格测试和消除重复条目。集合对象还支持数学运算,如并集、交集、差分和对称差分。

重构以比较同类:

#include <iostream>
#include <unordered_set>
#include <algorithm>
#include <iterator>
#include <chrono>
#include <functional>
#include <thread>
#include <tuple>
void elapsed(std::function<void()> f, const std::string& s)
{
auto start = std::chrono::steady_clock::now();
f();
std::chrono::duration<double> elapsed = std::chrono::steady_clock::now() - start;
std::cout << s << " " << elapsed.count() << " seconds" << std::endl;
}
template <typename T>
void fill_set(std::unordered_set<T>& s, T start, T end, T step)
{
for (T i = start; i < end; i += step) {
s.emplace(i);
}
}
template <typename T>
void intersect(const std::unordered_set<T>& s1, const std::unordered_set<T>& s2, std::unordered_set<T>& result)
{
auto ordered_refs = [&]()
{
if (s1.size() <= s2.size())
return std::tie(s1, s2);
else
return std::tie(s2, s1);
};
auto lr = ordered_refs();
auto& l = std::get<0>(lr);
auto& r = std::get<1>(lr);
result.reserve(l.size());
for (auto& v : l)
{
if (auto i = r.find(v) ; i != r.end())
result.insert(v);
}
}
int main()
{
std::unordered_set<int64_t> s1;
std::unordered_set<int64_t> s2;
std::unordered_set<int64_t> s3;
elapsed(std::bind(fill_set<int64_t>, std::ref(s1), 8, 1000*1000*100, 13), "fill s1 took");
elapsed(std::bind(fill_set<int64_t>, std::ref(s2), 0, 1000*1000*100, 7), "fill s2 took");
std::cout << "s1 length = " << s1.size() << ", s2 length = " << s2.size() << std::endl;
elapsed(std::bind(intersect<int64_t>, std::ref(s1), std::ref(s2), std::ref(s3)), "intersect s1 and s2 took");
std::cout << "s3 length = " << s3.size() << std::endl;
// sleep to let check memory consumption
// while (true) std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

性能将取决于您的套件。

我怀疑您可以使用自定义分配器大大提高性能。默认的是一个线程安全的,等等。

话虽如此,在我的机器上,我只看到无序版本的加速了 20%。我冒昧地猜测 python 交叉代码已经过手动优化。

作为参考,python源代码在这里:https://github.com/python/cpython/blob/master/Objects/setobject.c

使用排序vector将远远超过set在此基准测试上的表现:

#include <iostream>
#include <vector>
#include <algorithm>
#include <iterator>
#include <chrono>
#include <functional>
#include <thread>
void elapsed(std::function<void()> f, const std::string& s)
{
auto start = std::chrono::steady_clock::now();
f();
std::chrono::duration<double> elapsed = std::chrono::steady_clock::now() - start;
std::cout << s << " " << elapsed.count() << " seconds" << std::endl;
}
template <typename T>
void fill_set(std::vector<T>& s, T start, T end, T step)
{
for (T i = start; i < end; i += step) {
s.emplace_back(i);
}
std::sort(s.begin(), s.end());
}
template <typename T>
void intersect(const std::vector<T>& s1, const std::vector<T>& s2, std::vector<T>& result)
{
std::set_intersection(s1.begin(), s1.end(),
s2.begin(), s2.end(),
std::inserter(result, result.begin()));
}
int main()
{
std::vector<int64_t> s1;
std::vector<int64_t> s2;
std::vector<int64_t> s3;
elapsed(std::bind(fill_set<int64_t>, std::ref(s1), 8, 1000*1000*100, 13), "fill s1 took");
elapsed(std::bind(fill_set<int64_t>, std::ref(s2), 0, 1000*1000*100, 7), "fill s2 took");
std::cout << "s1 length = " << s1.size() << ", s2 length = " << s2.size() << std::endl;
elapsed(std::bind(intersect<int64_t>, std::ref(s1), std::ref(s2), std::ref(s3)), "intersect s1 and s2 took");
std::cout << "s3 length = " << s3.size() << std::endl;
// sleep to let check memory consumption
// while (true) std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

对我来说(clang/libc++ -O3),结果来自:

fill s1 took 2.01944 seconds
fill s2 took 3.98959 seconds
s1 length = 7692308, s2 length = 14285715
intersect s1 and s2 took 1.55453 seconds
s3 length = 1098901

自:

fill s1 took 0.143026 seconds
fill s2 took 0.20209 seconds
s1 length = 7692308, s2 length = 14285715
intersect s1 and s2 took 0.0548819 seconds
s3 length = 1098901

这种性能差异的原因是vector版本中的分配要少得多。