如何限制c++中运行实例的数量

How to limit the number of running instances in C++

本文关键字:实例 运行 何限制 c++      更新时间:2023-10-16

我有一个分配大量内存的c++类。它通过调用第三方库来实现这一点,该库被设计为在无法分配内存时崩溃,有时我的应用程序在并行线程中创建我的类的几个实例。线程太多,我崩溃了。对于解决方案,我的最佳想法是确保同时运行的实例永远不会超过三个。(这是个好主意吗?)我目前实现的最佳方法是使用升压互斥锁。下面的伪代码

MyClass::MyClass(){
  my_thread_number = -1; //this is a class variable
  while (my_thread_number == -1)
    for (int i=0; i < MAX_PROCESSES; i++)
      if(try_lock a mutex named i){
        my_thread_number = i;
        break;
      }
  //Now I know that my thread has mutex number i and it is allowed to run
}
MyClass::~MyClass(){
    release mutex named my_thread_number
}

如你所见,我不太确定这里的互斥体的确切语法。总结一下,我的问题是

  1. 当我想通过限制线程数量来解决我的内存错误时,我在正确的轨道上吗?
  2. 如果是,我应该用互斥锁还是用其他方法?
  3. 如果是,我的算法是否合理?
  4. 是否有一个很好的例子,如何使用try_lock与boost互斥锁?

编辑:我意识到我在谈论线程,而不是进程。编辑:我正在参与构建一个可以在linux和Windows上运行的应用程序…

UPDATE我的另一个答案是在线程之间调度资源(在问题澄清之后)。

它展示了一个信号量方法来协调(许多)工人之间的工作,以及一个thread_pool来首先限制工人并将工作排队。

在linux(也许还有其他操作系统?)上,您可以使用锁文件习惯用法(但某些文件系统和旧内核不支持)。

我建议使用进程间同步对象

。,使用Boost进程间命名信号量:

#include <boost/interprocess/sync/named_semaphore.hpp>
#include <boost/thread.hpp>
#include <cassert>
int main()
{
    using namespace boost::interprocess;
    named_semaphore sem(open_or_create, "ffed38bd-f0fc-4f79-8838-5301c328268c", 0ul);
    if (sem.try_wait())
    {
        std::cout << "Oops, second instancen";
    }
    else
    {
        sem.post();
        // feign hard work for 30s
        boost::this_thread::sleep_for(boost::chrono::seconds(30));
        if (sem.try_wait())
        {
            sem.remove("ffed38bd-f0fc-4f79-8838-5301c328268c");
        }
    }
}

如果你在后台启动一个副本,新的副本将"拒绝"启动("哎呀,第二个实例")大约30秒。

我有一种感觉,这里可能更容易颠倒逻辑。嗯。让我试一试。

一段时间过去了

呵呵。这比我想的更棘手。

问题是,您希望确保当应用程序被中断或终止时锁不会保留。为了分享可移植处理信号的技术:

#include <boost/interprocess/sync/named_semaphore.hpp>
#include <boost/thread.hpp>
#include <cassert>
#include <boost/asio.hpp>
#define MAX_PROCESS_INSTANCES 3
boost::interprocess::named_semaphore sem(
        boost::interprocess::open_or_create, 
        "4de7ddfe-2bd5-428f-b74d-080970f980be",
        MAX_PROCESS_INSTANCES);
// to handle signals:
boost::asio::io_service service;
boost::asio::signal_set sig(service);
int main()
{
    if (sem.try_wait())
    {
        sig.add(SIGINT);
        sig.add(SIGTERM);
        sig.add(SIGABRT);
        sig.async_wait([](boost::system::error_code,int sig){ 
                std::cerr << "Exiting with signal " << sig << "...n";
                sem.post();
            });
        boost::thread sig_listener([&] { service.run(); });
        boost::this_thread::sleep_for(boost::chrono::seconds(3));
        service.post([&] { sig.cancel(); });
        sig_listener.join();
    }
    else
    {
        std::cout << "More than " << MAX_PROCESS_INSTANCES << " instances not allowedn";
    }
}

这里有很多可以解释的。如果你感兴趣,请告诉我。

注意这应该是相当明显的,如果kill -9在你的应用程序中使用(强制终止),那么所有的赌注都取消了,你将不得不删除名称信号量对象或显式解锁它(post())。

下面是我的系统上的一个测试:

sehe@desktop:/tmp$ (for a in {1..6}; do ./test& done; time wait)
More than 3 instances not allowed
More than 3 instances not allowed
More than 3 instances not allowed
Exiting with signal 0...
Exiting with signal 0...
Exiting with signal 0...
real    0m3.005s
user    0m0.013s
sys 0m0.012s

这里有一个简单的方法来实现你自己的'信号量'(因为我认为标准库或boost没有)。这选择了一种"合作"的方式,工人将相互等待:

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
using namespace boost;
using namespace boost::phoenix::arg_names;
void the_work(int id)
{
    static int running = 0;
    std::cout << "worker " << id << " entered (" << running << " running)n";
    static mutex mx;
    static condition_variable cv;
    // synchronize here, waiting until we can begin work
    {
        unique_lock<mutex> lk(mx);
        cv.wait(lk, phoenix::cref(running) < 3);
        running += 1;
    }
    std::cout << "worker " << id << " start workn";
    this_thread::sleep_for(chrono::seconds(2));
    std::cout << "worker " << id << " donen";
    // signal one other worker, if waiting
    {
        lock_guard<mutex> lk(mx);
        running -= 1;
        cv.notify_one(); 
    }
}
int main()
{
    thread_group pool;
    for (int i = 0; i < 10; ++i)
        pool.create_thread(bind(the_work, i));
    pool.join_all();
}

现在,我想说有一个n个工人轮流从队列中取出工作的专用池可能会更好:

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>
using namespace boost;
using namespace boost::phoenix::arg_names;
class thread_pool
{
  private:
      mutex mx;
      condition_variable cv;
      typedef function<void()> job_t;
      std::deque<job_t> _queue;
      thread_group pool;
      boost::atomic_bool shutdown;
      static void worker_thread(thread_pool& q)
      {
          while (auto job = q.dequeue())
              (*job)();
      }
  public:
      thread_pool() : shutdown(false) {
          for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
              pool.create_thread(bind(worker_thread, ref(*this)));
      }
      void enqueue(job_t job) 
      {
          lock_guard<mutex> lk(mx);
          _queue.push_back(std::move(job));
          cv.notify_one();
      }
      optional<job_t> dequeue() 
      {
          unique_lock<mutex> lk(mx);
          namespace phx = boost::phoenix;
          cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));
          if (_queue.empty())
              return none;
          auto job = std::move(_queue.front());
          _queue.pop_front();
          return std::move(job);
      }
      ~thread_pool()
      {
          shutdown = true;
          {
              lock_guard<mutex> lk(mx);
              cv.notify_all();
          }
          pool.join_all();
      }
};
void the_work(int id)
{
    std::cout << "worker " << id << " enteredn";
    // no more synchronization; the pool size determines max concurrency
    std::cout << "worker " << id << " start workn";
    this_thread::sleep_for(chrono::seconds(2));
    std::cout << "worker " << id << " donen";
}
int main()
{
    thread_pool pool; // uses 1 thread per core
    for (int i = 0; i < 10; ++i)
        pool.enqueue(bind(the_work, i));
}

p。如果您愿意,可以使用c++ 11 lambdas代替boost::phoenix