C++,Qt - 尽可能快地拆分QByteArray

C++, Qt - splitting a QByteArray as fast as possible

本文关键字:拆分 QByteArray 尽可能 Qt C++      更新时间:2023-10-16

我正在尝试拆分包含 UTF-8 编码纯文本(使用空格作为分隔符)的大量QByteArray,并具有最佳性能。我发现如果我先将数组转换为QString,我可以获得更好的结果。我尝试使用正则表达式使用 QString.split 函数,但性能非常糟糕。事实证明,这段代码要快得多:

QMutex mutex;
QSet<QString> split(QByteArray body)
{
    QSet<QString>  slova;
    QString s_body = QTextCodec::codecForMib(106)->toUnicode(body);
    QString current;
    for(int i = 0; i< body.size(); i++){
        if(s_body[i] == 'r' || s_body[i] == 'n' || s_body[i] == 't' || s_body[i] == ' '){
            mutex.lock();
            slova.insert(current);
            mutex.unlock();
            current.clear();
            current.reserve(40);
        } else {
            current.push_back(s_body[i]);
        }
    }
    return slova;
}

"Slova"目前是一个QSet<QString>,但我可以使用std::set或任何其他格式。此代码应该查找数组中有多少个唯一单词,并具有最佳性能。

不幸的是,这段代码运行得还不够快。我希望从中挤出绝对最大值。

使用callgrind,我发现最贪吃的内部函数是:

QString::reallocData (18% absolute cost)
QString::append (10% absolute cost)
QString::operator= (8 % absolute cost)
QTextCodec::toUnicode (8% absolute cost)

显然,这与源自push_back函数的内存分配有关。解决此问题的最佳方法是什么? 不一定是Qt解决方案——纯C或C++也是可以接受的。

尽量减少您需要执行的复制量。 将输入缓冲区保留为 UTF-8,不要在集合中存储std::stringQString;相反,创建一个小类来引用现有的 UTF-8 数据:

#include <QString>
class stringref {
    const char *start;
    size_t length;
public:
    stringref(const char *start, const char *end);
    operator QString() const;
    bool operator<(const stringref& other) const;
};

这可以封装 UTF-8 输入的子字符串。 您需要确保它不会超过输入字符串;你可以通过巧妙地使用std::shared_ptr来做到这一点,但如果代码是合理的自包含的,那么它应该足够容易处理,可以推理生命周期。

我们可以将它从一对指针构造成我们的 UTF-8 数据,并在我们想要实际使用它时将其转换为 QString

stringref::stringref(const char *start, const char *end)
    : start(start), length(end-start)
{}
stringref::operator QString() const
{
    return QString::fromUtf8(start, length);
}

您需要定义operator<以便可以在std::set中使用它。

#include <cstring>
bool stringref::operator<(const stringref& other) const
{
    return length == other.length
        ? std::strncmp(start, other.start, length) < 0
        : length < other.length;
}

请注意,我们在取消引用指针之前按长度排序,以减少缓存影响。


现在我们可以编写 split 方法:

#include <set>
#include <QByteArray>
std::set<stringref> split(const QByteArray& a)
{
    std::set<stringref> words;
    // start and end
    const auto s = a.data(), e = s + a.length();
    // current word
    auto w = s;
    for (auto p = s;  p <= e;  ++p) {
        switch (*p) {
        default: break;
        case ' ': case 'r': case 'n': case 't': case '':
            if (w != p)
                words.insert({w, p});
            w = p+1;
        }
    }
    return words;
}

该算法几乎是您的,添加了w!=p测试,因此不会计算空格的运行。


让我们测试一下,并计时重要的位:

#include <QDebug>
#include <chrono>
int main()
{
    QByteArray body{"foo bar bazn  foo againnbar again "};
    // make it a million times longer
    for (int i = 0;  i < 20;  ++i)
        body.append(body);
    using namespace std::chrono;
    const auto start = high_resolution_clock::now();
    auto words = split(body);
    const auto end = high_resolution_clock::now();
    qDebug() << "Split"
             << body.length()
             << "bytes in"
             << duration_cast<duration<double>>(end - start).count()
             << "seconds";
    for (auto&& word: words)
        qDebug() << word;
}

我得到:

在 1.99142 秒
内拆分 35651584 个字节 "酒吧"
"巴兹"
"噗"
"再说一遍"

使用 -O3 编译将时间减少到 0.6188 秒,所以不要忘记向编译器寻求帮助!

如果这仍然不够快,那么可能是时候开始考虑并行化任务了。 您需要将字符串拆分为大致相等的长度,但前进到下一个空格,以便没有工作跨越两个线程的工作。 每个线程都应创建自己的结果集,然后缩减步骤是合并结果集。 我不会为此提供完整的解决方案,因为这本身就是另一个问题。

正如怀疑的那样,您最大的成本是push_back一次附加一个字符时导致频繁的重新分配。为什么不继续搜索,然后使用QString::mid()一次附加所有数据:

slova.insert(s_body.mid(beginPos, i - beginPos - 1));

其中beginPos保存当前子字符串开头的索引。不是在插入slova之前将每个字符附加到current,而是一次完成所有复制。复制子字符串后,提前搜索下一个有效(不是分隔符)字符,并将beginPos设置为等于该索引。

在(粗略)代码中:

QString s_body = ...
//beginPos tells us the index of the current substring we are working 
//with. -1 means the previous character was a separator
int beginPos = -1;
for (...) {
    //basically your if statement provided in the question as a function
    if (isSeparator(s_body[i])) {
         //ignore double white spaces, etc.
         if (beginPos != -1) {
             mutex.lock();
             slova.insert(s_body.mid(beginPos, i - beginPos - 1));
             mutex.unlock();
         }
    } else if (beginPos == -1)
        //if beginPos is not valid and we are not on a separator, we 
        //are at the start of a new substring.
         beginPos = i;
}

此方法将大大减少堆分配的开销,并消除QString::push_back()调用。

最后一点:QByteArray还提供了mid()功能。您可以跳过转换到完全QString,直接使用字节数组。

如果我是你,我要做的第一件事就是修改你的代码,这样它就不会锁定和解锁 QMutex 插入 QSet 的任何单词——这纯粹是开销。 要么在循环开始时只锁定一次 QMutex,然后在循环终止后再次解锁;或者更好的是,插入到无法从任何其他线程访问的 QSet 中,这样您根本不需要锁定任何 QMutexe。

有了这个,第二件事是消除尽可能多的堆分配。 理想情况下,您将执行整个解析,而无需分配或释放任何动态内存;我下面的实现就是这样做的(嗯,几乎 - unordered_set可能会做一些内部分配,但它可能不会)。 在我的电脑(2.7GHz Mac Mini)上,我测量了每秒约1100万字的处理速度,使用Moby Dick的Gutenberg ASCII文本作为我的测试输入。

请注意,由于 UTF-8

使用的向后兼容编码,该程序与 UTF-8 或 ASCII 输入同样有效。

#include <ctype.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <unordered_set>
// Loads in a text file from disk into an in-memory array
// Expected contents of the file are ASCII or UTF8 (doesn't matter which).
// Note that this function appends a space to the end of the returned array
// That way the parsing function doesn't have to include a special case
// since it is guaranteed that every word in the array ends with whitespace
static char * LoadFile(const char * fileName, unsigned long * retArraySizeBytes)
{
   char * ret = NULL;
   *retArraySizeBytes = 0;
   FILE * fpIn = fopen(fileName, "r");
   if (fpIn)
   {
      if (fseek(fpIn, 0L, SEEK_END) == 0)
      {
         const unsigned long fileSizeBytes  = ftell(fpIn);
         const unsigned long arraySizeBytes = *retArraySizeBytes = fileSizeBytes+1;  // +1 because I'm going to append a space to the end
         rewind(fpIn);
         ret = new char[arraySizeBytes];
         if (fread(ret, 1, fileSizeBytes, fpIn) == fileSizeBytes)
         {
            ret[fileSizeBytes] = ' ';  // appending a space allows me to simplify the parsing step
         }
         else
         {
            perror("fread");
            delete [] ret;
            ret = NULL;
         }
      }
      else perror("fseek");
      fclose(fpIn);
   }
   return ret;
}
// Gotta provide our own equality-testing function otherwise unordered_set will just compare pointer values
struct CharPointersEqualityFunction : public std::binary_function<char *, char *,bool>
{  
    bool operator() (char * s1, char * s2) const {return strcmp(s1, s2) == 0;}
};
// Gotta provide our own hashing function otherwise unordered_set will just hash the pointer values
struct CharPointerHashFunction
{
   int operator() (char * str) const
   {
      // djb2 by Dan Bernstein -- fast enough and simple enough
      unsigned long hash = 5381;
      int c; while((c = *str++) != 0) hash = ((hash << 5) + hash) + c;
      return (int) hash;
   }
};
typedef std::unordered_set<char *, CharPointerHashFunction, CharPointersEqualityFunction > CharPointerUnorderedSet;
int main(int argc, char ** argv)
{
   if (argc < 2)
   {
      printf("Usage:  ./split_words filenamen");
      return 10;
   }    
   unsigned long arraySizeBytes;
   char * buf = LoadFile(argv[1], &arraySizeBytes);
   if (buf == NULL)
   {
      printf("Unable to load input file [%s]n", argv[1]);
      return 10;
   }
   CharPointerUnorderedSet set;
   set.reserve(100000);  // trying to size (set) big enough that no reallocations will be necessary during the parse
   struct timeval startTime;
   gettimeofday(&startTime, NULL);
   // The actual parsing of the text is done here
   int wordCount = 0;
   char * wordStart = buf;
   char * wordEnd   = buf;
   char * bufEnd    = &buf[arraySizeBytes];
   while(wordEnd < bufEnd)
   {
      if (isspace(*wordEnd))
      {
         if (wordEnd > wordStart)
         {
            *wordEnd = '';
            set.insert(wordStart);
            wordCount++;
         }
         wordStart = wordEnd+1;   
      }
      wordEnd++;
   }
   struct timeval endTime;
   gettimeofday(&endTime, NULL);
   unsigned long long startTimeMicros = (((unsigned long long)startTime.tv_sec)*1000000) + startTime.tv_usec;
   unsigned long long endTimeMicros   = (((unsigned long long)  endTime.tv_sec)*1000000) + endTime.tv_usec;
   double secondsElapsed = ((double)(endTimeMicros-startTimeMicros))/1000000.0;
   printf("Parsed %i words (%zu unique words) in %f seconds, aka %.0f words/secondn", wordCount, set.size(), secondsElapsed, wordCount/secondsElapsed);
   //for (const auto& elem: set) printf("word=[%s]n", elem);
   delete [] buf;
   return 0;
}