将 mmap 内存用于开销非常低的循环缓冲区
Using mmap memory for a circular buffer with very low overhead
我有一个调试工具,为了注册其获取的数据,它使用一个名为DiskPool
的数据结构(代码如下(。开始时,此数据结构mmap
一定数量的数据(由磁盘上的文件支持(。客户端可以通过简单的凹凸指针机制(使用std::atomic<size_t>
实现(分配内存。
由于采集的数据量很大,我决定在一段时间内有一个窗口,而不是注册和保留所有数据。为了实现这样的目的,我必须将磁盘池更改为循环缓冲区,但这不应该造成相当大的开销,因为这种开销会影响测量。
我想问你,有没有人知道?(例如,使用 STL 的原子接口(。
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <atomic>
#include <memory>
#include <signal.h>
#include <chrono>
#include <thread>
#define handle_error(msg)
do { perror(msg); exit(EXIT_FAILURE); } while (0)
class DiskPool {
char* addr_; // Initialized by mmap()
size_t len_; // Given by the user as many as memory pages as needed
std::atomic<size_t> top_; // Offset from address_
int fd_;
public:
DiskPool(size_t l, const char* file) : len_(l), top_(0),fd_(-1)
{
struct stat st;
fd_= open(file, O_CREAT|O_RDWR, S_IREAD | S_IWRITE);
if (fd_ == -1)
handle_error("open");
if (ftruncate(fd_, len_* sysconf(_SC_PAGE_SIZE)) != 0)
handle_error("ftruncate() error");
else {
fstat(fd_, &st);
printf("the file has %ld bytesn", (long) st.st_size);
}
addr_ = static_cast<char*>( mmap(NULL, (len_* sysconf(_SC_PAGE_SIZE)),
PROT_READ | PROT_WRITE, MAP_SHARED|MAP_NORESERVE, fd_,0));
if (addr_ == MAP_FAILED)
handle_error("mmap failed.");
}
~DiskPool()
{
close(fd_);
if( munmap(addr_, len_)< 0) {
handle_error("Could not unmap file");
exit(1);}
std::cout << "Successfully unmapped the file. " << std::endl;
}
void* allocate(size_t s)
{
size_t t = std::atomic_fetch_add(&top_, s);
return addr_+t;
}
void flush() {madvise(addr_, len_, MADV_DONTNEED);}
};
例如,我创建了使用此磁盘池在创建和销毁对象 (AutomaticLifetimeCollector
( 时记录数据的示例代码。
static const std::string RECORD_FILE = "Data.txt";
static const size_t DISK_POOL_NUMBER_OF_PAGES = 10000;
static std::shared_ptr<DiskPool> diskPool =
std::shared_ptr <DiskPool> (new DiskPool(DISK_POOL_NUMBER_OF_PAGES,RECORD_FILE.c_str()));
struct TaskRecord
{
uint64_t tid; // Thread id
uint64_t tag; // User-given identifier (“f1”)
uint64_t start_time; // nanoseconds
uint64_t stop_time;
uint64_t cpu_time;
TaskRecord(int depth, size_t tag, uint64_t start_time) :
tid(pthread_self()), tag(tag),
start_time(start_time), stop_time(0), cpu_time(0) {}
};
class AutomaticLifetimeCollector
{
TaskRecord* record_;
public:
AutomaticLifetimeCollector(size_t tag) :
record_(new(diskPool->allocate(sizeof(TaskRecord)))
TaskRecord(2, tag, (uint64_t)1000000004L))
{
}
~AutomaticLifetimeCollector() {
record_->stop_time = (uint64_t)1000000000L;
record_->cpu_time = (uint64_t)1000000002L;
}
};
inline void DelayMilSec(unsigned int pduration)
{
std::this_thread::sleep_until(std::chrono::system_clock::now() +
std::chrono::milliseconds(pduration));
}
std::atomic<bool> LoopsRunFlag {true};
void sigIntHappened(int signal)
{
std::cout<< "Application was terminated.";
LoopsRunFlag.store(false, std::memory_order_release);
}
int main()
{
signal(SIGINT, sigIntHappened);
unsigned int i = 0;
while(LoopsRunFlag)
{
AutomaticLifetimeCollector alc(i++);
DelayMilSec(2);
}
diskPool->flush();
return(0);
}
因此,仅考虑为可变缓冲区分发可变大小的切片,我相信比较和交换循环应该有效。
这里的基本思想是读取一个值(原子的(,用它做一些计算,然后写入值,如果它自读取以来没有改变。如果它确实发生了变化(另一个线程/进程(,则必须使用新值重新进行计算。
由于您有可变大小的对象,我认为实际上简单地将其切成 n 个数组(i + 1) % n
元素是行不通的,如给定(i + item_len) % capacity
,它会拆分缓冲区的结束和开始之间的分配,虽然这可能是正确的和有效的,但我认为可能不是你想要的。所以这意味着一个条件,但我认为CPU应该很好地预测它。
#include <iostream>
#include <atomic>
std::atomic<size_t> next_index = 0;
const size_t len = 100; // small for demo purpose
size_t alloc(size_t required_size)
{
if (required_size > len) std::terminate(); // do something, would cause a buffer overflow
size_t i, ret_index, new_index;
i = next_index.load();
do
{
auto space = len - i;
ret_index = required_size <= space ? i : 0; // Wrap if needed
new_index = ret_index + required_size;
} while (next_index.compare_exchange_weak(i, new_index)); // succeed if value did of i not change
return ret_index;
}
int main()
{
std::cout << alloc(4) << std::endl; // 0 - 3
std::cout << alloc(8) << std::endl; // 4 - 11
std::cout << alloc(32) << std::endl; // 12 - 43
std::cout << alloc(32) << std::endl; // 44 - 75
std::cout << alloc(32) << std::endl; // 0 - 31 (76 - 107 would overflow)
std::cout << alloc(32) << std::endl; // 32 - 63
std::cout << alloc(32) << std::endl; // 64 - 95
std::cout << alloc(32) << std::endl; // 0 - 31 (96 - 127 would overflow)
}
插入到您的类中应该相当简单:
void* allocate(size_t s)
{
if (s > len_ * sysconf(_SC_PAGE_SIZE)) std::terminate(); // do something, would cause a buffer overflow
size_t i, ret_index, new_index;
i = top_.load();
do
{
auto space = len_ * sysconf(_SC_PAGE_SIZE) - i;
ret_index = s <= space ? i : 0; // Wrap if needed
new_index = ret_index + s;
} while (top_.compare_exchange_weak(i, new_index)); // succeed if value did of i not change
return addr_+ ret_index;
}
len_ * sysconf(_SC_PAGE_SIZE)
位于几个地方,因此存储在len_
本身中可能是更有用的值。
相关文章:
- 如何循环打印顶点结构
- 如何在C++中从两个单独的for循环中添加两个数组
- C++我的数学有什么问题,为什么我的代码不能正确循环
- 正在尝试了解输入验证循环
- std::map<struct,struct>::find 找不到匹配项,但是如果我循环通过 begin() 到 end(),我在那里看到匹配项
- 循环后如何继续阅读
- 有没有办法重复循环循环?
- Qt有循环缓冲吗?
- 虽然第三次循环循环,尽管在我眼中没有满足它的条件,因此打印了一个空字符串
- 避免以不同的迭代剂的循环循环避免几乎相同的代码重复
- 循环循环保存到数组,然后访问;
- 未定义的行为怪癖:在缓冲区外读取导致循环永远不会终止
- 对于循环循环仅 3 次
- 处理媒体循环循环循环通过向量中的对象的各个方面
- 将C转换为循环循环循环
- 关于如何正确循环循环并找到最小的价值,需要基本C 的帮助
- 我如何循环循环switch语句检查重新检查char?(C )
- 在嵌套循环/循环不变量中检查一次
- 只有在充满for循环的情况下才能进行开放缓冲播放
- 调用linearize后奇怪的boost循环缓冲行为