可能的 std::async 实现错误 Windows

possible std::async implementation bug Windows

本文关键字:实现 错误 Windows async std      更新时间:2023-10-16

似乎在std::async的Windows实现中存在一个错误。在重负载下(每秒异步启动 1000 个线程(,异步任务永远不会被调度,等待返回的未来会导致死锁。请参阅这段代码(使用延迟启动策略而不是异步进行修改(:

BundlingChunk(size_t numberOfInputs, Bundler* parent, ChunkIdType chunkId)
: m_numberOfInputs(numberOfInputs), m_parent(parent), m_chunkId(chunkId)
{
const BundlerChunkDescription& chunk = m_parent->m_chunks[m_chunkId];
const ChunkInfo& original = chunk.m_original;
auto& deserializers = m_parent->m_deserializers;
// Fetch all chunks in parallel.
std::vector<std::map<ChunkIdType, std::shared_future<ChunkPtr>>> chunks;
chunks.resize(chunk.m_secondaryChunks.size());
static std::atomic<unsigned long long int> chunksInProgress = 0;
for (size_t i = 0; i < chunk.m_secondaryChunks.size(); ++i)
{
for (const auto& c : chunk.m_secondaryChunks[i])
{
const auto chunkCreationLambda = ([this, c, i] {
chunksInProgress++;
ChunkPtr chunk = m_parent->m_weakChunkTable[i][c].lock();
if (chunk) {
chunksInProgress--;
return chunk;
}
chunksInProgress--;
return m_parent->m_deserializers[i]->GetChunk(c);
});
std::future<ChunkPtr> chunkCreateFuture = std::async(std::launch::deferred, chunkCreationLambda);
chunks[i].emplace(c, chunkCreateFuture.share());
}
}
std::vector<SequenceInfo> sequences;
sequences.reserve(original.m_numberOfSequences);
// Creating chunk mapping.
m_parent->m_primaryDeserializer->SequenceInfosForChunk(original.m_id, sequences);
ChunkPtr drivingChunk = chunks.front().find(original.m_id)->second.get();
m_sequenceToSequence.resize(deserializers.size() * sequences.size());
m_innerChunks.resize(deserializers.size() * sequences.size());
for (size_t sequenceIndex = 0; sequenceIndex < sequences.size(); ++sequenceIndex)
{
if (chunk.m_invalid.find(sequenceIndex) != chunk.m_invalid.end())
{
continue;
}
size_t currentIndex = sequenceIndex * deserializers.size();
m_sequenceToSequence[currentIndex] = sequences[sequenceIndex].m_indexInChunk;
m_innerChunks[currentIndex] = drivingChunk;
}
// Creating sequence mapping and requiring underlying chunks.
SequenceInfo s;
for (size_t deserializerIndex = 1; deserializerIndex < deserializers.size(); ++deserializerIndex)
{
auto& chunkTable = m_parent->m_weakChunkTable[deserializerIndex];
for (size_t sequenceIndex = 0; sequenceIndex < sequences.size(); ++sequenceIndex)
{
if (chunk.m_invalid.find(sequenceIndex) != chunk.m_invalid.end())
{
continue;
}
size_t currentIndex = sequenceIndex * deserializers.size() + deserializerIndex;
bool exists = deserializers[deserializerIndex]->GetSequenceInfo(sequences[sequenceIndex], s);
if (!exists)
{
if(m_parent->m_verbosity >= (int)TraceLevel::Warning)
fprintf(stderr, "Warning: sequence '%s' could not be found in the deserializer responsible for stream '%ls'n",
m_parent->m_corpus->IdToKey(sequences[sequenceIndex].m_key.m_sequence).c_str(),
deserializers[deserializerIndex]->StreamInfos().front().m_name.c_str());
m_sequenceToSequence[currentIndex] = SIZE_MAX;
continue;
}
m_sequenceToSequence[currentIndex] = s.m_indexInChunk;
ChunkPtr secondaryChunk = chunkTable[s.m_chunkId].lock();
if (!secondaryChunk)
{
secondaryChunk = chunks[deserializerIndex].find(s.m_chunkId)->second.get();
chunkTable[s.m_chunkId] = secondaryChunk;
}
m_innerChunks[currentIndex] = secondaryChunk;
}
}
}

我上面的版本经过修改,以便异步任务作为延迟而不是异步启动,从而解决了问题。从VS2017可再发行的14.12.25810开始,还有其他人见过这样的东西吗?重现此问题就像在具有 GPU 和 SSD 的计算机上使用文本和图像读取器的训练 CNTK 模型一样简单,因此 CPU 反序列化成为瓶颈。训练约30分钟后,通常会发生死锁。有没有人在 Linux 上看到过类似的问题?如果是这样,这可能是代码中的错误,尽管我对此表示怀疑,因为死锁后调试计数器chunksInProgress始终为 0。作为参考,整个源文件位于 https://github.com/Microsoft/CNTK/blob/455aef80eeff675c0f85c6e34a03cb73a4693bff/Source/Readers/ReaderLib/Bundler.cpp。

新的一天,更好的答案(好多了(。 请继续阅读。

我花了一些时间调查std::async在Windows上的行为,你是对的。 这是一种不同的动物,看这里。

因此,如果您的代码依赖于std::async总是启动一个新的执行线程并立即返回,那么您就无法使用它。 无论如何,不在Windows上。 在我的机器上,限制似乎是 768 个后台线程,这或多或少适合您观察到的内容。

无论如何,我想更多地了解现代C++所以我在推出自己的替代品时遇到了一个突破,可以std::async在 Windows 上使用 OP 取消语义。 因此,我谦卑地提出以下几点:

AsyncTask:std::async的直接替代品

#include <future>
#include <thread>
template <class Func, class... Args>
std::future <std::result_of_t <std::decay_t <Func> (std::decay_t <Args>...)>>
AsyncTask (Func&& f, Args&&... args)
{
using decay_func = std::decay_t <Func>;
using return_type = std::result_of_t <decay_func (std::decay_t <Args>...)>;
std::packaged_task <return_type (decay_func f, std::decay_t <Args>... args)>
task ([] (decay_func f, std::decay_t <Args>... args)
{
return f (args...);
});
auto task_future = task.get_future ();
std::thread t (std::move (task), f, std::forward <Args> (args)...);
t.detach ();
return task_future;
};

测试程序

#include <iostream>
#include <string>
int add_two_integers (int a, int b)
{
return a + b;
}
std::string append_to_string (const std::string& s)
{
return s + " addendum";
}
int main ()
{
auto /* i.e. std::future <int> */ f1 = AsyncTask (add_two_integers , 1, 2);
auto /* i.e. int */  i = f1.get ();
std::cout << "add_two_integers : " << i << std::endl;
auto  /* i.e. std::future <std::string> */ f2 = AsyncTask (append_to_string , "Hello world");
auto /* i.e. std::string */ s = f2.get ();        std::cout << "append_to_string : " << s << std::endl;
return 0;  
}

输出

add_two_integers : 3
append_to_string : Hello world addendum

现场演示在这里(gcc(和这里(叮当声(。

我从写这篇文章中学到了很多东西,这很有趣。 我对这个东西相当陌生,所以欢迎所有评论。如果我有什么错误,我很乐意更新这篇文章。

受到Paul Sander的回答的启发,我试图稍微简化他的代码:

#include <functional>
#include <future>
#include <thread>
#include <type_traits>
template <class Func, class... Args>
[[nodiscard]] std::future<std::invoke_result_t<std::decay_t<Func>, std::decay_t<Args>...>>
RunInThread(Func&& func, Args&&... args){
using return_type = std::invoke_result_t<std::decay_t<Func>, std::decay_t<Args>...>;
auto bound_func = std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
std::packaged_task<return_type(void)> task(bound_func);
auto task_future = task.get_future();
std::thread(std::move(task)).detach();
return task_future;
}

不幸的是,这和保罗的实现都不符合标准。std::async 返回的未来不是一个正常的未来,而是一个特殊的未来:在任务完成之前,它不能被销毁,即它在其析构函数中调用任务线程的join()

除了std::async之外,任何人都无法访问std::future的此功能。因此,为了获得正确的行为,您必须自己实现这一点。我将所有这些的完整实现放在一个要点中,因为它相当冗长。