多个背靠背的boost::asio-async_send_to调用导致缓冲区溢出
multiple back to back boost::asio async_send_to calls cause buffer overun
我在使用boost asio发送多个背靠背的独立UDP缓冲区时遇到问题。我有一个1秒的asio定时器,它触发一个回调,通过UDP传输两个独立的UDP数据报结构。这些消息结构中的每一个都是通过std::unique_ptr分配的,所以在调用异步CADaemon::handle_send回调时,它们不应该超出范围。
void
CADaemon::heartBeatTimer(
const milliseconds& rHeartBeatMs)
{
mpStatusTimer->expires_from_now(rHeartBeatMs);
mpStatusTimer->async_wait(boost::bind(
&CADaemon::heartBeatTimer,
this, rHeartBeatMs));
if (mpALBFSocket && mpALBFEndpoint) {
mpALBFSocket->async_send_to(
buffer(mpStatusMessage.get(),
sizeof(MemberSystemStatusMessage)),
*mpALBFEndpoint,
boost::bind(&CADaemon::handle_send, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
// must insert delay to prevent buffer overwrites
std::this_thread::sleep_for(std::chrono::milliseconds(10);
// heartbeat messages are also sent to this socket/endpoint
mpALBFSocket->async_send_to(
buffer(mpHeartbeatMessage.get(),
sizeof(CAServiceHeartbeatMessage)),
*mpALBFEndpoint,
boost::bind(&CADaemon::handle_send, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
}
如果我在发送第一条消息和第二条消息之间设置了一个小延迟,接收应用程序就会工作,但是,如果我按原样发送它们,那么在第一条消息到达接收应用程序时,第二个缓冲区似乎会覆盖第一条消息。
我做错了什么?
我还尝试用下面的代码发送多个缓冲区,但由于它将两个数据报合并为一个长数据报,所以性能更差。
void
CADaemon::heartBeatTimer(
const milliseconds& rHeartBeatMs)
{
mpStatusTimer->expires_from_now(rHeartBeatMs);
mpStatusTimer->async_wait(boost::bind(
&CADaemon::heartBeatTimer,
this, rHeartBeatMs));
if (mpALBFSocket && mpALBFEndpoint) {
std::vector<boost::asio::const_buffer> transmitBuffers;
transmitBuffers.push_back(buffer(
mpStatusMessage.get(),
sizeof(MemberSystemStatusMessage)));
//transmitBuffers.push_back(buffer(
// mpHeartbeatMessage.get(),
// sizeof(CAServiceHeartbeatMessage)));
mpALBFSocket->async_send_to(
transmitBuffers, *mpALBFEndpoint,
boost::bind(&CADaemon::handle_send, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
}
以下是相关头文件中ASIO所涉及的类的成员。
// this message is transmitted @1HZ
std::unique_ptr<MemberSystemStatusMessage> mpStatusMessage;
// this message is transmitted @1HZ
std::unique_ptr<CAServiceHeartbeatMessage> mpHeartbeatMessage;
// this message is received @1HZ
std::unique_ptr<WOperationalSupportMessage> mpOpSupportMessage;
// this message is received @1HZ when valid
std::unique_ptr<MaintenanceOTPMessage> mpOTPMessage;
std::shared_ptr<boost::asio::io_service> mpIOService;
std::unique_ptr<boost::asio::ip::udp::socket> mpALBFSocket;
std::unique_ptr<boost::asio::ip::udp::endpoint> mpALBFEndpoint;
std::unique_ptr<boost::asio::ip::udp::socket> mpServerSocket;
std::unique_ptr<boost::asio::ip::udp::endpoint> mpServerEndpoint;
std::unique_ptr<boost::asio::steady_timer> mpStatusTimer;
std::unique_ptr<uint8_t[]> mpReceiveBuffer;
这是回调处理程序
void
CADaemon::handle_send(
const boost::system::error_code& error,
std::size_t bytes_transferred)
{
static auto& gEvtLog = gpLogger->getLoggerRef(
Logger::LogDest::EventLog);
if (!error || (error == boost::asio::error::message_size)) {
// Critical Section - exclusive write
boost::unique_lock<boost::shared_mutex> uniqueLock(gRWMutexGuard);
LOG_EVT_INFO(gEvtLog) << *mpStatusMessage;
LOG_EVT_INFO(gEvtLog) << *mpHeartbeatMessage;
LOG_EVT_INFO(gEvtLog) << "Sent " << bytes_transferred << " bytes";
mpStatusMessage->incrementSequenceCounter();
} else {
LOG_EVT_ERROR(gEvtLog) << "handle_send: asio error code["
<< error.value() << "]";
}
}
编辑:添加了带有缓冲区损坏的接收JAVA应用程序代码
下面的代码显示了接收java应用程序中的代码,注意到接收到的数据报的大小从未损坏,只是内容,大小似乎总是更长的数据报。希望这对追踪问题有用。
@Override
protected Task<Void> createTask() {
return new Task<Void>() {
@Override
protected Void call() throws Exception {
updateMessage("Running...");
try {
DatagramSocket serverSocket = new DatagramSocket(mPortNum);
// allocate space for received datagrams
byte[] bytes = new byte[1024];
DatagramPacket packet = new DatagramPacket(bytes, bytes.length);
while (!isCancelled()) {
serverSocket.receive(packet);
int bytesReceived = packet.getLength();
MemberSystemStatusMessage statusMessage =
new MemberSystemStatusMessage();
int statusMessageSize = statusMessage.size();
CAServiceHeartbeatMessage heartbeatMessage =
new CAServiceHeartbeatMessage();
int heartbeatMessageSize = heartbeatMessage.size();
if (Platform.isFxApplicationThread()) {
if (bytesReceived == statusMessage.size()) {
statusMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
setMemberSystemMessage(statusMessage);
} else if (bytesReceived == heartbeatMessage.size()){
heartbeatMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
setHeartbeatMessage(heartbeatMessage);
} else {
System.out.println("unexpected datagram");
}
} else { // update later in FxApplicationThread
if (bytesReceived == statusMessage.size()) {
statusMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
Platform.runLater(() -> setMemberSystemMessage(statusMessage));
} else if (bytesReceived == heartbeatMessage.size()){
heartbeatMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
Platform.runLater(() -> setHeartbeatMessage(heartbeatMessage));
} else {
System.out.println("unexpected datagram");
}
}
}
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
updateMessage("Cancelled");
return null;
}
};
}
}
只要缓冲区的大小正确,并且缓冲区的底层内存在调用处理程序之前保持有效,代码看起来就很好。对于给定的I/O对象,可以安全地启动多个非组合异步操作,例如async_send_to()
。尽管如此,这些操作的执行顺序尚未明确。
接收器应用程序具有单个共享字节数组,数据报被读取到该数组中。如果接收到两个数据报,并且发生两次读取操作,则缓冲区将包含最后读取的数据报的内容。基于所提供的代码,由于Runnable
s将在未来某个未指定的时间调用,这可能会创建一个竞争条件。例如,考虑发送两个数据报的场景,第一个包含系统消息,第二个包含心跳消息。在以下代码中:
byte[] bytes = new byte[1024];
DatagramPacket packet = new DatagramPacket(bytes, bytes.length);
while (...)
{
serverSocket.receive(packet);
int bytesReceived = packet.getLength();
MemberSystemStatusMessage statusMessage = ...;
CAServiceHeartbeatMessage heartbeatMessage = ...;
if (bytesReceived == statusMessage.size())
{
statusMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
Platform.runLater(() -> setMemberSystemMessage(statusMessage));
}
...
}
在while循环的第一次迭代之后,bytes
包含一条状态消息,而statusMessage
对象指的是bytes
缓冲区。Runnable已计划在未来某个未指定的时间运行。在读取第二数据报时,bytes
缓冲器包含心跳消息。Runnable现在运行,将statusMessage
对象传递给setMemberSystemMessage()
;然而,它的底层缓冲区现在包含一个心跳消息。要解决这个问题,请考虑在需要延迟执行时深度复制字节数组:
if (bytesReceived == statusMessage.size())
{
byte[] bytes_copy = Arrays.copyOf(bytes, bytesReceived);
statusMessage.setByteBuffer(ByteBuffer.wrap(bytes_copy), 0);
Platform.runLater(() -> setMemberSystemMessage(statusMessage));
}
或者,可以为每个读取操作使用新的缓冲区。
对基础协议的期望也可能存在问题。UDP被称为不可靠的协议,因为它不向发送者提供关于数据报传递的通知。每个async_send_to()
操作将导致最多传输一个数据报。完成处理程序的状态指示数据是否已写入,并表示数据报是否已收到没有状态。即使通过分散聚集I/O提供了多个缓冲区,这也是正确的。因此,该问题中描述的场景是协议允许的,其中启动两个async_send_to()
操作,但接收方仅接收单个数据报。应用程序协议应该考虑到这种行为。例如,接收方可以在连续数目的错过心跳截止日期超过阈值时报告错误,而不是在错过单个心跳截止日期之后报告错误。在写入之间添加一个小延迟并不能保证协议的行为。
更新:
正如Tanner Sansbury所提到的,这个答案很可能是错误的。我将把它留在这里,让人们来寻找同时对async_send_to
进行多个调用是否有效的问题的答案。答案似乎是"是的"。
原件:
此代码的问题在于对async_send_to()
的第二次调用没有等待第一次调用完成。您应该从第一个完成处理程序对async_send_to()
进行第二次调用,假设没有错误。
在第二个示例中,将两个缓冲区合并为一个数据报是预期行为。
- 是否可以在 OpenGL 中的同一调用中呈现两个具有不同索引起点的不同缓冲区?
- 为什么在调用C#DLL时不需要提供字符串缓冲区
- 写入渲染缓冲区并使用单个渲染调用使用 OpenGL 显示
- 如何从perf_event_open的环形缓冲区中检索调用链以进行PERF_RECORD_SWITCH?
- 绘制的 OpenGL 点消失,绘制调用和交换缓冲区问题
- 是否可以等待来自暂存缓冲区的传输完成而不调用 vkQueueWaitIdle
- 是否可以在同一设备缓冲区上一个接一个地调用 OpenCL 内核?
- P/调用C++用于填充托管缓冲区的 DLL
- 确实调用glbufferdata创建/发布缓冲区
- 调用readAll()后,不再无法从QIODevices检索数据.缓冲区已冲洗
- 如何从uint8_t的缓冲区读取带符号整数,而不调用未定义或实现定义的行为
- 多次调用glGetTexImage时发生缓冲区溢出错误
- 多个背靠背的boost::asio-async_send_to调用导致缓冲区溢出
- 为什么在调用 glDrawArray 之前不需要绑定顶点缓冲区对象?
- 如果要显式指定缓冲区大小,应使用 Fread/FWRITE 还是读/写系统调用
- OpenGL 批处理:为什么我的绘制调用超出了数组缓冲区边界
- 通过统一缓冲区对象发送到 GLSL 的数据会泄漏到其他绘制调用中 (OpenGL 3.2)
- 在openGL中进行draw调用,只触及深度缓冲区
- 在协议缓冲区中注释推送rpc调用
- 将无符号char*缓冲区强制转换为可接受参数的可调用void指针