boost::asio UDP 广播客户端仅接收"fast"数据包

boost::asio UDP Broadcast Client Only Receives "fast" Packets

本文关键字:数据包 fast 客户端 asio UDP 广播 boost      更新时间:2023-10-16

我用boost::asio编写了一个UDP广播客户端。 它有效,但有一个警告。 如果我发送数据包的速度非常快(至少每 100 毫秒左右发送一个(,它似乎会收到所有数据包。 但是,如果我只发送一个数据包,它似乎没有捕获它。 我正在使用异步接收,所以我无法想象为什么它不起作用。 数据本身相当小,并且始终小于分配的缓冲区大小。 当它确实收到"快速"数据包时,它们是正确的,并且仅包含来自单个"发送"的数据。 在调试器中,它将正确断开每个发送的数据包一次。

页眉:

class BroadcastClient
{
public:
BroadcastClient();
std::optional<std::string> poll();
protected:
void handle_read(const boost::system::error_code& error, std::size_t bytes_transferred);
private:
std::future<void> ioFuture;
std::vector<uint8_t> buffer;
std::string result;
boost::asio::io_service ioService;
std::unique_ptr<boost::asio::ip::udp::socket> socket;
uint16_t port{ 8888 };
boost::asio::ip::udp::endpoint sender_endpoint;
};

实现:

BroadcastClient::BroadcastClient()
{
this->socket = std::make_unique<boost::asio::ip::udp::socket>(
this->ioService, boost::asio::ip::udp::endpoint(boost::asio::ip::address_v4::any(), this->port));
this->socket->set_option(boost::asio::socket_base::broadcast(true));
this->socket->set_option(boost::asio::socket_base::reuse_address(true));
this->ioFuture = std::async(std::launch::async, [this] { this->ioService.run(); });
this->buffer.resize(4096);
this->socket->async_receive_from(
boost::asio::buffer(this->buffer, this->buffer.size()), sender_endpoint,
boost::bind(&BroadcastClient::handle_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
void BroadcastClient::handle_read(const boost::system::error_code& error, std::size_t bytes_transferred)
{
if(!error)
{
this->result += std::string(std::begin(buffer), std::begin(buffer) + buffer.size());
std::fill(std::begin(buffer), std::end(buffer), 0);

this->socket->async_receive_from(
boost::asio::buffer(this->buffer, this->buffer.size()), sender_endpoint,
boost::bind(&BroadcastClient::handle_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
}
std::optional<std::string> BroadcastClient::poll()
{
if(this->result.empty() == false)
{
auto copy = this->result;
this->result.clear();
return copy;
}
return {};
}

我进行了很长时间的搜索,因为广播UDP可能很挑剔。然后我发现了你的future<void>.我不仅不相信std::async会做你所期望的(它几乎可以做任何事情(,而且,有一个潜在的致命种族,这是99%确定你的问题:

  • 您启动异步任务 - 它将在将来的某个时间启动/

  • 只有这样,您才能添加async_receive_from操作。如果任务已经开始,则队列将为空,run()完成,未来将ready。事实上,当您执行以下操作时,这是可见的:

    ioService.run();
    std::clog << "End of run " << std::boolalpha << ioService.stopped() << std::endl;
    

它正在打印

End of run true

大多数时候对我来说。我建议使用线程:

ioThread = std::thread([this] {
ioService.run();
std::clog << "End of run " << std::boolalpha << ioService.stopped() << std::endl;
});

具有相应的join

~BroadcastClient() {
std::clog << "~BroadcastClient()" << std::endl;
ioThread.join();
}

为了完成,还要处理异常:是否应该捕获 boost::asio::io_service::run(( 引发的异常?或者使用thread_pool(1)这很好,因为它也替换了你的io_service

或者,使用工作防护装置(io_service::workmake_executor_guard(。

现在,在本地测试时,我似乎无法让它错过数据包。

更多评论

  1. 通常,您希望更早地知道代码中何时出现错误条件,因此请在handle_read中报告error,因为这样的条件会导致异步循环终止。有关更多固定handle_read,请参见下文

  2. result缓冲区不是线程安全的,您可以从多个线程¹访问它。这引发了未定义的行为。添加同步,或使用例如原子交换。

    ¹ 若要确保poll发生在服务线程上,必须将轮询操作post到io_service。这是不可能的,因为该服务是私有的

  3. 您在handle_read中使用buffer.size(),但这是硬编码的 (4096(。你可能想要bytes_transferred

    result.append(std::begin(buffer), std::begin(buffer) + bytes_transferred);
    

    也避免了不必要的临时。此外,现在您不必将缓冲区重置为零:

    void BroadcastClient::handle_read(const boost::system::error_code& error, std::size_t bytes_transferred) {
    if (!error) {
    std::lock_guard lk(result_mx);
    result.append(std::begin(buffer), std::begin(buffer) + bytes_transferred);
    start_read();
    } else {
    std::clog << "handle_read: " << error.message() << std::endl;
    }
    }
    
  4. 为什么socket动态实例化?实际上,您应该在构造函数初始值设定项列表中初始化它,或者从 NSMI 中初始化 C++11:

    uint16_t port{ 8888 };
    boost::asio::io_service ioService;
    udp::socket socket { ioService, { {}, port } };
    
  5. async_receive_from调用重复。这需要start_read或类似的方法。另外,请考虑使用 lambda 来减少代码,而不是依赖老式boost::bind

    void BroadcastClient::start_read() {
    socket.async_receive_from(
    boost::asio::buffer(buffer), sender_endpoint,
    [this](auto ec, size_t xfr) { handle_read(ec, xfr); });
    }
    

完整列表

住在科里鲁

#include <boost/asio.hpp>
#include <iostream>
#include <iomanip>
#include <thread>
#include <mutex>
using namespace std::chrono_literals;
class BroadcastClient {
using socket_base = boost::asio::socket_base;
using udp = boost::asio::ip::udp;
public:
BroadcastClient();
~BroadcastClient() {
std::clog << "~BroadcastClient()" << std::endl;
socket.cancel();
work.reset();
ioThread.join();
}
std::optional<std::string> poll();
protected:
void start_read();
void handle_read(const boost::system::error_code& error, std::size_t bytes_transferred);
private:
uint16_t port{ 8888 };
boost::asio::io_service ioService;
boost::asio::executor_work_guard<
boost::asio::io_service::executor_type> work { ioService.get_executor() };
udp::socket socket { ioService, { {}, port } };
std::thread ioThread;
std::string buffer = std::string(4096, '');
std::mutex result_mx;
std::string result;
udp::endpoint sender_endpoint;
};
BroadcastClient::BroadcastClient() {
socket.set_option(socket_base::broadcast(true));
socket.set_option(socket_base::reuse_address(true));
ioThread = std::thread([this] {
ioService.run();
std::clog << "Service thread, stopped? " << std::boolalpha << ioService.stopped() << std::endl;
});
start_read(); // actually okay now because of `work` guard
}
void BroadcastClient::start_read() {
socket.async_receive_from(
boost::asio::buffer(buffer), sender_endpoint,
[this](auto ec, size_t xfr) { handle_read(ec, xfr); });
}
void BroadcastClient::handle_read(const boost::system::error_code& error, std::size_t bytes_transferred) {
if (!error) {
std::lock_guard lk(result_mx);
result.append(std::begin(buffer), std::begin(buffer) + bytes_transferred);
start_read();
} else {
std::clog << "handle_read: " << error.message() << std::endl;
}
}
std::optional<std::string> BroadcastClient::poll() {
std::lock_guard lk(result_mx);
if (result.empty())
return std::nullopt;
else 
return std::move(result);
}
constexpr auto now = std::chrono::steady_clock::now;
int main() {
BroadcastClient bcc;
for (auto start = now(); now() - start < 3s;) {
if (auto r = bcc.poll())
std::cout << std::quoted(r.value()) << std::endl;
std::this_thread::sleep_for(100ms);
}
} // BroadcastClient destructor safely cancels the work

现场测试

g++ -std=c++17 -O2 -Wall -pedantic -pthread main.cpp
while sleep .05; do echo -n "hello world $RANDOM" | netcat -w 0 -u 127.0.0.1 8888 ; done&
./a.out
kill %1

指纹

"hello world 18422"
"hello world 3810"
"hello world 26191hello world 10419"
"hello world 23666hello world 18552"
"hello world 2076"
"hello world 19871hello world 8978"
"hello world 1836"
"hello world 11134hello world 16603"
"hello world 3748hello world 8089"
"hello world 27946"
"hello world 14834hello world 15274"
"hello world 26555hello world 6695"
"hello world 32419"
"hello world 26996hello world 26796"
"hello world 9882"
"hello world 680hello world 29358"
"hello world 9723hello world 31163"
"hello world 3646"
"hello world 10602hello world 22562"
"hello world 18394hello world 17229"
"hello world 20028"
"hello world 14444hello world 3890"
"hello world 16258"
"hello world 28555hello world 21184"
"hello world 31342hello world 30891"
"hello world 3088"
"hello world 1051hello world 5638"
"hello world 24308hello world 7748"
"hello world 18398"
~BroadcastClient()
handle_read: Operation canceled
Service thread, stopped? true

可能/仍然/感兴趣的旧答案内容

等。我注意到这不是"常规"的点对点UDP。

据我了解,组播工作由路由器提供。他们必须维护"订阅"端点的复杂表,以便知道将实际数据包转发到哪里。

许多路由器都在努力解决这些问题,可靠性存在内置陷阱,尤其是在 WiFi 等方面。如果您的路由器(或者更确切地说是包含路由器的拓扑(也在为此苦苦挣扎,并且只是在某个时间间隔停止"记住"组播组中的参与端点,那将/不会/感到惊讶。

我认为这种类型的表必须保留在路由的每个跃点中(包括内核,它可能必须跟踪同一组播组的多个进程(。

关于这一点的一些提示:

  • https://superuser.com/questions/1287485/udp-broadcasting-not-working-on-some-routers

经常听到的一条建议是:

  • 如果可以,请对 dicscovery 使用多播,之后切换到单播
  • 尝试具体说明绑定接口(在您的代码中,您可能希望将 address_v4::any(( 替换为lo(127.0.0.1( 或任何标识您的 NIC 的 IP 地址。