将 mmap 内存用于开销非常低的循环缓冲区

Using mmap memory for a circular buffer with very low overhead

本文关键字:循环 循环缓冲 缓冲区 非常低 开销 mmap 内存 用于      更新时间:2023-10-16

我有一个调试工具,为了注册其获取的数据,它使用一个名为DiskPool的数据结构(代码如下(。开始时,此数据结构mmap一定数量的数据(由磁盘上的文件支持(。客户端可以通过简单的凹凸指针机制(使用std::atomic<size_t>实现(分配内存。

由于采集的数据量很大,我决定在一段时间内有一个窗口,而不是注册和保留所有数据。为了实现这样的目的,我必须将磁盘池更改为循环缓冲区,但这不应该造成相当大的开销,因为这种开销会影响测量。

我想问你,有没有人知道?(例如,使用 STL 的原子接口(。

#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <atomic>
#include <memory>
#include <signal.h>
#include <chrono>
#include <thread>
#define handle_error(msg) 
do { perror(msg); exit(EXIT_FAILURE); } while (0)
class DiskPool {
char* addr_;        // Initialized by mmap()
size_t len_;            // Given by the user as many as memory pages as needed
std::atomic<size_t> top_;   // Offset from address_
int fd_;
public:
DiskPool(size_t l, const char* file) : len_(l), top_(0),fd_(-1)
{
struct stat st;
fd_= open(file, O_CREAT|O_RDWR, S_IREAD | S_IWRITE);
if (fd_ == -1)
handle_error("open");
if (ftruncate(fd_, len_* sysconf(_SC_PAGE_SIZE)) != 0)
handle_error("ftruncate() error");
else {
fstat(fd_, &st);
printf("the file has %ld bytesn", (long) st.st_size);
}
addr_ = static_cast<char*>( mmap(NULL, (len_* sysconf(_SC_PAGE_SIZE)),
PROT_READ | PROT_WRITE, MAP_SHARED|MAP_NORESERVE, fd_,0));
if (addr_ == MAP_FAILED)
handle_error("mmap failed.");
}
~DiskPool()
{
close(fd_);
if( munmap(addr_, len_)< 0) {
handle_error("Could not unmap file");
exit(1);}
std::cout << "Successfully unmapped the file. " << std::endl;
}
void* allocate(size_t s)
{
size_t t = std::atomic_fetch_add(&top_, s);
return addr_+t;
}
void flush() {madvise(addr_, len_, MADV_DONTNEED);}
};

例如,我创建了使用此磁盘池在创建和销毁对象 (AutomaticLifetimeCollector( 时记录数据的示例代码。

static const std::string RECORD_FILE = "Data.txt";
static const size_t DISK_POOL_NUMBER_OF_PAGES = 10000;
static std::shared_ptr<DiskPool> diskPool =
std::shared_ptr <DiskPool> (new DiskPool(DISK_POOL_NUMBER_OF_PAGES,RECORD_FILE.c_str())); 
struct TaskRecord 
{
uint64_t tid;   // Thread id
uint64_t tag;   // User-given identifier (“f1”)
uint64_t start_time;    // nanoseconds
uint64_t stop_time;
uint64_t cpu_time;
TaskRecord(int depth, size_t tag, uint64_t start_time) :
tid(pthread_self()), tag(tag),
start_time(start_time), stop_time(0), cpu_time(0) {}
};
class AutomaticLifetimeCollector 
{
TaskRecord* record_;
public:
AutomaticLifetimeCollector(size_t tag) :
record_(new(diskPool->allocate(sizeof(TaskRecord)))
TaskRecord(2, tag, (uint64_t)1000000004L))
{
}
~AutomaticLifetimeCollector() {
record_->stop_time = (uint64_t)1000000000L;
record_->cpu_time = (uint64_t)1000000002L;
}
};
inline void DelayMilSec(unsigned int pduration)
{
std::this_thread::sleep_until(std::chrono::system_clock::now() + 
std::chrono::milliseconds(pduration));
}
std::atomic<bool> LoopsRunFlag {true};
void sigIntHappened(int signal)
{
std::cout<< "Application was terminated.";
LoopsRunFlag.store(false, std::memory_order_release);
}
int main()
{
signal(SIGINT, sigIntHappened);
unsigned int i = 0;
while(LoopsRunFlag)
{
AutomaticLifetimeCollector alc(i++);
DelayMilSec(2);
}
diskPool->flush();
return(0);
}

因此,仅考虑为可变缓冲区分发可变大小的切片,我相信比较和交换循环应该有效。

这里的基本思想是读取一个值(原子的(,用它做一些计算,然后写入值,如果它自读取以来没有改变。如果它确实发生了变化(另一个线程/进程(,则必须使用新值重新进行计算。

由于您有可变大小的对象,我认为实际上简单地将其切成 n 个数组(i + 1) % n元素是行不通的,如给定(i + item_len) % capacity,它会拆分缓冲区的结束和开始之间的分配,虽然这可能是正确的和有效的,但我认为可能不是你想要的。所以这意味着一个条件,但我认为CPU应该很好地预测它。

#include <iostream>
#include <atomic>
std::atomic<size_t> next_index = 0;
const size_t len = 100; // small for demo purpose
size_t alloc(size_t required_size)
{
if (required_size > len) std::terminate(); // do something, would cause a buffer overflow
size_t i, ret_index, new_index;
i = next_index.load();
do
{
auto space = len - i;
ret_index = required_size <= space ? i : 0; // Wrap if needed
new_index = ret_index + required_size;
} while (next_index.compare_exchange_weak(i, new_index)); // succeed if value did of i not change
return ret_index;
}
int main()
{
std::cout << alloc(4) << std::endl;  // 0 - 3
std::cout << alloc(8) << std::endl;  // 4 - 11
std::cout << alloc(32) << std::endl; // 12 - 43
std::cout << alloc(32) << std::endl; // 44 - 75
std::cout << alloc(32) << std::endl; // 0 - 31 (76 - 107 would overflow)
std::cout << alloc(32) << std::endl; // 32 - 63
std::cout << alloc(32) << std::endl; // 64 - 95
std::cout << alloc(32) << std::endl; // 0 - 31 (96 - 127 would overflow)
}

插入到您的类中应该相当简单:

void* allocate(size_t s)
{
if (s > len_ * sysconf(_SC_PAGE_SIZE)) std::terminate(); // do something, would cause a buffer overflow
size_t i, ret_index, new_index;
i = top_.load();
do
{
auto space = len_ * sysconf(_SC_PAGE_SIZE) - i;
ret_index = s <= space ? i : 0; // Wrap if needed
new_index = ret_index + s;
} while (top_.compare_exchange_weak(i, new_index)); // succeed if value did of i not change
return addr_+ ret_index;
}

len_ * sysconf(_SC_PAGE_SIZE)位于几个地方,因此存储在len_本身中可能是更有用的值。