如何启动多个线程,每个线程处理不同的文件

How to launch multiple threads and each thread working on different files?

本文关键字:线程 处理 文件 何启动 启动      更新时间:2023-10-16

我有一个单线程应用程序,它通过调用send_new_file向其他服务器发送文件

void send_new_file_command::start_sending_file()
{
    m_thread = thread(&send_new_file_command::execute_file, this);
}
void send_new_file_command::execute_file()
{
    for (auto it = files_need_to_send.begin(); it != files_need_to_send.end() && !is_complete(); ++it)
    {
        {
            std::unique_lock<spinning_lock> guard(lock_obj);
            m_current_file = *it;
        }
        // send a file.
        // I want to call this in parallel
        send_new_file(*it);
    }
}

是否有任何方法我可以有多个线程和每个线程发送一个文件。举个例子,假设我们有4个线程,线程1,2,3,4将并行发送不同的文件。我想并行调用send_new_file ?

我使用std::thread。我正在看线程的例子,我如何在c++中做到这一点,但困惑的是,我如何在这里划分每个线程的文件数量,并确保每个线程在文件的子集上工作。

  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i)
    threads.push_back(std::thread(send_new_file(*it)));

我的背景是Java,所以有点困惑如何在c++中使用std::thread。

这是使用工作队列的一种相当简单的方法。您可以将代码片段连接到一个自包含的程序中。我们将使用以下标准库头文件:

#include <fstream>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

首先,定义一个函数,该函数接受一个文件名并将其发送到它应该去的任何地方。我将通过简单地将其写入/dev/null来模拟这一点。

void
send_file(const std::string& filename)
{
  std::ifstream istr {};
  std::ofstream ostr {};
  std::string line {};
  istr.exceptions(std::ifstream::badbit);
  ostr.exceptions(std::ofstream::badbit);
  istr.open(filename);
  ostr.open("/dev/null");
  while (std::getline(istr, line))
    ostr << line << 'n';
}

接下来,定义一个函数,该函数接受一个指针,该指针指向仍然需要发送的文件的std::vector,另一个指针指向std::mutex,该指针应该保护该向量。我使用指针而不是引用,因为这允许我稍后创建std::thread更简单。如果你不喜欢,你不必这样做。

int
send_files(std::vector<std::string> *const files_p, std::mutex *const mutex_p)
{
  auto count = 0;
  while (true)
    {
      std::string next {};
      {
        const std::unique_lock<std::mutex> lck {*mutex_p};
        if (files_p->empty())  // nothing left to do
          return count;
        next = std::move(files_p->back());
        files_p->pop_back();
      }
      send_file(next);
      count += 1;
    }
}
重要的是,在执行发送文件的实际工作时,我们不会持有锁。否则,我们将彻底扼杀并发性。在保持锁的同时,我还小心地不分配任何内存。通常,您将看到std::list用作工作队列,std::condition_variable用于在队列发生更改时发出信号。我张贴的代码显示这在另一个答案前一段时间。然而,在这个简单的例子中,队列只会被移除,所以std::vector是一个完美的选择。

最后,我们在一个简单的程序中使用我们所拥有的,每个硬件并发单元创建一个线程,并要求这些线程发送命令行参数中指定的所有文件。注意,如上所述,这将以相反的顺序处理列表。但是,如果这对您来说是一个问题,那么更改它是微不足道的。

int
main(int argc, char * * argv)
{
  const auto nthreads = std::thread::hardware_concurrency();
  std::mutex mutex {};
  std::vector<std::thread> threads {};
  std::vector<std::string> files {};
  files.reserve(argc - 1);
  for (auto i = 1; i < argc; ++i)
    files.push_back(argv[i]);
  threads.reserve(nthreads);
  for (auto t = 0U; t < nthreads; ++t)
    threads.emplace_back(send_files, &files, &mutex);
  for (auto t = 0U; t < nthreads; ++t)
    threads[t].join();
}

第一种方法

有一个简单的解决方案:

  • 你的类包含要处理的文件向量
  • 只有一个线程通过函数execute_file()
  • 管理这个向量
  • 这个函数创建任意数量的线程,每个线程处理一个文件
  • 在结束时,所有线程被连接(强制)

代码应该是这样的:

struct send_new_file_command {
    vector<string> files_need_to_send;
public:
    send_new_file_command(vector<string> f) : files_need_to_send(f) {}
    void execute_file();
};
void send_new_file_command::execute_file()
{
    vector<thread> exec;
    for(auto it = files_need_to_send.begin(); it != files_need_to_send.end(); ++it)
    {
        exec.push_back(thread(send_new_file, *it));
    }
    for(auto &e : exec)
        e.join();
}

可以用下面的代码进行测试:

void send_new_file(string x) { // simulator 
    for(int i = 0; i<10; i++) {
        cout << x << endl;
        this_thread::sleep_for(chrono::milliseconds(500));
    }
}
int main() {
    vector<string>vs{"a", "b", "c", "d"};
    send_new_file_command sfc(vs);
    sfc.execute_file();
    return 0;
}

这个解决方案非常简单。它有两个主要缺点:

    它可能会启动比你的硬件可以管理更多的线程。因此,只有少数几个线程是真正并发运行的。线程上的
  • 专用于一个文件。如果它是一个短文件,并且线程再次空闲,它将不会被重用。
<标题> 其他解决方案

还有很多其他的解决方案。例如:

  • 这个的一个变体,将启动固定数量的线程,每个线程在文件向量中查找要处理的下一个项目,只要它准备好了。然后你需要引入强锁。

  • 比起使用原始线程,你可以考虑未来,启动std::async(std::launch::async, send_new_file, *it);

性能方面的最佳方法:

  1. 使用std::atomic<int>
  2. 声明计数器变量
  3. 创建一个向量,数组,无论什么
  4. 为每个线程调用join

然后,线程的main函数访问共享计数器并自增,并将结果保存在循环中的局部变量中:

std::atomic<int> counter = 0;
for(int j = 0;j<4;j++)
{
    threads.push_back(std::thread([&](){
        for(int i; (i = counter++) < size;)//the counter variable must be atomic!
        {
            do_work(i);
        }
    }));
}
for(int j = 0;j<4;j++)
    threads[i].join();