这是并行处理多个作业而不阻塞主线程的合适方法

Which is the suitable approach to handle multiple jobs in parallel without blocking the main thread

本文关键字:线程 方法 并行处理 作业      更新时间:2023-10-16

我有一个要求,即单个进程应并行处理多个作业。其中每个作业都应定期运行(例如,每 10 秒运行一次)。此外,主线程需要注意停止信号,收到后应停止所有线程并退出。

以下是我处理此要求的方法。

主线程将为每个作业创建线程并等待停止信号。每个线程负责处理循环机制。当主线程收到停止信号时,它会向线程发送停止信号。

在这种机制中,胎面将一直运行。所以我认为可能有更好的方法来解决这个问题。 也许就像,主线程将跟踪每个作业下一步应该执行的时间,并在需要时启动线程。线程将执行一些操作并在完成后退出。在这种情况下,线程不会一直运行。

我目前不知道如何实施替代方法。所以,我想知道以上哪种方法可能是好方法?另外,如果有任何其他更好的选择,请提出建议。

编辑[3月19日]:

也许我最初应该提到这一点,但在这里。

假设我有 2 个作业,则不需要同时运行这两个作业。例如,作业 1 应每 10 秒运行一次,作业 2 应每 20 秒运行一次。

此外,如果作业本身需要更多时间,则必须有一些机制来正确识别和处理它。也许跳过执行或等待上一个作业完成,然后重新开始。目前,这方面的要求并不明确。但是,我应该能够识别并处理这种情况。

这里有一些建议。

首先让主线程根据需要启动线程以完成任务。主线程在创建线程时会分离线程,然后完全忘记它。分离的线程在返回时将自行清洁。这可以轻松简化主线程,并消除跟踪和管理不同线程的开销。如果线程几乎完全独立并且永远不会遇到无限循环,则此方法有效。当主线程收到停止信号时,它只会退出,这反过来又会杀死进程,但是这取决于操作系统,所以请查看当main()退出时分离的线程会发生什么?。

您启动一个充当任务/线程管理器的线程。此对象/线程将根据需要创建新线程,并通过某些池化或线程跟踪正确等待它们。然后,主线程可以通过互斥防护标志轻松地向线程跟踪器发送消息以停止,在这种情况下,线程跟踪器只需等待其所有生成的线程,然后死亡。这比上述解决方案需要更多的开销,但它为您提供了有关哪些线程何时运行的更多信息。此外,您还可以更好地控制是否需要直接终止线程或在必要时如何向线程发送消息。此外,它还允许安全清理,根据您的操作系统,线程可能会因进程死亡而缩短。如果线程需要能够更轻松地接收消息,并且您希望确保线程即使在停止信号之后也能运行完成(想想 R/W 操作),则这样做会更好。

您也可以将主线程和线程/管理器混合为一个,但这会使主线程更加复杂,并且对于大多数通用场景不会显着减少开销。

看看下面的类,它以给定的时间间隔运行一个函数:

class CallbackTimer
{
public:
~CallbackTimer()
{
stop();
}
void start(std::chrono::milliseconds interval, std::function<void()> callback)
{
stop();
shouldQuit = false;
handle = std::async([=,callback=std::move(callback)]() {
while (!shouldQuit)
{
auto nextStart = std::chrono::steady_clock::now() + interval;
callback();
std::this_thread::sleep_until(nextStart);
}
});
}
void stop()
{
if (handle.valid())
{
shouldQuit = true;
handle.get();
}
}
private:
std::atomic_bool shouldQuit;
std::future<void> handle;
};

在 start() 上创建一个新线程,该线程以给定的间隔运行给定的"回调"。对于 10 秒的间隔,回调恰好每 10 秒调用一次,除非在下一次启动到期时它仍在工作。在这种情况下,它会立即再次运行。

stop() 设置退出标志并等待线程退出。主路由的这种非常简单的实现不会中断 shouldQuit-checks 的睡眠,因此 CallbackTimer 可能需要一个完整的间隔才能退出。

请注意,回调时间可能会略有偏差,因此,如果以相同的间隔启动其中两个回调时间,则它们可能不会在一段时间后同时运行。但是漂移应该是最小的,如果它很关键,你应该考虑另一种解决方案。

下面是一个使用示例:

int main()
{
CallbackTimer a;
a.start(std::chrono::seconds(1), []() { std::cout << "a" << std::endl; });
CallbackTimer b;
b.start(std::chrono::seconds(2), []() { std::cout << "b" << std::endl; });
std::this_thread::sleep_for(std::chrono::seconds(10));
a.stop();
b.stop();
return 0;
}

为了更快的 stop(),您可以实现一个 "beginStop()" 方法,该方法仅将 'shouldQuit' 标志设置为 true,并在所有实例上调用 "stop()" 之前为所有实例调用 this。这样,第二个实例的关闭就不会延迟到第一个实例的关闭完成。

如果您的主线程只需要管理线程,您可以每 10 秒启动一次新线程并等待它们完成。

下面的示例并行运行两个线程,等待它们完成,并在循环之前再等待一秒钟。(shouldQuit() 函数使用来自 Visual C++ 的编译器特定函数;也许你必须在那里插入自己的代码)

#include <future>
#include <iostream>
#include <conio.h>
#include <thread>
#include <chrono>
int a()
{
std::cout << 'a' << std::flush;
return 1;
}
int b()
{
std::cout << 'b' << std::flush;
return 2;
}
bool shouldQuit()
{
while (_kbhit())
{
if (_getch() == 27)
return true;
}
return false;
}
int main()
{
auto threadFunctions = { a, b };
while(!shouldQuit())
{
auto threadFutures = std::vector<std::future<int>>{};
// Run asynchronous tasks
for (auto& threadFunction: threadFunctions)
threadFutures.push_back(std::async(threadFunction));
// Wait for all tasks to complete
for (auto& threadFuture : threadFutures)
threadFuture.get();
// Wait a second
std::this_thread::sleep_for(std::chrono::seconds(1));
}
return 0;
}

线程消耗的时间(在我们等待它们完成时)不是计时的一部分。对于运行时间最长的线程,线程调用之间的暂停时间为 1 秒,对于所有其他线程

,暂停至少为 1 秒。注意:每隔几秒钟运行一个新线程并不是最有效的执行方式。在大多数操作系统上,创建新线程是一项昂贵的操作。如果性能至关重要,则应重用问题中所述的现有线程。但是程序的整体复杂性会增加。

使用std::asyncstd::future具有正确处理异常的好处。如果线程引发异常,则会捕获此异常并将其存储在 std::future 中。它在调用 future.get() 的线程中被重新抛出。因此,如果您的线程可能会抛出异常,最好尝试包装您的 future.get() 调用。抓住。

future.get() 调用将调用线程与为其创建 future 的线程同步。如果线程已经完成,它只是返回返回值。如果线程仍在工作,则调用线程将被阻塞,直到线程完成。