如何启动多个线程,每个线程处理不同的文件
How to launch multiple threads and each thread working on different files?
我有一个单线程应用程序,它通过调用send_new_file
向其他服务器发送文件
void send_new_file_command::start_sending_file()
{
m_thread = thread(&send_new_file_command::execute_file, this);
}
void send_new_file_command::execute_file()
{
for (auto it = files_need_to_send.begin(); it != files_need_to_send.end() && !is_complete(); ++it)
{
{
std::unique_lock<spinning_lock> guard(lock_obj);
m_current_file = *it;
}
// send a file.
// I want to call this in parallel
send_new_file(*it);
}
}
是否有任何方法我可以有多个线程和每个线程发送一个文件。举个例子,假设我们有4个线程,线程1,2,3,4将并行发送不同的文件。我想并行调用send_new_file
?
我使用std::thread
。我正在看线程的例子,我如何在c++中做到这一点,但困惑的是,我如何在这里划分每个线程的文件数量,并确保每个线程在文件的子集上工作。
std::vector<std::thread> threads;
for (int i = 0; i < 4; ++i)
threads.push_back(std::thread(send_new_file(*it)));
我的背景是Java,所以有点困惑如何在c++中使用std::thread。
这是使用工作队列的一种相当简单的方法。您可以将代码片段连接到一个自包含的程序中。我们将使用以下标准库头文件:
#include <fstream>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
首先,定义一个函数,该函数接受一个文件名并将其发送到它应该去的任何地方。我将通过简单地将其写入/dev/null
来模拟这一点。
void
send_file(const std::string& filename)
{
std::ifstream istr {};
std::ofstream ostr {};
std::string line {};
istr.exceptions(std::ifstream::badbit);
ostr.exceptions(std::ofstream::badbit);
istr.open(filename);
ostr.open("/dev/null");
while (std::getline(istr, line))
ostr << line << 'n';
}
接下来,定义一个函数,该函数接受一个指针,该指针指向仍然需要发送的文件的std::vector
,另一个指针指向std::mutex
,该指针应该保护该向量。我使用指针而不是引用,因为这允许我稍后创建std::thread
更简单。如果你不喜欢,你不必这样做。
int
send_files(std::vector<std::string> *const files_p, std::mutex *const mutex_p)
{
auto count = 0;
while (true)
{
std::string next {};
{
const std::unique_lock<std::mutex> lck {*mutex_p};
if (files_p->empty()) // nothing left to do
return count;
next = std::move(files_p->back());
files_p->pop_back();
}
send_file(next);
count += 1;
}
}
重要的是,在执行发送文件的实际工作时,我们不会持有锁。否则,我们将彻底扼杀并发性。在保持锁的同时,我还小心地不分配任何内存。通常,您将看到std::list
用作工作队列,std::condition_variable
用于在队列发生更改时发出信号。我张贴的代码显示这在另一个答案前一段时间。然而,在这个简单的例子中,队列只会被移除,所以std::vector
是一个完美的选择。
最后,我们在一个简单的程序中使用我们所拥有的,每个硬件并发单元创建一个线程,并要求这些线程发送命令行参数中指定的所有文件。注意,如上所述,这将以相反的顺序处理列表。但是,如果这对您来说是一个问题,那么更改它是微不足道的。
int
main(int argc, char * * argv)
{
const auto nthreads = std::thread::hardware_concurrency();
std::mutex mutex {};
std::vector<std::thread> threads {};
std::vector<std::string> files {};
files.reserve(argc - 1);
for (auto i = 1; i < argc; ++i)
files.push_back(argv[i]);
threads.reserve(nthreads);
for (auto t = 0U; t < nthreads; ++t)
threads.emplace_back(send_files, &files, &mutex);
for (auto t = 0U; t < nthreads; ++t)
threads[t].join();
}
第一种方法
有一个简单的解决方案:
- 你的类包含要处理的文件向量
- 只有一个线程通过函数
execute_file()
管理这个向量 - 这个函数创建任意数量的线程,每个线程处理一个文件
- 在结束时,所有线程被连接(强制)
代码应该是这样的:
struct send_new_file_command {
vector<string> files_need_to_send;
public:
send_new_file_command(vector<string> f) : files_need_to_send(f) {}
void execute_file();
};
void send_new_file_command::execute_file()
{
vector<thread> exec;
for(auto it = files_need_to_send.begin(); it != files_need_to_send.end(); ++it)
{
exec.push_back(thread(send_new_file, *it));
}
for(auto &e : exec)
e.join();
}
可以用下面的代码进行测试:
void send_new_file(string x) { // simulator
for(int i = 0; i<10; i++) {
cout << x << endl;
this_thread::sleep_for(chrono::milliseconds(500));
}
}
int main() {
vector<string>vs{"a", "b", "c", "d"};
send_new_file_command sfc(vs);
sfc.execute_file();
return 0;
}
这个解决方案非常简单。它有两个主要缺点:
- 它可能会启动比你的硬件可以管理更多的线程。因此,只有少数几个线程是真正并发运行的。线程上的
- 专用于一个文件。如果它是一个短文件,并且线程再次空闲,它将不会被重用。
还有很多其他的解决方案。例如:
这个的一个变体,将启动固定数量的线程,每个线程在文件向量中查找要处理的下一个项目,只要它准备好了。然后你需要引入强锁。
比起使用原始线程,你可以考虑未来,启动
std::async(std::launch::async, send_new_file, *it);
性能方面的最佳方法:
- 使用
std::atomic<int>
声明计数器变量 - 创建一个向量,数组,无论什么
- 为每个线程调用join
然后,线程的main函数访问共享计数器并自增,并将结果保存在循环中的局部变量中:
std::atomic<int> counter = 0;
for(int j = 0;j<4;j++)
{
threads.push_back(std::thread([&](){
for(int i; (i = counter++) < size;)//the counter variable must be atomic!
{
do_work(i);
}
}));
}
for(int j = 0;j<4;j++)
threads[i].join();
- 将更高的优先级设置为 boost::asio 线程处理进程
- C++ 使用 2 个容器进行线程处理
- 当线程处理不同的类时,应该在哪里声明条件变量、互斥对象
- 多线程处理中的静态成员变量
- Opencv cpp 使用多线程处理同一视频的不同部分
- 对象析构函数在多线程处理时不断被调用,但该对象并未超出范围
- 使用 wxWidgets 进行多线程处理时出现奇怪的行为
- 使用共享变量进行线程处理
- 通过多线程处理确定每个字符在文件中出现的次数
- 使用多线程处理的异步请求
- 多线程处理,同时保持部分序列
- 如何在类中进行 c++ 多线程处理(将线程引用保留为成员 var)
- 用管道在C++中创建调度队列/线程处理程序:FIFO溢出
- 用多个线程处理SIGTERM的正确方法
- 使用C++的递归线程处理会使资源暂时不可用
- SDL 带变量的多线程处理 -- 无法按预期工作
- 使用多线程处理对象数组 - 无效使用 void 表达式错误
- 如何使用"priority"进行多线程处理?
- 使用简单的过程进行慢速多线程处理
- C++11 使用共享对象的多线程处理