正在从并发析构函数停止boost::asio::io_service::run()

Stopping boost::asio::io_service::run() from concurrent destructor

本文关键字:io asio run service boost 并发 析构函数      更新时间:2023-10-16

有人能解释一下为什么这个程序没有终止吗?

#include <boost/asio/io_service.hpp>
#include <boost/asio.hpp>
#include <memory>
#include <cstdio>
#include <iostream>
#include <future>
class Service {
public:
    ~Service() {
        std::cout << "Destroying...n";
        io_service.post([this]() {
            std::cout << "clean and stopn"; // does not get called
            // do some cleanup
            // ...
            io_service.stop();
            std::cout << "Bye!n";
        });
        std::cout << "...destroyedn"; // last printed line, blocks
    }
    void operator()() {
        io_service.run();
        std::cout << "run completedn";
    }
private:
    boost::asio::io_service io_service;
    boost::asio::io_service::work work{io_service};
};
struct Test {
    void start() {
        f = std::async(std::launch::async, [this]() { service(); std::cout << "exiting threadn";});
    }
    std::future<void> f;
    Service service;
};
int main(int argc, char* argv[]) {
    {
        Test test;
        test.start();
        std::string exit;
        std::cin >> exit;
    }
    std::cout << "exiting programn"; // never printed
}
真正的问题是io_service的销毁(显然)不是线程安全的。

只需重置工作并加入线程。(可选)设置一个标志,使IO操作知道正在关闭。

测试和服务类试图分担IO服务的责任,但这不起作用。这里做了很多简化,合并了类并删除了未使用的future。

在Coliru上直播

诀窍是使work对象optional<>:

#include <boost/asio.hpp>
#include <boost/optional.hpp>
#include <iostream>
#include <thread>
struct Service {
    ~Service() {
        std::cout << "clean and stopn";
        io_service.post([this]() {
            work.reset(); // let io_service run out of work
        });
        if (worker.joinable())
            worker.join();
    }
    void start() {
        assert(!worker.joinable());
        worker = std::thread([this] { io_service.run(); std::cout << "exiting threadn";});
    }
private:
    boost::asio::io_service io_service;
    std::thread worker;
    boost::optional<boost::asio::io_service::work> work{io_service};
};
int main() {
    {
        Service test;
        test.start();
        std::cin.ignore(1024, 'n');
        std::cout << "Start shutdownn";
    }
    std::cout << "exiting programn"; // never printed
}

打印

Start shutdown
clean and stop
exiting thread
exiting program

请参阅此处:boost::asio在抛出io_service::run()后挂起在解析程序服务析构函数中

我认为这里的技巧是在调用io_service.stop()之前销毁worker(work成员)。即,在这种情况下,work可以是unique_ptr,并在停止服务之前显式调用reset()

EDIT:上面的内容在一段时间前对我的情况有所帮助,ioservice::stop没有停止,正在等待一些从未发生过的调度事件。

然而,我在我的机器上重现了您的问题,这似乎是ioservice内部的竞争条件,是ioservice::post()ioservice销毁代码(shutdown_service)之间的竞争。特别是,如果在post()通知唤醒另一个线程之前触发了shutdown_service(),则shutdown_service()代码将从队列中删除该操作(并"销毁"它而不是调用它),因此那时永远不会调用lambda。

目前,在我看来,您需要直接在析构函数中调用io_service.stop(),而不是通过post()延迟,因为由于种族原因,这显然在这里不起作用。

我能够通过如下重写您的代码来解决问题:

class Service {
public:
    ~Service() {
        std::cout << "Destroying...n";
        work.reset();
        std::cout << "...destroyedn"; // last printed line, blocks
    }
    void operator()() {
        io_service.run();
        std::cout << "run completedn";
    }
private:
    boost::asio::io_service io_service;
    std::unique_ptr<boost::asio::io_service::work> work = std::make_unique<boost::asio::io_service::work>(io_service);
};

然而,这在很大程度上是一种创可贴解决方案。

问题在于你的设计理念;特别是,选择不将执行线程的生存期直接与io_service对象绑定:

struct Test {
    void start() {
        f = std::async(std::launch::async, [this]() { service(); std::cout << "exiting threadn";});
    }
    std::future<void> f; //Constructed First, deleted last
    Service service; //Constructed second, deleted first
};

在这个特定的场景中,线程将继续尝试在io_service对象本身的生存期之后执行io_service.run()。如果在服务上执行的对象不止是基本的work对象,那么您很快就可以通过调用已删除对象的成员函数来处理未定义的行为。

您可以颠倒Test:中成员对象的顺序

struct Test {
    void start() {
        f = std::async(std::launch::async, [this]() { service(); std::cout << "exiting threadn";});
    }
    Service service;
    std::future<void> f;
};

但它仍然代表着一个重大的设计缺陷。

我通常实现任何使用io_service的东西的方法是将其生存期与实际要在其上执行的线程联系起来

class Service {
public:
    Service(size_t num_of_threads = 1) :
        work(std::make_unique<boost::asio::io_service::work>(io_service))
    {
        for (size_t thread_index = 0; thread_index < num_of_threads; thread_index++) {
            threads.emplace_back([this] {io_service.run(); });
        }
    }
    ~Service() {
        work.reset();
        for (std::thread & thread : threads) 
            thread.join();
    }
private:
    boost::asio::io_service io_service;
    std::unique_ptr<boost::asio::io_service::work> work;
    std::vector<std::thread> threads;
};

现在,如果在这些线程中的任何一个线程上有任何活动的无限循环,您仍然需要确保正确地清理这些循环,但至少该io_service操作的特定代码已正确清理。