C++-如何为同时/异步处理对文件进行分块

C++ - How to chunk a file for simultaneous/async processing?

本文关键字:文件 处理 异步 C++-      更新时间:2023-10-16

如何按行数读取和分割/块文件?

我想将一个文件划分到不同的缓冲区中,同时确保一行不会在两个或多个缓冲区之间分割。我计划将这些缓冲区传递到它们自己的pthread中,这样它们就可以执行某种类型的同时/异步处理。

我已经阅读了下面的答案,使用c在linux上进行块读写,但我认为它并不能完全回答确保一行不会被分成两个或更多缓冲区的问题。

文件是如何编码的?如果每个字节代表一个字符,我会做以下操作:

  1. 内存使用mmap()映射文件
  2. 通过根据适当的块大小计算,告诉作业它们的大致开始和结束
  3. 通过找到下一个'n',让每个作业找到其实际的开始和结束
  4. 同时处理各个块
  5. 请注意,第一个块需要特殊处理,因为它的开始不是近似的,而是精确的

我会选择以字节为单位的块大小。然后,我会在文件中寻找合适的位置,一次读取一些少量的字节,直到我得到一行换行符。

第一个区块的最后一个字符是换行符。第二个区块的第一个字符是换行符之后的字符。

始终查找pagesize()边界,并一次读取pagesize()字节来搜索换行符。这将确保您只从磁盘中提取找到边界所需的最小值。您可以尝试一次读取128个字节或其他内容。但是,您可能会进行更多的系统调用。

我写了一个例子程序,这样做的字母频率计数。当然,这在很大程度上是毫无意义的,因为它几乎肯定是IO绑定的。换行符在哪里也无关紧要,因为它不是面向行的。但是,这只是一个例子。此外,它在很大程度上依赖于您是否拥有一个相当完整的C++11实现。

lisp.paste.org上的
  • thread_file_split.cpp

它们的关键功能是:

// Find the offset of the next newline given a particular desired offset.
off_t next_linestart(int fd, off_t start)
{
using ::std::size_t;
using ::ssize_t;
using ::pread;
const size_t bufsize = 4096;
char buf[bufsize];
for (bool found = false; !found;) {
const ssize_t result = pread(fd, buf, bufsize, start);
if (result < 0) {
throw ::std::system_error(errno, ::std::system_category(),
"Read failure trying to find newline.");
} else if (result == 0) {
// End of file
found = true;
} else {
const char * const nl_loc = ::std::find(buf, buf + result, 'n');
if (nl_loc != (buf + result)) {
start += ((nl_loc - buf) + 1);
found = true;
} else {
start += result;
}
}
}
return start;
}

还要注意,我使用pread。当您有多个线程从文件的不同部分读取时,这是绝对必要的。

文件描述符是线程之间的共享资源。当一个线程使用普通函数从文件中读取时,它会更改有关共享资源(文件指针)的详细信息。文件指针是文件中发生下一次读取的位置。

在每次阅读之前简单地使用lseek并没有帮助,因为它在lseekread之间引入了竞争条件。

pread函数允许您从文件中的特定位置读取一堆字节。它也根本不会改变文件指针。除了不改变文件指针之外,它在其他方面就像在同一个调用中组合lseekread

为缓冲区定义一个类。为每一个提供一个大的缓冲区空间,该空间是页面大小的倍数,并提供一个开始/结束索引,一个从传入流中读取缓冲区的方法,以及一个将另一个*buffer实例作为参数的"lineParse"方法。

制作一些*缓冲区,并将它们存储在生产者-消费者池队列中。打开文件,从池中获取一个缓冲区,并从头到尾读取缓冲区空间,(返回错误/EOF的布尔值)。从池中获取另一个*缓冲区,并将其传递到前一个缓冲区的lineparse()中。在那里,从数据的末尾向后搜索,寻找换行符。找到后,重新加载结束索引,并将最后一行的片段(如果有的话——你可能偶尔会很幸运:)记忆到新的已传递*缓冲区中,并设置其开始索引。第一个缓冲区现在有整行,可以排队到将处理这些行的线程。第二个缓冲区具有从第一个缓冲区复制的行片段,并且更多的数据可以从磁盘读取到其起始索引处的缓冲区空间中。

行处理线程可以将"used"*缓冲区回收到池中。

继续进行,直到EOF,(或错误:)。

如果可以,请向缓冲区类中添加一个处理缓冲区的方法。

使用大的缓冲区类并从最后进行解析将比连续读取小比特、从一开始就寻找换行符更有效。线程间通信很慢,可以传递的缓冲区越大越好。

使用缓冲池可以消除连续的新/删除,并提供流控制-如果磁盘读取线程的处理速度快,则池将清空,磁盘读取线程将阻塞池,直到一些使用过的缓冲区被回收。这样可以防止内存失控。

请注意,如果使用多个处理线程,缓冲区可能会被"无序"处理——这可能很重要,也可能无关紧要。

在这种情况下,您只能通过确保与磁盘读取延迟并行处理行的优势大于线程间通信的开销来获得好处——在线程之间通信小缓冲区很可能会适得其反。

最大的加速是使用总体速度快但延迟大的网络磁盘。