boost::asio double buffering

Keywords: buffering, ASIO, Boost      Updated: 2023-10-16

I am trying to implement double buffering for my network server when it sends to clients. The idea came from

boost::asio::async_write - ensure only one outstanding call

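Unfortunately, as good as it looks, I get an error when I try to run it. AFAIK I am not using any iterators myself, so I assume it comes from async_write when it tries to complete, but I don't know what is really causing the problem.

The error occurs after the first async_write is posted.

The error is "vector iterator not dereferencable"

and it comes from this source in the vector implementation: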

reference operator*() const
{   // return designated object
#if _ITERATOR_DEBUG_LEVEL == 2
const auto _Mycont = static_cast<const _Myvec *>(this->_Getcont());
if (_Mycont == 0
|| _Ptr == _Tptr()
|| _Ptr < _Mycont->_Myfirst
|| _Mycont->_Mylast <= _Ptr)
{   // report error
_DEBUG_ERROR("vector iterator not dereferencable");
_SCL_SECURE_OUT_OF_RANGE;
}

The call stack is:

msvcp140d.dll!00007ff9f9f40806()    Unknown
ConsoleApplication2.exe!std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > >::operator*() Line 74  C++
ConsoleApplication2.exe!boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > >::operator()() Line 532  C++
ConsoleApplication2.exe!std::_Invoker_functor::_Call<boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > > & __ptr64>(boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > > & _Obj) Line 1377   C++
ConsoleApplication2.exe!std::invoke<boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > > & __ptr64>(boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > > & _Obj) Line 1445    C++
ConsoleApplication2.exe!std::_Invoke_ret<void,boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > > & __ptr64>(std::_Forced<void,1> __formal, boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > > & <_Vals_0>) Line 1462  C++
ConsoleApplication2.exe!std::_Func_impl<boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > >,std::allocator<int>,void>::_Do_call() Line 214  C++
ConsoleApplication2.exe!std::_Func_class<void>::operator()() Line 280   C++
ConsoleApplication2.exe!boost::asio::detail::buffer_cast_helper(const boost::asio::const_buffer & b) Line 276   C++
ConsoleApplication2.exe!boost::asio::buffer_cast<void const * __ptr64>(const boost::asio::const_buffer & b) Line 435    C++
ConsoleApplication2.exe!boost::asio::buffer(const boost::asio::const_buffer & b, unsigned __int64 max_size_in_bytes) Line 751   C++
ConsoleApplication2.exe!boost::asio::detail::consuming_buffers_iterator<boost::asio::const_buffer,std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<boost::asio::const_buffer> > > >::consuming_buffers_iterator<boost::asio::const_buffer,std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<boost::asio::const_buffer> > > >(bool at_end, const boost::asio::const_buffer & first, std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<boost::asio::const_buffer> > > begin_remainder, std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<boost::asio::const_buffer> > > end_remainder, unsigned __int64 max_size) Line 62    C++
ConsoleApplication2.exe!boost::asio::detail::consuming_buffers<boost::asio::const_buffer,std::vector<boost::asio::const_buffer,std::allocator<boost::asio::const_buffer> > >::begin() Line 210  C++
ConsoleApplication2.exe!boost::asio::detail::buffer_sequence_adapter<boost::asio::const_buffer,boost::asio::detail::consuming_buffers<boost::asio::const_buffer,std::vector<boost::asio::const_buffer,std::allocator<boost::asio::const_buffer> > > >::validate(const boost::asio::detail::consuming_buffers<boost::asio::const_buffer,std::vector<boost::asio::const_buffer,std::allocator<boost::asio::const_buffer> > > & buffer_sequence) Line 145  C++
ConsoleApplication2.exe!boost::asio::detail::win_iocp_socket_send_op<boost::asio::detail::consuming_buffers<boost::asio::const_buffer,std::vector<boost::asio::const_buffer,std::allocator<boost::asio::const_buffer> > >,boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::vector<boost::asio::const_buffer,std::allocator<boost::asio::const_buffer> >,boost::asio::detail::transfer_all_t,void <lambda>(const boost::system::error_code &, unsigned __int64) > >::do_complete(boost::asio::detail::win_iocp_io_service * owner, boost::asio::detail::win_iocp_operation * base, const boost::system::error_code & result_ec, unsigned __int64 bytes_transferred) Line 74   C++
ConsoleApplication2.exe!boost::asio::detail::win_iocp_operation::complete(boost::asio::detail::win_iocp_io_service & owner, const boost::system::error_code & ec, unsigned __int64 bytes_transferred) Line 47   C++
ConsoleApplication2.exe!boost::asio::detail::win_iocp_io_service::do_one(bool block, boost::system::error_code & ec) Line 406   C++
ConsoleApplication2.exe!boost::asio::detail::win_iocp_io_service::run(boost::system::error_code & ec) Line 164  C++
ConsoleApplication2.exe!boost::asio::io_service::run() Line 59  C++
ConsoleApplication2.exe!ConnectionManager::IoServiceThreadProc() Line 83    C++
[External Code] 

My minimal compilable example, a network server that just tries to send "beep#" on a timer, is:

Connection.h

#pragma once
#include <boost/asio.hpp>
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <vector>
//--------------------------------------------------------------------
class ConnectionManager;
//--------------------------------------------------------------------
class Connection : public std::enable_shared_from_this<Connection>
{
public:
typedef std::shared_ptr<Connection> SharedPtr;
// Ensure all instances are created as shared_ptr in order to fulfill requirements for shared_from_this
static Connection::SharedPtr Create(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket & socket);
Connection(const Connection &) = delete;
Connection(Connection &&) = delete;
Connection & operator = (const Connection &) = delete;
Connection & operator = (Connection &&) = delete;
~Connection();
// We have to defer the start until we are fully constructed because we share_from_this()
void Start();
void Stop();
void Send(const std::vector<char> & data);
private:
ConnectionManager *                                     m_owner;
boost::asio::ip::tcp::socket                            m_socket;
std::atomic<bool>                                       m_stopped;
boost::asio::streambuf                                  m_receiveBuffer;
mutable std::mutex                                      m_sendMutex;
std::vector<boost::asio::const_buffer>                  m_sendBuffers[2];    // Double buffer
int                                                     m_activeSendBufferIndex;
bool                                                    m_sending;
std::vector<char>                                       m_allReadData;
Connection(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket socket);
void DoReceive();
void DoSend();
};
//--------------------------------------------------------------------

Connection.cpp

#include "Connection.h"
#include "ConnectionManager.h"
#include "Logger.h"
#include <boost/bind.hpp>
#include <algorithm>
#include <cstdio>
//--------------------------------------------------------------------
Connection::SharedPtr Connection::Create(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket & socket)
{
return Connection::SharedPtr(new Connection(connectionManager, std::move(socket)));
}
//--------------------------------------------------------------------
Connection::Connection(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket socket)
:
m_owner                             (connectionManager)
, m_socket                            (std::move(socket))
, m_stopped                           (false)
, m_receiveBuffer                     ()
, m_sendMutex                         ()
, m_sendBuffers                       ()
, m_activeSendBufferIndex             (0)
, m_sending                           (false)
, m_allReadData                       ()
{
}
//--------------------------------------------------------------------
Connection::~Connection()
{
// Boost uses RAII, so we don't have anything to do. Let their destructors take care of business
}
//--------------------------------------------------------------------
void Connection::Start()
{
DoReceive();
}
//--------------------------------------------------------------------
void Connection::Stop()
{
// The entire connection class is only kept alive, because it is a shared pointer and always has a ref count
// as a consequence of the outstanding async receive call that gets posted every time we receive.
// Once we stop posting another receive in the receive handler and once our owner releases any references to
// us, we will get destroyed.
m_stopped = true;
m_owner->OnConnectionClosed(shared_from_this());
}
//--------------------------------------------------------------------
void Connection::Send(const std::vector<char> & data)
{
std::lock_guard<std::mutex> lock(m_sendMutex);
// Append to the inactive buffer
m_sendBuffers[m_activeSendBufferIndex ^ 1].push_back(boost::asio::buffer(data));
//
DoSend();
}
//--------------------------------------------------------------------
void Connection::DoSend()
{
// Check if there is an async send in progress
// An empty active buffer indicates there is no outstanding send
if (m_sendBuffers[m_activeSendBufferIndex].empty())
{
m_activeSendBufferIndex ^= 1;
boost::asio::async_write(m_socket, m_sendBuffers[m_activeSendBufferIndex],
[this](const boost::system::error_code & errorCode, size_t bytesTransferred)
{
std::lock_guard<std::mutex> lock(m_sendMutex);
m_sendBuffers[m_activeSendBufferIndex].clear();
if (errorCode)
{
printf("An error occurred while attempting to send data to a connection. Error Code: %d", errorCode.value());
// An error occurred
// We do not stop or close on sends, but instead let the receive error out and then close
return;
}
// Check if there is more to send that has been queued up on the inactive buffer,
// while we were sending what was on the active buffer
if (!m_sendBuffers[m_activeSendBufferIndex ^ 1].empty())
{
DoSend();
}
});
}
}
//--------------------------------------------------------------------
void Connection::DoReceive()
{
auto self(shared_from_this());
boost::asio::async_read_until(m_socket, m_receiveBuffer, '#',
[self](const boost::system::error_code & errorCode, size_t bytesRead)
{
if (errorCode)
{
printf("An error occurred while attempting to receive data from connection. Error Code: %d", errorCode.value());
// Notify our masters that we are ready to be destroyed
self->m_owner->OnConnectionClosed(self);
// An error occurred
return;
}
// Grab the read data
std::istream stream(&self->m_receiveBuffer);
std::string data;
std::getline(stream, data, '#');
data += "#";
printf("Received data from client: %s", data.c_str());
// Issue the next receive
if (!self->m_stopped)
{
self->DoReceive();
}
});
}
//--------------------------------------------------------------------

ConnectionManager.h

#pragma once
#include "Connection.h"
// Boost Includes
#include <boost/asio.hpp>
// Standard Includes
#include <thread>
#include <vector>
//--------------------------------------------------------------------
class ConnectionManager
{
public:
ConnectionManager(unsigned port, size_t numThreads);
ConnectionManager(const ConnectionManager &) = delete;
ConnectionManager(ConnectionManager &&) = delete;
ConnectionManager & operator = (const ConnectionManager &) = delete;
ConnectionManager & operator = (ConnectionManager &&) = delete;
~ConnectionManager();
void Start();
void Stop();
void OnConnectionClosed(Connection::SharedPtr connection);
protected:
boost::asio::io_service            m_io_service;
boost::asio::ip::tcp::acceptor     m_acceptor;
boost::asio::ip::tcp::socket       m_listenSocket;
std::vector<std::thread>           m_threads;
mutable std::mutex                 m_connectionsMutex;
std::vector<Connection::SharedPtr> m_connections;
boost::asio::deadline_timer        m_timer;
void IoServiceThreadProc();
void DoAccept();
void DoTimer();
};
//--------------------------------------------------------------------

ConnectionManager.cpp

#include "ConnectionManager.h"
#include "Logger.h"
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <algorithm>
#include <cstdio>
#include <system_error>
//------------------------------------------------------------------------------
ConnectionManager::ConnectionManager(unsigned port, size_t numThreads)
:
m_io_service  ()
, m_acceptor    (m_io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
, m_listenSocket(m_io_service)
, m_threads     (numThreads)
, m_timer       (m_io_service)
{
}
//------------------------------------------------------------------------------
ConnectionManager::~ConnectionManager()
{
Stop();
}
//------------------------------------------------------------------------------
void ConnectionManager::Start()
{
if (m_io_service.stopped())
{
m_io_service.reset();
}
DoAccept();
for (auto & thread : m_threads)
{
if (!thread.joinable())
{
// Move-assign a new thread; swap() cannot bind to a temporary in standard C++
thread = std::thread(&ConnectionManager::IoServiceThreadProc, this);
}
}
DoTimer();
}
//------------------------------------------------------------------------------
void ConnectionManager::Stop()
{
{
std::lock_guard<std::mutex> lock(m_connectionsMutex);
m_connections.clear();
}
// TODO - Will the stopping of the io_service be enough to kill all the connections and ultimately have them get destroyed?
//        Because remember they have outstanding ref count to their shared_ptr in the async handlers
m_io_service.stop();
for (auto & thread : m_threads)
{
if (thread.joinable())
{
thread.join();
}
}
}
//------------------------------------------------------------------------------
void ConnectionManager::IoServiceThreadProc()
{
try
{
// Log that we are starting the io_service thread
{
printf("io_service socket thread starting.");
}
// Run the asynchronous callbacks from the socket on this thread
// Until the io_service is stopped from another thread
m_io_service.run();
}
catch (std::system_error & e)
{
printf("System error caught in io_service socket thread. Error Code: %d", e.code().value());
}
catch (std::exception & e)
{
printf("Standard exception caught in io_service socket thread. Exception: %s", e.what());
}
catch (...)
{
printf("Unhandled exception caught in io_service socket thread.");
}
{
printf("io_service socket thread exiting.");
}
}
//------------------------------------------------------------------------------
void ConnectionManager::DoAccept()
{
m_acceptor.async_accept(m_listenSocket,
[this](const boost::system::error_code errorCode)
{
if (errorCode)
{
printf("An error occurred while attempting to accept connections. Error Code: %d", errorCode.value());
return;
}
// Create the connection from the connected socket
std::lock_guard<std::mutex> lock(m_connectionsMutex);
Connection::SharedPtr connection = Connection::Create(this, m_listenSocket);
m_connections.push_back(connection);
connection->Start();
DoAccept();
});
}
//------------------------------------------------------------------------------
void ConnectionManager::OnConnectionClosed(Connection::SharedPtr connection)
{
std::lock_guard<std::mutex> lock(m_connectionsMutex);
auto itConnection = std::find(m_connections.begin(), m_connections.end(), connection);
if (itConnection != m_connections.end())
{
m_connections.erase(itConnection);
}
}
//------------------------------------------------------------------------------
void ConnectionManager::DoTimer()
{
if (!m_io_service.stopped())
{
// Send messages every 30 seconds
m_timer.expires_from_now(boost::posix_time::seconds(30));
m_timer.async_wait(
[this](const boost::system::error_code & errorCode)
{
std::lock_guard<std::mutex> lock(m_connectionsMutex);
for (auto connection : m_connections)
{
connection->Send(std::vector<char>{'b', 'e', 'e', 'p', '#'});
}
DoTimer();
});
}
}

main.cpp

#include "ConnectionManager.h"
#include <chrono>
#include <thread>
int main()
{
ConnectionManager connectionManager(4000, 2);
connectionManager.Start();
std::this_thread::sleep_for(std::chrono::minutes(5));
connectionManager.Stop();
return 0;
}

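I have an array of two buffer vectors: one active and one inactive. The inactive one is appended to while an async write is outstanding. The async write's handler then clears the active buffer, which should have been sent by then. It all looks fine to me. I spent all of yesterday looking at it.

Does anyone know what I am doing wrong? I really have no idea how to use these buffers correctly.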

The problem is probably in this line:

connection->Send(std::vector<char>{'b', 'e', 'e', 'p', '#'});

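You create a temporary vector and pass it by const reference to the Send method, but inside Send you call boost::asio::buffer(data), and buffer creates an object that holds only a pointer to the data and its size. It does not copy the bytes. The temporary vector is destroyed as soon as the call to Send returns, so the buffers handed to the async_write started in DoSend are invalid while the write is still in progress.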