如何检查std::线程是否仍在运行

How to check if a std::thread is still running?

本文关键字:线程 是否 运行 std 何检查 检查      更新时间:2023-10-16

如何检查std::thread是否仍在运行(以独立于平台的方式)?它缺少timed_join()方法,而joinable()并不适用于此。

我曾想过在线程中用std::lock_guard锁定互斥体,并使用互斥体的try_lock()方法来确定它是否仍被锁定(线程正在运行),但这对我来说似乎不必要地复杂

你知道更优雅的方法吗?

更新:需要明确的是:我想检查线程是否干净退出。"挂"线被认为是为此目的而运行的。

如果您愿意使用C++11 std::asyncstd::future来运行您的任务,那么您可以使用std::futurewait_for功能来检查线程是否仍在以这样的整洁方式运行:

#include <future>
#include <thread>
#include <chrono>
#include <iostream>
int main() {
    using namespace std::chrono_literals;
    /* Run some task on new thread. The launch policy std::launch::async
       makes sure that the task is run asynchronously on a new thread. */
    auto future = std::async(std::launch::async, [] {
        std::this_thread::sleep_for(3s);
        return 8;
    });
    // Use wait_for() with zero milliseconds to check thread status.
    auto status = future.wait_for(0ms);
    // Print status.
    if (status == std::future_status::ready) {
        std::cout << "Thread finished" << std::endl;
    } else {
        std::cout << "Thread still running" << std::endl;
    }
    auto result = future.get(); // Get result.
}

如果你必须使用std::thread,那么你可以使用std::promise来获得未来的对象:

#include <future>
#include <thread>
#include <chrono>
#include <iostream>
int main() {
    using namespace std::chrono_literals;
    // Create a promise and get its future.
    std::promise<bool> p;
    auto future = p.get_future();
    // Run some task on a new thread.
    std::thread t([&p] {
        std::this_thread::sleep_for(3s);
        p.set_value(true); // Is done atomically.
    });
    // Get thread status using wait_for as before.
    auto status = future.wait_for(0ms);
    // Print status.
    if (status == std::future_status::ready) {
        std::cout << "Thread finished" << std::endl;
    } else {
        std::cout << "Thread still running" << std::endl;
    }
    t.join(); // Join thread.
}

这两个示例都将输出:

Thread still running

这当然是因为线程状态是在任务完成之前检查的。

但话说回来,像其他人已经提到的那样做可能更简单:

#include <thread>
#include <atomic>
#include <chrono>
#include <iostream>
int main() {
    using namespace std::chrono_literals;
    std::atomic<bool> done(false); // Use an atomic flag.
    /* Run some task on a new thread.
       Make sure to set the done flag to true when finished. */
    std::thread t([&done] {
        std::this_thread::sleep_for(3s);
        done = true;
    });
    // Print status.
    if (done) {
        std::cout << "Thread finished" << std::endl;
    } else {
        std::cout << "Thread still running" << std::endl;
    }
    t.join(); // Join thread.
}

编辑:

还有std::packaged_task可与std::thread一起使用,以获得比使用std::promise更清洁的溶液:

#include <future>
#include <thread>
#include <chrono>
#include <iostream>
int main() {
    using namespace std::chrono_literals;
    // Create a packaged_task using some task and get its future.
    std::packaged_task<void()> task([] {
        std::this_thread::sleep_for(3s);
    });
    auto future = task.get_future();
    // Run task on new thread.
    std::thread t(std::move(task));
    // Get thread status using wait_for as before.
    auto status = future.wait_for(0ms);
    // Print status.
    if (status == std::future_status::ready) {
        // ...
    }
    t.join(); // Join thread.
}

一个简单的解决方案是有一个布尔变量,线程会定期将其设置为true,而想要了解状态的线程会检查该变量并将其设为false。如果变量在到long的时间内为false,则线程将不再被视为活动线程。

一种更安全的线程方式是使用子线程增加的计数器,主线程将计数器与存储的值进行比较,如果时间过长,则认为子线程未处于活动状态。

然而,请注意,在C++11中没有办法真正杀死或删除挂起的线程。

编辑如何检查线程是否干净退出:基本上与第一段中描述的技术相同;将布尔变量初始化为false。子线程所做的最后一件事就是将其设置为true。然后,主线程可以检查该变量,如果为true,则在没有太多(如果有的话)阻塞的情况下对子线程执行联接。

Edit2如果线程由于异常而退出,则有两个线程"主";函数:第一个有一个CCD_ 15-CCD_;真实的";主线程函数。这个第一个主要功能设置";have_exited";变量类似这样的东西:

std::atomic<bool> thread_done = false;
void *thread_function(void *arg)
{
    void *res = nullptr;
    try
    {
        res = real_thread_function(arg);
    }
    catch (...)
    {
    }
    thread_done = true;
    return res;
}

这个简单的机制可以用于检测线程的完成,而无需阻塞联接方法。

std::thread thread([&thread]() {
    sleep(3);
    thread.detach();
});
while(thread.joinable())
    sleep(1);

您可以随时检查线程的id是否与默认构造的std::thread::id()不同。正在运行的线程始终具有真正的关联id。尽量避免太多花哨的东西:)

创建一个运行线程和调用线程都可以访问的互斥体。当运行线程启动时,它锁定互斥体,当它结束时,它解锁互斥体。为了检查线程是否仍在运行,调用线程调用mutex.try_lock()。它的返回值是线程的状态。(如果try_lock有效,只需确保解锁互斥锁)

一个小问题是,在创建线程和锁定互斥体之间,互斥体.try_lock()将返回false,但使用稍微复杂一点的方法可以避免这种情况。

肯定有一个互斥体封装的变量初始化为false,线程在退出前最后一次将其设置为true。那原子能满足你的需要吗?

我检查了两个系统:-使用线程+原子:耗时9738毫秒-使用future+async:耗时7746毫秒非线程:56000毫秒使用核心-I7 6核心笔记本电脑

我的代码创建了4000个线程,但每次运行的线程不超过12个。

这是代码:

#include <iostream>
#include <thread>
#include <future>
#include <chrono>
#include <mutex>          // std::mutex
#include <atomic>
#include <chrono>

#pragma warning(disable:4996)
#pragma warning(disable:6031)
#pragma warning(disable:6387)//strout
#pragma warning(disable:26451)
using namespace std;
const bool FLAG_IMPRIME = false;
const int MAX_THREADS = 12;

mutex mtx;           // mutex for critical section
atomic <bool> th_end[MAX_THREADS];
atomic <int> tareas_acabadas;
typedef std::chrono::high_resolution_clock t_clock; //SOLO EN WINDOWS
std::chrono::time_point<t_clock> start_time, stop_time; char null_char;
void timer(const char* title = 0, int data_size = 1) { stop_time = t_clock::now(); double us = (double)chrono::duration_cast<chrono::microseconds>(stop_time - start_time).count(); if (title) printf("%s time = %7lgms = %7lg MOPsn", title, (double)us * 1e-3, (double)data_size / us); start_time = t_clock::now(); }

class c_trim
{
    char line[200];
    thread th[MAX_THREADS];
    double th_result[MAX_THREADS];
    int th_index;
    double milliseconds_commanded;
    void hilo(int hindex,int milliseconds, double& milliseconds2)
    {
        sprintf(line, "%i:%ia ",hindex, milliseconds); imprime(line);
        this_thread::sleep_for(std::chrono::milliseconds(milliseconds));
        milliseconds2 = milliseconds * 1000;
        sprintf(line, "%i:%ib ", hindex, milliseconds); imprime(line);
        tareas_acabadas++;  th_end[hindex] = true;
    }
    int wait_first();
    void imprime(char* str) { if (FLAG_IMPRIME) { mtx.lock(); cout << str; mtx.unlock(); } }
public:
    void lanzatareas();

    vector <future<void>> futures;
    int wait_first_future();
    void lanzatareas_future();//usa future
};
int main()
{
    c_trim trim;
    timer();
    trim.lanzatareas();
    cout << endl;
    timer("4000 tareas using THREAD+ATOMIC:", 4000);
    trim.lanzatareas_future();
    cout << endl;
    timer("4000 tareas using FUTURE:", 4000);
    cout << endl << "Tareas acabadas:" << tareas_acabadas << endl;
    cout << "=== END ===n"; (void)getchar();
}
void c_trim::lanzatareas()
{
    th_index = 0;
    tareas_acabadas = 0;
    milliseconds_commanded = 0;
    double *timeout=new double[MAX_THREADS];
    int i;
    for (i = 0; i < MAX_THREADS; i++)
    {
        th_end[i] = true;
        th_result[i] = timeout[i] = -1;
    }

    for (i = 0; i < 4000; i++)
    {
        int milliseconds = 5 + (i % 10) * 2;
        {
            int j = wait_first();
            if (th[j].joinable())
            {
                th[j].join();
                th_result[j] = timeout[j];
            }
            milliseconds_commanded += milliseconds;
            th_end[j] = false;
            th[j] = thread(&c_trim::hilo, this, j, milliseconds, std::ref(timeout[j]));
        }
    }
    for (int j = 0; j < MAX_THREADS; j++)
        if (th[j].joinable())
        {
            th[j].join();
            th_result[j] = timeout[j];
        }
    delete[] timeout;
    cout <<endl<< "Milliseconds commanded to wait=" << milliseconds_commanded << endl;
}
void c_trim::lanzatareas_future()
{
    futures.clear();
    futures.resize(MAX_THREADS);
    tareas_acabadas = 0;
    milliseconds_commanded = 0;
    double* timeout = new double[MAX_THREADS];
    int i;
    for (i = 0; i < MAX_THREADS; i++)
    {
        th_result[i] = timeout[i] = -1;
    }

    for (i = 0; i < 4000; i++)
    {
        int milliseconds = 5 + (i % 10) * 2;
        {
            int j;
            if (i < MAX_THREADS) j = i;
            else
            {
                j = wait_first_future();
                futures[j].get();
                th_result[j] = timeout[j];
            }
            milliseconds_commanded += milliseconds;
            futures[j] = std::async(std::launch::async, &c_trim::hilo, this, j, milliseconds, std::ref(timeout[j]));
        }
    }
    //Last MAX_THREADS:
    for (int j = 0; j < MAX_THREADS; j++)
    {
        futures[j].get();
        th_result[j] = timeout[j];
    }
    delete[] timeout;
    cout << endl << "Milliseconds commanded to wait=" << milliseconds_commanded << endl;
}
int c_trim::wait_first()
{
    int i;
    while (1)
        for (i = 0; i < MAX_THREADS; i++)
        {
            if (th_end[i] == true)
            {
                return i;
            }
        }
}

//Espera que acabe algun future y da su index
int c_trim::wait_first_future()
{
    int i;
    std::future_status status;
    while (1)
        for (i = 0; i < MAX_THREADS; i++)
        {
            status = futures[i].wait_for(0ms);
            if (status == std::future_status::ready)
                return i;
        }
}

我最近也遇到了这个问题。尝试使用C++20 std::jthread使用共享停止状态来检查线程是否结束,但在线程内部,std::stop_token参数是只读的,当线程结束时不会向外部指示。

因此,我创建了一个简单的类(nes::uthread),用一个标志来扩展std::thread,以指示它已经完成。示例:

#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>
namespace nes {
  class uthread final
  {
    std::unique_ptr<std::atomic<bool>> m_finished;
    std::thread m_thr;
  public:
    uthread()
      : m_finished { std::make_unique<std::atomic<bool>>(true) }
    {}
    template <class Function, class... Args>
    uthread(Function&& f, Args&&... args)
      : m_finished { std::make_unique<std::atomic<bool>>(false) }
      , m_thr {
        [](std::atomic<bool>& finished, Function&& ff, Args&&... aargs) {
          try {
            std::forward<Function>(ff)(std::forward<Args>(aargs)...);
            finished = true;
          } catch (...) {
            finished = true;
            throw;
          }
        }, 
        std::ref(*m_finished), std::forward<Function>(f), 
        std::forward<Args>(args)...
      }
    {}
    uthread(const uthread&) = delete;
    uthread(uthread&&) = default;
    uthread& operator=(const uthread&) = delete;
    uthread& operator=(uthread&&) = default;
    [[nodiscard]] std::thread::id get_id() const noexcept { 
      return m_thr.get_id(); }
    [[nodiscard]] bool joinable() const noexcept { return m_thr.joinable(); }
    void join() { m_thr.join(); }
    [[nodiscard]] const std::atomic<bool>& finished() const noexcept { 
      return *m_finished; }
  };
}
int main()
{
    using namespace std;
    using namespace std::chrono;
    using namespace std::chrono_literals;
    using namespace nes;
    {
      cout << "std::thread join() terminationn";
      atomic<bool> finished = false;
      thread t { [&finished] {
        this_thread::sleep_for(2s);
        finished = true;
        cout << "thread endedn";
      }};
      for (int i = 0; i < 5; i++) {
        cout << t.get_id() << ".join() " << t.joinable() 
             << " finished: " << finished << 'n';
        this_thread::sleep_for(1s);
      }
      t.join();
    }
    cout << 'n';
    {
      cout << "std::jthread join() terminationn";
      jthread t {[](stop_token st) {
        this_thread::sleep_for(2s);
        cout << "thread ended. stop possible: " << st.stop_possible() << 'n';
      }};
      auto st = t.get_stop_source();
      for (int i = 0; i < 5; i++) {
        cout << t.get_id() << ".join() " << t.joinable() 
             << " finished: " << !st.stop_possible() << 'n';
        this_thread::sleep_for(1s);
      }
    }
    cout << 'n';
    {
      cout << "nes::uthread join() terminationn";
      uthread t {[] {
        this_thread::sleep_for(2s);
        cout << "thread endedn";
      }};
      for (int i = 0; i < 5; i++) {
        cout << t.get_id() << ".join() " << t.joinable() 
             << " finished: " << t.finished() << 'n';
        this_thread::sleep_for(1s);
      }
      t.join();
    }
}

可能的打印:

std::thread join() termination
2.join() 1 finished: 0
2.join() 1 finished: 0
thread ended
2.join() 1 finished: 1
2.join() 1 finished: 1
2.join() 1 finished: 1
std::jthread join() termination
3.join() 1 finished: 0
3.join() 1 finished: 0
thread ended. stop possible: 1
3.join() 1 finished: 0
3.join() 1 finished: 0
3.join() 1 finished: 0
nes::uthread join() termination
4.join() 1 finished: 0
4.join() 1 finished: 0
thread ended
4.join() 1 finished: 1
4.join() 1 finished: 1
4.join() 1 finished: 1

您可以在nes::uthread中使用std::jthread,因此不需要加入。

我刚刚在这方面取得了突破。我一直在寻找一个简单通用的解决方案。给你。

使用Boost C++库,创建一个返回布尔值的信号。在第一次使用之前,将信号连接到返回false的插槽。在线程中创建一个返回trueboost::signals2::scoped_connection。这利用了boost::signals2::signal返回类型的默认行为,返回调用的最后一个插槽的值。通过在线程开始时使用scoped_connection,该插槽仅在线程运行时保持连接。此外,Boost signals2::signal包含维护线程安全的内部互斥。

#include <boost/signals2.hpp>
int main(int argc, char* argv[])
{
   //This can be in a class or someplace else
   boost::signals2::signal<bool ()> ThreadRunning;
   // Be sure to capture 'this' if used in a class
   ThreadRunning.connect([]() {return false;});
   
   auto t = std::thread([]()
   {
      boost::signals2::scoped_connection c(ThreadRunning.connect[]() {return true;}));
      // Do your stuff.
   });
   if (TreadRunning())
      //Do stuff if the thread is still running
   t.join();
   return 0;
}

虽然此方法是线程安全的,但线程有可能在调用信号和完成使用此逻辑的任何操作之间完成。