标准::映射插入/擦除的并发问题

Concurrency issue with std::map insert/erase

本文关键字:并发 问题 擦除 映射 插入 标准      更新时间:2023-10-16

我正在编写一个线程应用程序,该应用程序将处理资源列表,并且可能会也可能不会将生成的项放置在每个资源的容器(std::map(中。资源的处理在多个线程中进行。

结果容器将被遍历,每个项目由一个单独的线程操作,该线程接受一个项目并更新MySQL数据库(使用mysqlcppconn API(,然后从容器中删除该项目并继续。

为了简单起见,以下是逻辑的概述:

queueWorker() - thread
    getResourcesList() - seeds the global queue
databaseWorker() - thread
    commitProcessedResources() - commits results to a database every n seconds
processResources() - thread x <# of processor cores>
    processResource()
    queueResultItem()

以及伪实现来显示我在做什么。

/* not the actual stucts, but just for simplicities sake */
struct queue_item_t {
    int id;
    string hash;
    string text;
};
struct result_item_t {
    string hash; // hexadecimal sha1 digest
    int state;
}
std::map< string, queue_item_t > queue;
std::map< string, result_item_t > results;
bool processResource (queue_item_t *item)
{
    result_item_t result;
    if (some_stuff_that_doesnt_apply_to_all_resources)
    {
        result.hash = item->hash;
        result.state = 1;
        /* PROBLEM IS HERE */
        queueResultItem(result);
    }
}
void commitProcessedResources ()
{
    pthread_mutex_lock(&resultQueueMutex);
    // this can take a while since there
    for (std::map< string, result_item_t >::iterator it = results.begin; it != results.end();)
    {
        // do mysql stuff that takes a while
        results.erase(it++);
    }
    pthread_mutex_unlock(&resultQueueMutex);
}
void queueResultItem (result_item_t result)
{
    pthread_mutex_lock(&resultQueueMutex);
    results.insert(make_pair(result.hash, result));
    pthread_mutex_unlock(&resultQueueMutex);
}

如 processResource(( 所示,问题就在那里,当 commitProcessingResources(( 正在运行并且 resultQueueMutex 被锁定时,我们将在这里等待 queueResultItem(( 返回,因为它会尝试锁定相同的互斥锁,因此会等到它完成,这可能需要一段时间。

显然,由于运行线程的数量有限,一旦所有线程都在等待 queueResultItem(( 完成,在互斥锁被释放并可用于 queueResultItem(( 之前,不会再做更多的工作。

所以,我的问题是我如何最好地实施这一点?是否有一种特定类型的标准容器可以同时插入和删除,或者是否存在我不知道的东西?

每个队列项都不必像 std::map 那样具有自己的唯一键,但我更喜欢它,因为多个资源可以产生相同的结果,我宁愿只向数据库发送唯一结果,即使它确实使用 INSERT IGNORE 忽略任何重复项。

我对C++相当陌生,所以不幸的是,我不知道在谷歌上寻找什么。 :(

commitProcessedResources () 中处理期间,您不必一直保持队列的锁。您可以改为将队列与空队列交换:

void commitProcessedResources ()
{
    std::map< string, result_item_t > queue2;
    pthread_mutex_lock(&resultQueueMutex);
    // XXX Do a quick swap.
    queue2.swap (results);
    pthread_mutex_unlock(&resultQueueMutex);
    // this can take a while since there
    for (std::map< string, result_item_t >::iterator it = queue2.begin();
        it != queue2.end();)
    {
        // do mysql stuff that takes a while
        // XXX You do not need this.
        //results.erase(it++);
    }   
}

您需要使用同步方法(即互斥锁(才能正常工作。但是,并行编程的目标是最小化关键部分(即在按住锁时执行的代码量(。

也就是说,如果您的MySQL查询可以在不同步的情况下并行运行(即多个调用不会相互冲突(,请将它们从关键部分中删除。这将大大减少开销。例如,如下所示的简单重构可以解决问题

void commitProcessedResources ()
{
    // MOVING THIS LOCK
    // this can take a while since there
    pthread_mutex_lock(&resultQueueMutex);
    std::map<string, result_item_t>::iterator end = results.end();
    std::map<string, result_item_t>::iterator begin = results.begin();
    pthread_mutex_unlock(&resultQueueMutex);
    for (std::map< string, result_item_t >::iterator it = begin; it != end;)
    {
        // do mysql stuff that takes a while
        pthread_mutex_lock(&resultQueueMutex); // Is this the only place we need it?
        // This is a MUCH smaller critical section
        results.erase(it++);
        pthread_mutex_unlock(&resultQueueMutex); // Unlock or everything will block until end of loop
    }
    // MOVED UNLOCK
}

这将使您能够跨多个线程并发"实时"访问数据。也就是说,每次写入完成后,地图都会更新,并且可以在其他地方读取当前信息。

直到 C++03,该标准根本没有定义任何关于线程或线程安全的内容(而且由于您使用的是 pthread s,我猜这几乎就是您正在使用的(。

因此,您可以对共享地图进行锁定,以确保在任何给定时间只有一个线程尝试访问地图。否则,您可能会破坏其内部数据结构,因此地图不再有效。

或者(我通常更喜欢这样(你可以让你的多线程只是把它们的数据放到一个线程安全的队列中,并有一个线程从该队列中获取数据并将其放入映射中。由于它是单线程的,因此在使用地图时不再需要锁定地图。

在将

映射刷新到磁盘时,有一些合理的可能性来处理延迟。最简单的方法可能是从队列中读取相同的线程,插入到映射中,并定期将映射刷新到磁盘。在这种情况下,当地图刷新到磁盘时,传入的数据只是位于队列中。这使得对地图的访问变得简单 - 因为只有一个线程直接接触它,它可以在没有任何锁定的情况下使用地图。

另一种方法是有两张地图。在任何给定时间,刷新到磁盘的线程都会获得一个映射,而从队列中检索并插入到映射中的线程将获得另一个映射。当刷新线程需要做它的事情时,它只是交换两者的角色。 就我个人而言,我认为我更喜欢第一种 - 消除地图周围的所有锁定具有很大的吸引力,至少对我来说是这样。

另一种保持简单性的变体是让队列>map线程创建map,填充它,当它足够满时(即,在适当的时间长度之后(将其填充到另一个队列中,然后从头开始重复(即,创建新map等(。刷新线程从其传入队列中检索映射,将其刷新到磁盘,然后销毁它。虽然这会增加创建和销毁地图的开销,但您这样做的频率不够高,无法关心很多。您仍然可以随时保持对任何映射的单线程访问,并且仍然将所有数据库访问与其他所有内容隔离。