标准::映射插入/擦除的并发问题
Concurrency issue with std::map insert/erase
我正在编写一个线程应用程序,该应用程序将处理资源列表,并且可能会也可能不会将生成的项放置在每个资源的容器(std::map(中。资源的处理在多个线程中进行。
结果容器将被遍历,每个项目由一个单独的线程操作,该线程接受一个项目并更新MySQL数据库(使用mysqlcppconn API(,然后从容器中删除该项目并继续。
为了简单起见,以下是逻辑的概述:
queueWorker() - thread
getResourcesList() - seeds the global queue
databaseWorker() - thread
commitProcessedResources() - commits results to a database every n seconds
processResources() - thread x <# of processor cores>
processResource()
queueResultItem()
以及伪实现来显示我在做什么。
/* not the actual stucts, but just for simplicities sake */
struct queue_item_t {
int id;
string hash;
string text;
};
struct result_item_t {
string hash; // hexadecimal sha1 digest
int state;
}
std::map< string, queue_item_t > queue;
std::map< string, result_item_t > results;
bool processResource (queue_item_t *item)
{
result_item_t result;
if (some_stuff_that_doesnt_apply_to_all_resources)
{
result.hash = item->hash;
result.state = 1;
/* PROBLEM IS HERE */
queueResultItem(result);
}
}
void commitProcessedResources ()
{
pthread_mutex_lock(&resultQueueMutex);
// this can take a while since there
for (std::map< string, result_item_t >::iterator it = results.begin; it != results.end();)
{
// do mysql stuff that takes a while
results.erase(it++);
}
pthread_mutex_unlock(&resultQueueMutex);
}
void queueResultItem (result_item_t result)
{
pthread_mutex_lock(&resultQueueMutex);
results.insert(make_pair(result.hash, result));
pthread_mutex_unlock(&resultQueueMutex);
}
如 processResource(( 所示,问题就在那里,当 commitProcessingResources(( 正在运行并且 resultQueueMutex 被锁定时,我们将在这里等待 queueResultItem(( 返回,因为它会尝试锁定相同的互斥锁,因此会等到它完成,这可能需要一段时间。
显然,由于运行线程的数量有限,一旦所有线程都在等待 queueResultItem(( 完成,在互斥锁被释放并可用于 queueResultItem(( 之前,不会再做更多的工作。
所以,我的问题是我如何最好地实施这一点?是否有一种特定类型的标准容器可以同时插入和删除,或者是否存在我不知道的东西?
每个队列项都不必像 std::map 那样具有自己的唯一键,但我更喜欢它,因为多个资源可以产生相同的结果,我宁愿只向数据库发送唯一结果,即使它确实使用 INSERT IGNORE 忽略任何重复项。
我对C++相当陌生,所以不幸的是,我不知道在谷歌上寻找什么。 :(
在 commitProcessedResources ()
中处理期间,您不必一直保持队列的锁。您可以改为将队列与空队列交换:
void commitProcessedResources ()
{
std::map< string, result_item_t > queue2;
pthread_mutex_lock(&resultQueueMutex);
// XXX Do a quick swap.
queue2.swap (results);
pthread_mutex_unlock(&resultQueueMutex);
// this can take a while since there
for (std::map< string, result_item_t >::iterator it = queue2.begin();
it != queue2.end();)
{
// do mysql stuff that takes a while
// XXX You do not need this.
//results.erase(it++);
}
}
您需要使用同步方法(即互斥锁(才能正常工作。但是,并行编程的目标是最小化关键部分(即在按住锁时执行的代码量(。
也就是说,如果您的MySQL查询可以在不同步的情况下并行运行(即多个调用不会相互冲突(,请将它们从关键部分中删除。这将大大减少开销。例如,如下所示的简单重构可以解决问题
void commitProcessedResources ()
{
// MOVING THIS LOCK
// this can take a while since there
pthread_mutex_lock(&resultQueueMutex);
std::map<string, result_item_t>::iterator end = results.end();
std::map<string, result_item_t>::iterator begin = results.begin();
pthread_mutex_unlock(&resultQueueMutex);
for (std::map< string, result_item_t >::iterator it = begin; it != end;)
{
// do mysql stuff that takes a while
pthread_mutex_lock(&resultQueueMutex); // Is this the only place we need it?
// This is a MUCH smaller critical section
results.erase(it++);
pthread_mutex_unlock(&resultQueueMutex); // Unlock or everything will block until end of loop
}
// MOVED UNLOCK
}
这将使您能够跨多个线程并发"实时"访问数据。也就是说,每次写入完成后,地图都会更新,并且可以在其他地方读取当前信息。
直到 C++03,该标准根本没有定义任何关于线程或线程安全的内容(而且由于您使用的是 pthread
s,我猜这几乎就是您正在使用的(。
因此,您可以对共享地图进行锁定,以确保在任何给定时间只有一个线程尝试访问地图。否则,您可能会破坏其内部数据结构,因此地图不再有效。
或者(我通常更喜欢这样(你可以让你的多线程只是把它们的数据放到一个线程安全的队列中,并有一个线程从该队列中获取数据并将其放入映射中。由于它是单线程的,因此在使用地图时不再需要锁定地图。
在将映射刷新到磁盘时,有一些合理的可能性来处理延迟。最简单的方法可能是从队列中读取相同的线程,插入到映射中,并定期将映射刷新到磁盘。在这种情况下,当地图刷新到磁盘时,传入的数据只是位于队列中。这使得对地图的访问变得简单 - 因为只有一个线程直接接触它,它可以在没有任何锁定的情况下使用地图。
另一种方法是有两张地图。在任何给定时间,刷新到磁盘的线程都会获得一个映射,而从队列中检索并插入到映射中的线程将获得另一个映射。当刷新线程需要做它的事情时,它只是交换两者的角色。 就我个人而言,我认为我更喜欢第一种 - 消除地图周围的所有锁定具有很大的吸引力,至少对我来说是这样。
另一种保持简单性的变体是让队列>map线程创建map,填充它,当它足够满时(即,在适当的时间长度之后(将其填充到另一个队列中,然后从头开始重复(即,创建新map等(。刷新线程从其传入队列中检索映射,将其刷新到磁盘,然后销毁它。虽然这会增加创建和销毁地图的开销,但您这样做的频率不够高,无法关心很多。您仍然可以随时保持对任何映射的单线程访问,并且仍然将所有数据库访问与其他所有内容隔离。
- 警告处理为错误这里有什么问题
- 最小硬币更换问题(自上而下方法)
- 为"adjacent"变量赋值时出现问题
- 我的神经网络不起作用 [XOR 问题]
- 在Ubuntu 16.04上安装Cilk时出现问题
- C++我的数学有什么问题,为什么我的代码不能正确循环
- 编译包含字符串的代码时遇到问题
- Project Euler问题4的错误解决方案
- 问题:什么是QAbstractItemView::NoEditTriggers的反面
- 在编译C++代码(具有dlib和opencv)到WASM时面临问题
- 节俭并发:未解决的外部问题
- 提升 asio 并发计时器取消问题与链
- 并发问题:如何只有一个线程通过关键部分
- 以编程方式锁定注册表项以避免并发问题
- 如何理解C++并发操作中同类实例导致死锁的问题
- 多线程编写器:使用 cpp 的并发问题
- 标准::映射插入/擦除的并发问题
- 在c++和POSIX中使用数组解决并发问题
- c++并发、同步设计,避免多次执行问题
- 提振.线程上的并发性问题