增强ASIO和线程之间的消息传递
boost ASIO and message passing between thread
我正在设计一个websocket服务器,该服务器接收消息并将其保存到嵌入式数据库。为了阅读信息,我正在使用boost asio。为了将消息保存到嵌入式数据库中,我看到了几个选项:
- 在同一线程上收到消息后,立即同步保存消息
- 将消息异步保存在单独的线程上
我确信第二个答案就是我想要的。但是,我不知道如何将消息从套接字线程传递到IO线程。我看到以下选项:
- 每个线程使用一个io服务,并使用post函数在线程之间进行通信。在这里,我不得不担心锁争用。我应该吗
- 使用Linux域套接字在线程之间传递消息。据我所知,没有锁争用。在这里,我可能可以使用BOOST_ASIO_DISABLE_THREADS宏来获得一些性能提升
此外,我相信拥有多个IO线程会有所帮助,这些线程将以循环方式接收消息,并保存到嵌入式数据库中。
哪种体系结构的性能最好除了我提到的,还有其他选择吗?
需要注意的几点:
- 消息的长度正好为8字节
- 无法使用外部数据库。数据库必须嵌入正在运行的过程
- 我正在考虑使用RocksDB作为嵌入式数据库
我认为您不想使用unix套接字,因为它总是需要系统调用并通过内核传递数据。这通常比线程间机制更适合作为进程间机制。
除非您的数据库API要求所有调用都来自同一个线程(我对此表示怀疑),否则您不必为其使用单独的boost::asio::io_service
。相反,我会在现有的io_service
实例上创建一个io_service::strand
,并使用strand::dispatch()
成员函数(而不是io_service::post()
)执行任何阻塞数据库任务。以这种方式使用strand
可以确保最多有一个线程访问数据库时被阻止,从而使io_service
实例中的所有其他线程都可以为非数据库任务提供服务。
为什么这可能比使用单独的io_service
实例更好?一个优点是,拥有一个带有一组线程的单个实例的代码和维护稍微简单一些。另一个小优点是,如果可以的话(即,如果没有任务已经在strand
中运行),使用strand::dispatch()
将在当前线程中执行,这可以避免上下文切换。
对于最终的优化,我同意使用入队操作无法进行系统调用的专用队列可能是最快的。但是,考虑到生产者有网络i/o,消费者有磁盘i/o,我不认为队列的实现会成为瓶颈。
在进行基准测试/分析后,我发现facebook愚蠢的MPMCQueue实现是最快的,至少有50%的优势。如果我使用非阻塞写入方法,那么套接字线程几乎没有开销,IO线程仍然很忙。系统调用的数量也比其他队列实现少得多。
升压中具有cond变量的SPSC队列较慢。我不知道为什么。这可能和愚蠢队列使用的自适应旋转有关。
此外,消息传递(在本例中为UDP域套接字)的速度慢了几个数量级,尤其是对于较大的消息。这可能与两次复制数据有关。
您可能只需要一个io_service
——通过提供boost::asio::io_service::run
作为线程函数,您可以创建额外的线程来处理io_service
内发生的事件。对于通过网络套接字接收来自客户端的8字节消息,这应该可以很好地扩展。
为了将消息存储在数据库中,它取决于数据库&界面如果它是多线程的,那么你也可以从接收消息的线程向数据库发送每条消息。否则,我可能会设置一个boost::lockfree::queue
,其中一个读取器线程提取项目并将其发送到数据库,io_service
线程在新消息到达时将其附加到队列中。
这是最有效的方法吗?我不知道。这绝对很简单,如果速度不够快,你可以为你提供一个基线。但我建议一开始不要设计更复杂的东西:你根本不知道自己是否需要它,除非你对你的系统有很多了解,否则实际上不可能说复杂的方法是否比简单的方法更好。
void Consumer( lockfree::queue<uint64_t> &message_queue ) {
// Connect to database...
while (!Finished) {
message_queue.consume_all( add_to_database ); // add_to_database is a Functor that takes a message
cond_var.wait_for( ... ); // Use a timed wait to avoid missing a signal. It's OK to consume_all() even if there's nothing in the queue.
}
}
void Producer( lockfree::queue<uint64_t> &message_queue ) {
while (!Finished) {
uint64_t m = receive_from_network( );
message_queue.push( m );
cond_var.notify_all( );
}
}
假设在您的环境中使用cxx11的约束不太严格,我会尝试使用std::async对嵌入式数据库进行异步调用。
- Libmosquitto publish 不会将所有消息传递到 Azure IoT Hub
- 线程消息传递或更好:在"大师班"中访问其他班级的成员
- 如何在 boost::asio 中将打包的结构作为消息传递?(无序列化)
- 使用 enable_if 在按值传递与按引用传递之间更改函数声明
- 按引用传递和按地址传递之间的差异
- 如何在自定义 LLVM 传递之间正确传递数据结构
- "Guaranteed Delivery"消息传递 - 我应该使用 MQTT 还是 ZeroMQ?
- 核心消息传递中未处理的异常.dll在程序关闭期间
- Microsoft具有本机消息传递和非持久连接的边缘扩展不起作用
- Firebase C 云消息传递背景问题
- 从客户端到浏览器的CEF中的消息传递序列化
- 我们是否可以使用 FireBase 云消息传递来发送或接收消息,或者在 Windows 桌面/控制台或 Linux 控制
- 如何将WM_KEYDOWN消息传递到 IWebBrowser2 实例
- 如何在 GNU Radio 中实现消息传递
- msgpack:C++和Java之间的消息传递
- 增强ASIO和线程之间的消息传递
- 交流时Chrome本机消息传递错误
- 多线程C++消息传递
- c++中按值传递和按引用传递之间的差异
- 跨平台最佳 MVC 模型到控制器消息传递方法(C#、Objective-C++)