多个背靠背的boost::asio-async_send_to调用导致缓冲区溢出

multiple back to back boost::asio async_send_to calls cause buffer overun

本文关键字:调用 缓冲区 溢出 to send boost asio-async 背靠背      更新时间:2023-10-16

我在使用boost asio发送多个背靠背的独立UDP缓冲区时遇到问题。我有一个1秒的asio定时器,它触发一个回调,通过UDP传输两个独立的UDP数据报结构。这些消息结构中的每一个都是通过std::unique_ptr分配的,所以在调用异步CADaemon::handle_send回调时,它们不应该超出范围。

void
CADaemon::heartBeatTimer(
    const milliseconds& rHeartBeatMs)
{
    mpStatusTimer->expires_from_now(rHeartBeatMs);
    mpStatusTimer->async_wait(boost::bind(
        &CADaemon::heartBeatTimer,
        this, rHeartBeatMs));
    if (mpALBFSocket && mpALBFEndpoint) {
        mpALBFSocket->async_send_to(
            buffer(mpStatusMessage.get(),
                sizeof(MemberSystemStatusMessage)),
            *mpALBFEndpoint,
            boost::bind(&CADaemon::handle_send, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));

        // must insert delay to prevent buffer overwrites
        std::this_thread::sleep_for(std::chrono::milliseconds(10);
        // heartbeat messages are also sent to this socket/endpoint
        mpALBFSocket->async_send_to(
            buffer(mpHeartbeatMessage.get(),
                sizeof(CAServiceHeartbeatMessage)),
            *mpALBFEndpoint,
            boost::bind(&CADaemon::handle_send, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }
}

如果我在发送第一条消息和第二条消息之间设置了一个小延迟,接收应用程序就会工作,但是,如果我按原样发送它们,那么在第一条消息到达接收应用程序时,第二个缓冲区似乎会覆盖第一条消息。

我做错了什么?

我还尝试用下面的代码发送多个缓冲区,但由于它将两个数据报合并为一个长数据报,所以性能更差。

void
CADaemon::heartBeatTimer(
    const milliseconds& rHeartBeatMs)
{
    mpStatusTimer->expires_from_now(rHeartBeatMs);
    mpStatusTimer->async_wait(boost::bind(
        &CADaemon::heartBeatTimer,
        this, rHeartBeatMs));
    if (mpALBFSocket && mpALBFEndpoint) {
        std::vector<boost::asio::const_buffer> transmitBuffers;
        transmitBuffers.push_back(buffer(
            mpStatusMessage.get(), 
            sizeof(MemberSystemStatusMessage)));
        //transmitBuffers.push_back(buffer(
        //    mpHeartbeatMessage.get(), 
        //    sizeof(CAServiceHeartbeatMessage)));
        mpALBFSocket->async_send_to(
            transmitBuffers, *mpALBFEndpoint,
            boost::bind(&CADaemon::handle_send, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }
}

以下是相关头文件中ASIO所涉及的类的成员。

// this message is transmitted @1HZ
std::unique_ptr<MemberSystemStatusMessage> mpStatusMessage;
// this message is transmitted @1HZ
std::unique_ptr<CAServiceHeartbeatMessage> mpHeartbeatMessage;
// this message is received @1HZ
std::unique_ptr<WOperationalSupportMessage> mpOpSupportMessage;
// this message is received @1HZ when valid
std::unique_ptr<MaintenanceOTPMessage> mpOTPMessage;
std::shared_ptr<boost::asio::io_service> mpIOService;
std::unique_ptr<boost::asio::ip::udp::socket> mpALBFSocket;
std::unique_ptr<boost::asio::ip::udp::endpoint> mpALBFEndpoint;
std::unique_ptr<boost::asio::ip::udp::socket> mpServerSocket;
std::unique_ptr<boost::asio::ip::udp::endpoint> mpServerEndpoint;
std::unique_ptr<boost::asio::steady_timer> mpStatusTimer;
std::unique_ptr<uint8_t[]> mpReceiveBuffer;

这是回调处理程序

void
CADaemon::handle_send(
    const boost::system::error_code& error,
    std::size_t bytes_transferred)
{
    static auto& gEvtLog = gpLogger->getLoggerRef(
        Logger::LogDest::EventLog);
    if (!error || (error == boost::asio::error::message_size)) {
        // Critical Section - exclusive write
        boost::unique_lock<boost::shared_mutex> uniqueLock(gRWMutexGuard);
        LOG_EVT_INFO(gEvtLog) << *mpStatusMessage;
        LOG_EVT_INFO(gEvtLog) << *mpHeartbeatMessage;
        LOG_EVT_INFO(gEvtLog) << "Sent " << bytes_transferred << " bytes";
        mpStatusMessage->incrementSequenceCounter();
    } else {
        LOG_EVT_ERROR(gEvtLog) << "handle_send: asio error code["
            << error.value() << "]";
    }
}

编辑:添加了带有缓冲区损坏的接收JAVA应用程序代码

下面的代码显示了接收java应用程序中的代码,注意到接收到的数据报的大小从未损坏,只是内容,大小似乎总是更长的数据报。希望这对追踪问题有用。

    @Override
    protected Task<Void> createTask() {
        return new Task<Void>() {
            @Override
            protected Void call() throws Exception {
                updateMessage("Running...");
                try {
                    DatagramSocket serverSocket = new DatagramSocket(mPortNum);
                    // allocate space for received datagrams
                    byte[] bytes = new byte[1024];
                    DatagramPacket packet = new DatagramPacket(bytes, bytes.length);                    
                    while (!isCancelled()) {                    
                        serverSocket.receive(packet);
                        int bytesReceived = packet.getLength();
                        MemberSystemStatusMessage statusMessage = 
                            new MemberSystemStatusMessage();
                        int statusMessageSize = statusMessage.size();
                        CAServiceHeartbeatMessage heartbeatMessage = 
                            new CAServiceHeartbeatMessage();
                        int heartbeatMessageSize = heartbeatMessage.size();
                        if (Platform.isFxApplicationThread()) {
                            if (bytesReceived == statusMessage.size()) {
                                statusMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
                                setMemberSystemMessage(statusMessage);
                            } else if (bytesReceived == heartbeatMessage.size()){
                                heartbeatMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
                                setHeartbeatMessage(heartbeatMessage);
                            } else {
                                System.out.println("unexpected datagram");
                            }
                        } else { // update later in FxApplicationThread
                            if (bytesReceived == statusMessage.size()) {
                                statusMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
                                Platform.runLater(() -> setMemberSystemMessage(statusMessage));
                            } else if (bytesReceived == heartbeatMessage.size()){
                                heartbeatMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
                                Platform.runLater(() -> setHeartbeatMessage(heartbeatMessage));
                            } else {
                                System.out.println("unexpected datagram");
                            }
                        }
                    }
                } catch (Exception ex) {
                    System.out.println(ex.getMessage());
                }
                updateMessage("Cancelled");
                return null;
            } 
        };
    }
}

只要缓冲区的大小正确,并且缓冲区的底层内存在调用处理程序之前保持有效,代码看起来就很好。对于给定的I/O对象,可以安全地启动多个非组合异步操作,例如async_send_to()。尽管如此,这些操作的执行顺序尚未明确。

接收器应用程序具有单个共享字节数组,数据报被读取到该数组中。如果接收到两个数据报,并且发生两次读取操作,则缓冲区将包含最后读取的数据报的内容。基于所提供的代码,由于Runnable s将在未来某个未指定的时间调用,这可能会创建一个竞争条件。例如,考虑发送两个数据报的场景,第一个包含系统消息,第二个包含心跳消息。在以下代码中:

byte[] bytes = new byte[1024];
DatagramPacket packet = new DatagramPacket(bytes, bytes.length);
while (...)
{
  serverSocket.receive(packet);
  int bytesReceived = packet.getLength();
  MemberSystemStatusMessage statusMessage =  ...;
  CAServiceHeartbeatMessage heartbeatMessage =  ...;
  if (bytesReceived == statusMessage.size())
  {
    statusMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
    Platform.runLater(() -> setMemberSystemMessage(statusMessage));
  }
  ...
}

在while循环的第一次迭代之后,bytes包含一条状态消息,而statusMessage对象指的是bytes缓冲区。Runnable已计划在未来某个未指定的时间运行。在读取第二数据报时,bytes缓冲器包含心跳消息。Runnable现在运行,将statusMessage对象传递给setMemberSystemMessage();然而,它的底层缓冲区现在包含一个心跳消息。要解决这个问题,请考虑在需要延迟执行时深度复制字节数组:

if (bytesReceived == statusMessage.size())
{
  byte[] bytes_copy = Arrays.copyOf(bytes, bytesReceived);
  statusMessage.setByteBuffer(ByteBuffer.wrap(bytes_copy), 0);
  Platform.runLater(() -> setMemberSystemMessage(statusMessage));
}

或者,可以为每个读取操作使用新的缓冲区。

对基础协议的期望也可能存在问题。UDP被称为不可靠的协议,因为它不向发送者提供关于数据报传递的通知。每个async_send_to()操作将导致最多传输一个数据报。完成处理程序的状态指示数据是否已写入,并表示数据报是否已收到没有状态。即使通过分散聚集I/O提供了多个缓冲区,这也是正确的。因此,该问题中描述的场景是协议允许的,其中启动两个async_send_to()操作,但接收方仅接收单个数据报。应用程序协议应该考虑到这种行为。例如,接收方可以在连续数目的错过心跳截止日期超过阈值时报告错误,而不是在错过单个心跳截止日期之后报告错误。在写入之间添加一个小延迟并不能保证协议的行为。

更新:

正如Tanner Sansbury所提到的,这个答案很可能是错误的。我将把它留在这里,让人们来寻找同时对async_send_to进行多个调用是否有效的问题的答案。答案似乎是"是的"。

原件:

此代码的问题在于对async_send_to()的第二次调用没有等待第一次调用完成。您应该从第一个完成处理程序对async_send_to()进行第二次调用,假设没有错误。

在第二个示例中,将两个缓冲区合并为一个数据报是预期行为。