如何在<N>不发生内存泄漏的情况下同时(线程安全)填充 c++11 std::map<std::string,std::bitset*>?

How to populate c++11 std::map<std::string,std::bitset<N>*> concurrently (thread-safe) without memory leakage?

本文关键字:std gt lt 安全 c++11 填充 bitset 线程 string map 内存      更新时间:2023-10-16

基本上,我需要从并发读取的数千个文件中填充数百万个键条目(或多或少为 5000 万个订单)填充std::map。这些键将指向的值将从堆(std::bitset类型)中分配。

std::map<std::string,std::bitset<BITSET_SIZE>*> my_map;
  1. 我的第一个问题是:我不想要两个线程(首先检查是否 键存在,如果不存在,则从堆中分配空间。 因为我只能持有一个指针,而其他分配会 导致内存泄漏,因为我无法跟踪它们。

    //count should be thread-safe, since it's defined as const in <map> header file
    if(my_map.count(key) == 0){
    //some other thread may have initialized the key in the mean time
    my_map[key] = new std::bitset<BITSET_SIZE>();
    //Now I will lose the pointer to previous heap allocation from other thread
    }
    

    一种解决方案是使用一些互斥机制,例如boost::unique_lockboost::shared_lock的某种智能组合 和 boost::unique_lock 为了性能,我很高兴听到你的想法。

  2. 想象我已经完成了第一部分,意思是;同时初始化my_map键而不会发生内存泄漏,任务的第二部分是同时操作值(std::bitset)。为此,我认为应该没有任何问题,因为根据我的设置,可以保证没有两个线程会同时处理同一个密钥。(任何线程都不会从my_map键的底层树结构中添加新键或删除键)

即使没有同步,conststd::容器(如map)的访问也保证从不同的线程是合法的。

任何没有同步的非const访问都会使任何其他访问(const或非const)成为非法的(程序行为变得未定义)。

有些操作const,但就同步而言是const。 例如,非常量find被视为"const",就像vector上的[]一样。

地图上的[]不会const,也不会被视为const。 我不确定不创建元素[]是否被视为const,我必须仔细检查标准。 由于find存在并使用定义良好的语义解决相同的问题,因此在任何情况下我都不会在代码中使用它。

const并不意味着线程安全,而是意味着其他const操作的线程安全。 线程安全是两个或多个代码位之间的关系,它不是绝对的。 因此,在其他人插入时调用.count是不合法的。

通常,共享是线程安全的祸根。 解决这个问题的更简单方法是为每个"任务"提供自己的工作map。 然后,将这些map合并回主地图。

合并的复杂程度和频率取决于具体的应用程序以及复制量。

最简单的解决方案是:

std::map<std::string, std::unique_ptr<std::bitset<BITSET_SIZE>>>
parse_file( some_file_handle );

然后

std::map<std::string, std::unique_ptr<std::bitset<BITSET_SIZE>>>
parse_files( gsl::span<some_file_handle> handles ) {
if (handles.size()==0) return {};
if (handles.size()==1) return parse_file(handles.front());
auto lhs = parse_files( handles.first(handles.size()/2) );
auto rhs = parse_files( handles.last(handles.size()-handles.size()/2) );
return merge_maps(std::move(lhs), std::move(rhs));
}

为我们提供了单线程版本。 我们通过以下方式进行多线程处理:

std::map<std::string, std::unique_ptr<std::bitset<BITSET_SIZE>>>
parse_files( gsl::span<some_file_handle> handles, executor exec ) {
if (handles.size()==0) return {};
if (handles.size()==1) return parse_file(handles.front());
auto lhs = exec( [handles]{parse_files(handles.first(handles.size()/2) )} );
auto rhs = exec( [handles]{parse_files(handles.last(handles.size()-handles.size()/2) )} );
auto retval = exec( [lhs=std::move(lhs], rhs=std::move[rhs]]()mutable{
return merge_maps(std::move(lhs).get(), std::move(rhs).get() );
}
return std::move(retval).get();
}

其中executoris 获取类型为T()的对象并返回一个future<T>。 天真的执行者只是运行函数并返回一个准备好的未来。 一个更高级的执行者使用std::async来线程化它。 更高级的线程使用线程池,并且在等待时使用等待线程来运行任务(如果它尚未运行)。

现在,像ppl或英特尔的TBB这样的并发库提供了非常容易地做到这一点的方法。

首先 - const 函数不是线程安全的。考虑:

struct A {
int q;
void set(int qq) { q = qq; }
int get() const { return q; }
};

get() 不是线程安全的 - 在另一个线程集上可能会被调用,这将修改 q。如果你想要线程安全,你必须使用原子结构锁定或更新(还有其他多线程问题,如果你不锁定/使用原子学就会发生这种情况,但这些超出了你的问题范围 - 你绝对需要其中任何一个!

现在要解决: 由于您需要显式同步对地图结构的访问,因此问题不再是问题:

std::mutex m; // since c++11
...
{
std::lock_guard _l(m); // since c++11
if (!my_map.emplace(key, bitset_ptr).second)
delete bitset_ptr;
}

这会将元素插入到具有和值bitset_ptr的my_map中,但前提是它不存在。它将返回两个元素的元组 - 第一个是创建元素和先前存在的元素的迭代器,第二个是布尔标志,如果元素已创建,则为 true,如果之前存在,则为 false。因此,如果元素已经输入并且没有内存泄漏,您只需删除bitset_ptr。请注意,由于同步量的原因,这可能会很慢。

更新: 显然,只要您在多个线程中不断更新,您就需要使用 mutex m 同步对my_map的任何访问。

UPDATE2: OP尝试了最简单的解决方案,但发现它不够快。让我们更深入。(注意:最优化的操作过程是测量应用程序的性能并找出代码花费大部分时间的位置,但我不能;)这样做)。减速的"明显"(阅读:可能)原因很少:

  • 插入到 map - 插入排序的映射具有 O(ln n) 运行时性能,实际上由于缓存不匹配,速度可能非常慢。 在 1 Milion 元素映射中,平均而言,您需要与 10 个不同的字符串进行比较,(可能)位于完全不同的内存区域中,从而始终从处理器缓存中相互清除。
  • 从文件中读取 - 从多个文件中读取小块可能对(或可能不会!)对整体速度不利。
  • 多个分配 - 通常内存分配很慢。此外,大量分配会增加内存碎片并减少局部性。
  • 锁定同步 - 这对任何事情都很健康......

我会假设,你不能轻易(便宜地)确定单个文件中和总数中的元素数量。首先是代码:

using etype = pair<string, bitset<N>*>;
vector<etype> all_elements;
mutex all_elements_mutex;
void parse_single_file_in_thread(...) {
vector<etype> tmp;
for(auto element : parse_element_from_file()) 
tmp.push_back(move(element));
lock_guard _l(all_elements_mutex);
for(auto &a : tmp) all_elements.push_back(move(a));
}
map<string, bitset<N>*> parse_all_files() {
// create threads, parse files in them and wait for them to finish
std::sort(all_elements.begin(), all_elements.end(), 
[](const etype &a, const etype &b) { return a.first < b.first; });
map<string, bitset<N>*> tmp;
for(auto &a : all_elements) if (!tmp.insert(tmp.end(), etype(move(a.first), a.second)).second) delete a.second;
all_elements.clear();
return tmp;
}

它所做的是几件事: - 首先将键插入矢量(忽略检查重复项),稍后这将被排序并插入到带有放置提示的地图中(它们被排序,所以我们总是知道插入下一个元素的正确位置 - 地图的末尾),这比直接插入地图要快得多 - 每个文件的项目首先放入它自己的向量中,并在解析整个文件后移动到全局向量中,这最大限度地减少了锁定

这应该足以提高性能。接下来是用其他东西替换字符串,以避免如此多的字符串到字符串排序比较。但这很容易超出范围。;)

注意:我已经从内存中编写了整个代码,因此它可能无法编译,并且可能需要c ++ 17。