通过 mmap-ed 共享内存传递可变长度 C 字符串

Passing Variable Length C String Through Mmap-ed Shared Memory

本文关键字:字符串 mmap-ed 共享 内存 通过      更新时间:2023-10-16

假设我有一个进程A和一个进程B,进程A想通过shm_open((+ mmap((共享内存传递一个C字符串到进程B。

什么是最节省延迟的方法?

这篇文章的回答表明,在C++11之后,std::atomic是通过共享内存共享数据的正确方法。

但是,我看不出如何编写一些东西来编写带有以下内容的 C 字符串:

struct Buffer {
std::atomic<uint32_t> length;
std::atomic<char*> str;
} __attribute__((packed));

鉴于我以这种方式创建了一个共享内存:

class SHM {
char* _ptr;
public:
SHM() {
const auto handle = shm_open("myTest", O_RDWR|O_CREAT, 0666);
const auto size =  4 * 1024 * 1024;
if (-1 == ftruncate(handle, size)) {
throw;
}
_ptr = (char*)mmap(0,size , PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0);
if(_ptr == MAP_FAILED){
throw;
}
int rc = fchmod(handle, 0666);
if (rc == -1) {
throw;
}
}
// assume to caller will do buffer.size.store(someLength, std::memory_order_release); after filling up Buffer::str
Buffer& getBuffer() noexcept {
return *reinrepret_cast<Buffer*>(_ptr);
}
Buffer& read() {
auto& buffer = *reinrepret_cast<Buffer*>(_ptr);
while (buffer.size.load(std::memory_order_acquire) > 0) {
buffer.str.load(std::memory_order_relaxed);
return buffer;
}
}
};

调用方如何SHM::getBuffer()正确地逐个字符写入 Buffer::str 字符,以便进程 B 可以调用SHM::read()进行检索?

buffer.str.load(std::memory_order_relaxed( 真的是原子和正确地加载吗? 我对此表示怀疑,因为它甚至不知道长度。

这适用于Linux,X86-64,GCC 7。

提前谢谢。

这是单生产者-单消费者案例的工作草图(生产者/消费者线程是否来自同一进程并不重要(,无需等待:

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <unistd.h>
#include <fcntl.h>
#include <utility>
#include <cstring>
#include <string>
#include <atomic>
class SingleProducerSingleConsumerIndexes {
std::atomic<uint64_t> produced_ = {};
std::atomic<uint64_t> consumed_ = {};
public: // Producer interface.
uint64_t produced() {
auto consumed = consumed_.load(std::memory_order_acquire); // Syncronizes with store 2.
auto produced = produced_.load(std::memory_order_relaxed);
if(produced != consumed || !produced)
return produced;
// Entire buffer was consumed. Rewind.
produced_.store(0, std::memory_order_release); // Store 1.
consumed_.store(0, std::memory_order_relaxed); // Store 3.
return 0;
}
void produce(uint64_t end) {
produced_.store(end, std::memory_order_release); // Store 1.
}
public: // Consumer interface.
std::pair<uint64_t, uint64_t> available() const {
auto produced = produced_.load(std::memory_order_acquire); // Syncronizes with store 1.
auto consumed = consumed_.load(std::memory_order_relaxed);
// min handles the case of store 3 not visible yet.
return {std::min(produced, consumed), produced};
}
void consume(uint64_t end) {
consumed_.store(end, std::memory_order_release); // Store 2.
}
};
class SharedMemoryStrings {
void* p_;
static constexpr int size = 4 * 1024 * 1024;
static constexpr int buffer_size = size - sizeof(SingleProducerSingleConsumerIndexes);
public:
SharedMemoryStrings() {
auto handle = ::shm_open("/another-test", O_RDWR|O_CREAT, 0666);
if(-1 == ::ftruncate(handle, size))
throw;
p_ = ::mmap(0,size , PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0);
::close(handle);
if(p_ == MAP_FAILED)
throw;
}
~SharedMemoryStrings() {
::munmap(p_, size);
}
void produce(std::string const& s) {
auto* indexes = static_cast<SingleProducerSingleConsumerIndexes*>(p_);
auto produced = indexes->produced();
uint64_t new_end = produced + sizeof(uint64_t) + s.size();
if(new_end > buffer_size)
throw; // Out of buffer space.
auto* buffer = reinterpret_cast<char*>(indexes + 1) + produced;
uint64_t size = s.size();
memcpy(buffer, &size, sizeof size);
buffer += sizeof size;
memcpy(buffer, s.data(), s.size());
indexes->produce(new_end);
}
bool try_consume(std::string& s) {
auto* indexes = static_cast<SingleProducerSingleConsumerIndexes*>(p_);
auto available = indexes->available();
auto consumed = available.first;
auto produced = available.second;
if(consumed == produced)
return false; // No data available.
auto* buffer = reinterpret_cast<char const*>(indexes + 1) + consumed;
uint64_t size;
memcpy(&size, buffer, sizeof size);
buffer += sizeof size;
// Reuse the string to minimize memory allocations.
s.assign(buffer, size);
indexes->consume(consumed + sizeof(uint64_t) + size);
return true;
}
};
int main(int ac, char** av) {
if(ac > 1) {
// Producer.
SharedMemoryStrings a;
for(int i = 1; i < ac; ++i)
a.produce(av[i]);
}
else {
// Consumer.
SharedMemoryStrings a;
for(std::string s;;) { // Busy-wait loop.
if(a.try_consume(s)) // Reuse the string to minimize memory allocations.
printf("%sn", s.c_str());
// else // Potential optimization.
//     _mm_pause();
}
}
}

笔记:

  • g++ -o test -W{all,extra,error} -std=gnu++11 -O3 -DNDEBUG -march=native -pthread -lrt test.cc一样编译代码。假设此源称为test.cc

  • 启动没有参数的使用者,./test.有论据的制片人,比如./test hello world.启动顺序无关紧要。

  • 这是一个单一生产者-单一消费者的解决方案。它是无等待的(生产者和消费者的调用在固定数量的指令中完成,没有循环(,这比无锁定(不能保证在固定数量的指令中完成(要好。不能比这更快。

  • 在x86-64上,这些获取和释放原子负载并存储编译为纯mov指令,因为当前的x86-64内存模型有点太强了。但是,使用std::atomic和特定的内存顺序可确保编译器不会对指令进行重新排序。它还确保代码在内存模型较弱的架构上编译和工作,并在必要时插入适当的屏障,这是volatile不可能做到的。例如,像PowerPC。使用volatile与使用std::memory_order_relaxed相同。请参阅程序集比较。

  • produced_.store(end, std::memory_order_release);确保生产者线程创建的所有先前存储(memcpy共享内存(在produced_.load(std::memory_order_acquire);看到此存储的效果后立即对使用者线程可见。请参阅 http://preshing.com/20130823/the-synchronizes-with-relation/以彻底处理该主题。std::memory_order也说得最好:

    memory_order_acquire具有此内存顺序的加载操作在受影响的内存位置上执行获取操作:在此加载之前,当前线程中的读取或写入都不能重新排序。释放相同原子变量的其他线程中的所有写入在当前线程中可见。

    memory_order_release具有此内存顺序的存储操作执行释放操作:在此存储之后,当前线程中的读取或写入都不能重新排序。当前线程中的所有写入在获取相同原子变量的其他线程中可见,并且将依赖项带入原子变量的写入在使用相同原子的其他线程中可见。

  • 生产者检测使用者何时使用了所有可用数据。在这种情况下,生产者将缓冲区倒回开头。这样做是为了避免处理环形缓冲区的缓冲区包装。如果使用者不能足够快地处理消息,则无论如何,缓冲区最终都会变满。

  • 它从不调用构造函数SingleProducerSingleConsumerIndexes。它依赖于新文件为零初始化的事实,这就是构造函数将要执行的操作。在更复杂的方案中,如果文件刚刚创建,则需要调用共享数据的构造函数。这可以通过首先创建一个具有唯一名称的临时文件(如果该文件尚不存在(,将文件映射到内存并调用构造函数来完成。然后将该临时文件重命名为最终名称(rename是原子的(。如果重命名失败,因为文件已存在,请删除临时文件并重新启动。

  • 使用者忙于等待尽可能低的延迟。如果您希望使用者在等待时阻塞,则可以添加进程共享互斥锁和条件变量来实现这一点。但是,唤醒内核中等待条件变量(Linux 中的futex(的线程需要几微秒。这将需要调用构造函数SingleProducerSingleConsumerIndexes执行所有必需的初始化(例如,初始化健壮的自适应进程共享互斥锁和进程共享条件变量(。