C++的有效字数统计多线程

Effective wordcount multithreading in C++

本文关键字:多线程 统计 有效字数 C++      更新时间:2023-10-16

我对C++和多线程相当陌生,需要一些帮助来创建字数统计,以有效地在多个线程之间划分工作。

假设,我有一个函数来计算一行(字符串(中的单词:

count_words_in_line(line);

对于一个线程,行中的总字数是每行此函数输出的简单总和,但是如何将其划分为线程?

我的想法是使用两个线程 - 一个用于计算偶数行,一个用于计算奇数行,但代码会导致分段错误。

我做错了什么?有没有更好的方法?

我不想使用线程池,理想情况下,我想指定参数中的线程数,以衡量多线程实现的性能。

这是我的相关代码:

bool odd = true;
auto thread_count_odd = [&counter, &infile, &odd, &line, &mutex]() {
    while (std::getline(infile, line)) {
        if (odd) {
            std::cout<<"Count odd"<<std::endl;
            mutex.lock();
            counter += count_words_in_line(line);
            mutex.unlock();
        }
        odd = !odd;
    }
};
bool even = false;
auto thread_count_even = [&counter, &infile, &even, &line, &mutex]() {
    while (std::getline(infile, line)) {
        if (even) {
            std::cout<<"Count even"<<std::endl;
            mutex.lock();
            counter += count_words_in_line(line);
            mutex.unlock();
        }
        even = !even;
    }
};
std::thread t1(thread_count_odd);
std::thread t2(thread_count_even);
t1.join();
t2.join();
  • 使用具有读索引/写索引和互斥/条件变量的共享向量(如 Jarod42 所建议的那样(。
  • 启动计数线程,等待写入索引大于读取索引。
  • 让主线程读取行并填充向量并相应地通知条件变量。
  • 当计数线程看到写入索引已增加时,它们可以读取该行并进行计数。
  • 主线程指示文件已完全读取。 计数线程返回由 join() 传递的结果。 因此可以添加结果。

备注:可能只有一个线程将执行计数,这表明不需要其他线程。 至少有 2 个线程:读取线程和处理线程。

我认为问题是你必须在getline调用周围有一个互斥锁。两个线程同时访问文件,这可能会导致问题。

我有这段代码,可以使用条件变量适用于您的情况。希望这有帮助

'

#include<iostream>
#include<thread>
#include<string>
#include<mutex>
#include<condition_variable>
#include<unistd.h>
#include <fstream>
#define MAX_THREADS 50
using namespace std;
thread *threads = new thread[MAX_THREADS];
condition_variable cv[MAX_THREADS];
mutex m1;
int counter=0;
int count_words_in_line(string line){
        /*write your code here*/
        return 1;
}
void printString(int tid, ifstream &inFile, int tcount)
{
    unique_lock<mutex> lock(m1);
    while(1)
    {
        string line;
        inFile >> line;
        string a = "";
        if(line==a)break;
        cv[(tid+1)%tcount].notify_one();
        cv[tid].wait(lock);
        counter += count_words_in_line(line);
    }
    cv[(tid+1)%tcount].notify_one();
}
int main(int argc, char** argv)
{
    int tcount, ccount, k;
    std::ifstream inFile;
    string name;
    inFile.open("input.txt");
    string str;
    tcount = 2;
    for(int i = 0; i < tcount; i++) {
        threads[i] = thread(printString, i, ref(inFile), tcount);
    }
    for (int i = 0; i < tcount; i++)
        threads[i].join();
    cout << counter << endl;
    return 0;
}

'

如果线程仅从文件中读取,则不需要互斥锁。您可以让 txt 文件中的线程数跳过行,以便每个线程都有自己的行要读取,然后合并结果。适用于大文件>1GB 例如:


void readerThread(char *argv[], std::unordered_set<std::string>& uniqueWords, size_t threadIndex, size_t maxThreads)
{
    std::ifstream fin (argv[1]);
    if (fin.is_open())
    {
        std::string line;
        for (size_t i = 1; i < threadIndex; i++)
        {
             fin.ignore(std::numeric_limits<std::streamsize>::max(), 'n');
        }
        while (fin)
        {
           getline(fin, line);
           std::string word;
           std::stringstream ss (line);
           while (getline (ss, word, ' '))
           {
               uniqueWords.insert (word);
           }
           for (size_t i = 1; i < maxThreads; i++)
           {
                fin.ignore(std::numeric_limits<std::streamsize>::max(), 'n');
           }
        }
    }
    else
    {
        throw std::invalid_argument("file error");
    }
}

主要:

    const auto processorCount = std::thread::hardware_concurrency();
    std::vector<std::unordered_set<std::string>> vecOfSet;
    std::unordered_set<std::string> uniqueWords;
    unsigned int uniqueWordsCount = 0;
                for(size_t threads = 0; threads < processorCount; threads++)
                {
                    vecOfSet.emplace_back(std::unordered_set<std::string>());
                }
                std::vector<std::thread> threads_;
                for(size_t threads = 0; threads < processorCount; threads++)
                {
                    std::thread thread_([ &vecOfSet, threads, argv, processorCount]() { readerThread( argv, std::ref(vecOfSet.at(threads)), threads+1, processorCount);});
                    threads_.push_back(std::move(thread_));
                }
                for(size_t threads = 0; threads < processorCount; threads++)
                {
                    threads_.at(threads).join();
                }
                for(size_t threads = 0; threads < processorCount; threads++)
                {
                    uniqueWords.merge(vecOfSet.at(threads));
                }
                uniqueWordsCount = uniqueWords.size();