在不使用锁的情况下,危险指针如何安全地回收并发数据结构中的内存

How do Hazard Pointers safely reclaim memory in concurrent data structures without using locks?

本文关键字:安全 数据结构 并发 内存 何安全 情况下 指针 危险      更新时间:2023-10-16

我已经读过几次Maged Michael写的描述危险指针的文章,但我不太明白这里的窍门。

这个例子是一个映射,它做了一个写时复制——创建当前映射的新映射副本,插入一个新的键值对,并用新的键值对交换指向当前映射的指针。

因此,每个线程在执行写/交换后都保留一个线程本地map指针列表,该列表包含它想要丢弃(退休)的map指针。在更新结束时,它对退役指针列表进行扫描,找到每个没有通过"Acquire"方法分发给读线程的指针,并删除它。

我看不出Acquire是怎么做的。最初的地图是如何在危险指针列表中结束的?也许有一点代码丢失。即使它被引导到危险指针列表中,我们怎么能确定Acquire没有分发即将退役和删除的东西?

我拿了他的代码,并试图让它运行。它充满了错误。我想我把它修复得足够好,但我不确定我是否用boost::atomic正确地替换了他的CAS函数。

#pragma once
#include <iostream>
#include <unordered_map>
#include <vector>
#include <atomic>

typedef std::unordered_map<std::string, std::string> StringMap;

// Hazard pointer record
class HPRecType {
    //boost::atomic<int> active_;
    std::atomic_int active_;
    // Global header of the HP list
    static std::atomic<HPRecType *> pHead_;
    // The length of the list
    static std::atomic_int listLen_;
public:
    // Can be used by the thread
    // that acquired it
    void * pHazard_;
    HPRecType * pNext_;
    static HPRecType * Head() {
        return pHead_;
    }
    // Acquires one hazard pointer
    static HPRecType * Acquire() {
        int one = 1;
        // Try to reuse a retired HP record
        HPRecType * p = pHead_;
        for (; p; p = p->pNext_) {
            if (p->active_ || !p->active_.compare_exchange_strong(one, 0, std::memory_order_acquire))
                continue;
            // Got one!
            return p;
        }
        // Increment the list length
        int oldLen;
        do {
            oldLen = listLen_;
        } while (!listLen_.compare_exchange_strong(oldLen, oldLen +1, std::memory_order_acquire));
        // Allocate a new one
        p = new HPRecType;
        p->active_ = 1;
        p->pHazard_ = 0;
        // Push it to the front
        HPRecType * old;
        do {
            old = pHead_;
            p->pNext_ = old;
        } while (!pHead_.compare_exchange_strong(old, p, std::memory_order_acquire));
        return p;
    }
    // Releases a hazard pointer
    static void Release(HPRecType* p) {
        p->pHazard_ = 0;
        p->active_ = 0;
    }
};
// Per-thread private variable
//static boost::thread_specific_ptr<std::vector<StringMap*>> rlist;
static std::vector<StringMap*> *rlist;

class HazardPointerMap {
    std::atomic<StringMap *> pMap_;

private:
    static void Retire(StringMap * pOld) {
        // put it in the retired list
        rlist->push_back(pOld);
        if (rlist->size() >= 10) {
            Scan(HPRecType::Head());
        }
    }

    static void Scan(HPRecType * head) {
        // Stage 1: Scan hazard pointers list
        // collecting all non-null ptrs
        std::vector<void*> hp;
        while (head) {
            void * p = head->pHazard_;
            if (p) hp.push_back(p);
            head = head->pNext_;
        }
        // Stage 2: sort the hazard pointers
        sort(hp.begin(), hp.end(), std::less<void*>());
        // Stage 3: Search for'em!
        std::vector<StringMap *>::iterator i = rlist->begin();
        while (i != rlist->end()) {
            if (!binary_search(hp.begin(), hp.end(),  *i)) {
                // Aha!
                delete *i;
                i = rlist->erase(i);
                if (&*i != &rlist->back()) {
                    *i = rlist->back();
                }
                rlist->pop_back();
            } else {
                ++i;
            }
        }
    }
public:
    void Update(std::string&k, std::string&v){
        StringMap * pNew = 0;
        StringMap * pOld;
        do {
            pOld = pMap_;
            if (pNew) delete pNew;
            pNew = new StringMap(*pOld);
            (*pNew)[k] = v;
        } while (!pMap_.compare_exchange_strong(pOld, pNew, std::memory_order_acq_rel));
        Retire(pOld);
    }
    std::string Lookup(const std::string &k){
        HPRecType * pRec = HPRecType::Acquire();
        StringMap *ptr;
        do {
            ptr = pMap_;
            pRec->pHazard_ = ptr;
        } while (pMap_ != ptr);
        // Save Willy
        std::string result = ptr->at(k);
        // pRec can be released now
        // because it's not used anymore
        HPRecType::Release(pRec);
        return result;
    }
};

我看不出Acquire是怎么做的。初始地图是怎样的最后出现在危险指针列表上?也许有一些代码失踪。

Acquire()不绑定任何映射到危险指针列表。它只在列表中得到一个槽。在Acquire()之后,将映射绑定到Lookup()中该槽位的pHazard。注意下面的pRec->pHazard_ = ptr;

V Lookup(const K&k){
   HPRecType * pRec = HPRecType::Acquire();
   Map<K, V> * ptr;
   do {
      ptr = pMap_;
      pRec->pHazard_ = ptr;    //<---- Bind the map to slot here
   } while (pMap_ != ptr);
   // Save Willy
   V result = (*ptr)[k];
   // pRec can be released now
   // because it's not used anymore
   HPRecType::Release(pRec);
   return result;
}

即使它被引导到危险指针列表中,我们怎么能确保收购没有把即将得到的东西分发出去退休并删除?

不需要确定。从作者的角度来看,在Acquire()之后,没有任何有效的事情发生。pMap_上没有读取器,因为_pHazard还没有被读取器设置。请记住,写入器检查_pHazard只是为了确定是否有读取器。因此,只要reader的pRec->pHazard_ = ptr;没有执行,writer就可以自由地认为"哦,这个地图上没有reader",然后继续删除这个地图。即使writer删除了后面第2行和第3行之间的旧映射,循环也将保证reader不会从旧的(已删除的)映射中读取。

Line 1:    do {
Line 2:       ptr = pMap_;
Line 3:       pRec->pHazard_ = ptr;
Line 4:    } while (pMap_ != ptr);

另一个值得一提的是,active_仅在读取器之间使用,因此在单链表(共享危险指针列表)中不能有多个读取器获得相同的槽位。这可能会使您感到困惑,因为除了它之外还使用pHazard,两者都提供某种保护(在读取器之间,或读取器和写入器之间)。在我看来,active_可以移除,我们只需要检查pHazard是否为NULL。唯一的问题是,后者是一个可能是64位的指针,并且可能不是所有平台上都有64位CAS原语。