这个boost-asio UDP广播代码如何仅与本地主机一起工作?

How should this boost-asio UDP broadcast code work with localhost only?

本文关键字:主机 一起 工作 何仅 UDP boost-asio 广播 代码 这个      更新时间:2023-10-16

boost-asio超时的服务器示例有3个命令行参数。我需要知道第二和第三是什么,以及如何测试服务器(其中Usage: server <listen_port> <bcast_address> <bcast_port>)。它说它们是广播端口和地址,但如果我在一台机器上作为本地主机(例如127.0.0.1)进行测试,那么会发生什么?

下面是代码

#include <algorithm>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <set>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/asio/write.hpp>
using boost::asio::deadline_timer;
using boost::asio::ip::tcp;
using boost::asio::ip::udp;
//----------------------------------------------------------------------
class subscriber
{
public:
  virtual ~subscriber() {}
  virtual void deliver(const std::string& msg) = 0;
};
typedef boost::shared_ptr<subscriber> subscriber_ptr;
//----------------------------------------------------------------------
class channel
{
public:
  void join(subscriber_ptr subscriber)
  {
    subscribers_.insert(subscriber);
  }
  void leave(subscriber_ptr subscriber)
  {
    subscribers_.erase(subscriber);
  }
  void deliver(const std::string& msg)
  {
    std::for_each(subscribers_.begin(), subscribers_.end(),
        boost::bind(&subscriber::deliver, _1, boost::ref(msg)));
  }
private:
  std::set<subscriber_ptr> subscribers_;
};
//----------------------------------------------------------------------
//
// This class manages socket timeouts by applying the concept of a deadline.
// Some asynchronous operations are given deadlines by which they must complete.
// Deadlines are enforced by two "actors" that persist for the lifetime of the
// session object, one for input and one for output:
//
//  +----------------+                     +----------------+
//  |                |                     |                |
//  | check_deadline |<---+                | check_deadline |<---+
//  |                |    | async_wait()   |                |    | async_wait()
//  +----------------+    |  on input      +----------------+    |  on output
//              |         |  deadline                  |         |  deadline
//              +---------+                            +---------+
//
// If either deadline actor determines that the corresponding deadline has
// expired, the socket is closed and any outstanding operations are cancelled.
//
// The input actor reads messages from the socket, where messages are delimited
// by the newline character:
//
//  +------------+
//  |            |
//  | start_read |<---+
//  |            |    |
//  +------------+    |
//          |         |
//  async_- |    +-------------+
//   read_- |    |             |
//  until() +--->| handle_read |
//               |             |
//               +-------------+
//
// The deadline for receiving a complete message is 30 seconds. If a non-empty
// message is received, it is delivered to all subscribers. If a heartbeat (a
// message that consists of a single newline character) is received, a heartbeat
// is enqueued for the client, provided there are no other messages waiting to
// be sent.
//
// The output actor is responsible for sending messages to the client:
//
//  +--------------+
//  |              |<---------------------+
//  | await_output |                      |
//  |              |<---+                 |
//  +--------------+    |                 |
//      |      |        | async_wait()    |
//      |      +--------+                 |
//      V                                 |
//  +-------------+               +--------------+
//  |             | async_write() |              |
//  | start_write |-------------->| handle_write |
//  |             |               |              |
//  +-------------+               +--------------+
//
// The output actor first waits for an output message to be enqueued. It does
// this by using a deadline_timer as an asynchronous condition variable. The
// deadline_timer will be signalled whenever the output queue is non-empty.
//
// Once a message is available, it is sent to the client. The deadline for
// sending a complete message is 30 seconds. After the message is successfully
// sent, the output actor again waits for the output queue to become non-empty.
//
class tcp_session
  : public subscriber,
    public boost::enable_shared_from_this<tcp_session>
{
public:
  tcp_session(boost::asio::io_service& io_service, channel& ch)
    : channel_(ch),
      socket_(io_service),
      input_deadline_(io_service),
      non_empty_output_queue_(io_service),
      output_deadline_(io_service)
  {
    input_deadline_.expires_at(boost::posix_time::pos_infin);
    output_deadline_.expires_at(boost::posix_time::pos_infin);
    // The non_empty_output_queue_ deadline_timer is set to pos_infin whenever
    // the output queue is empty. This ensures that the output actor stays
    // asleep until a message is put into the queue.
    non_empty_output_queue_.expires_at(boost::posix_time::pos_infin);
  }
  tcp::socket& socket()
  {
    return socket_;
  }
  // Called by the server object to initiate the four actors.
  void start()
  {
    channel_.join(shared_from_this());
    start_read();
    input_deadline_.async_wait(
        boost::bind(&tcp_session::check_deadline,
        shared_from_this(), &input_deadline_));
    await_output();
    output_deadline_.async_wait(
        boost::bind(&tcp_session::check_deadline,
        shared_from_this(), &output_deadline_));
  }
private:
  void stop()
  {
    channel_.leave(shared_from_this());
    boost::system::error_code ignored_ec;
    socket_.close(ignored_ec);
    input_deadline_.cancel();
    non_empty_output_queue_.cancel();
    output_deadline_.cancel();
  }
  bool stopped() const
  {
    return !socket_.is_open();
  }
  void deliver(const std::string& msg)
  {
    output_queue_.push_back(msg + "n");
    // Signal that the output queue contains messages. Modifying the expiry
    // will wake the output actor, if it is waiting on the timer.
    non_empty_output_queue_.expires_at(boost::posix_time::neg_infin);
  }
  void start_read()
  {
    // Set a deadline for the read operation.
    input_deadline_.expires_from_now(boost::posix_time::seconds(30));
    // Start an asynchronous operation to read a newline-delimited message.
    boost::asio::async_read_until(socket_, input_buffer_, 'n',
        boost::bind(&tcp_session::handle_read, shared_from_this(), _1));
  }
  void handle_read(const boost::system::error_code& ec)
  {
    if (stopped())
      return;
    if (!ec)
    {
      // Extract the newline-delimited message from the buffer.
      std::string msg;
      std::istream is(&input_buffer_);
      std::getline(is, msg);
      if (!msg.empty())
      {
        channel_.deliver(msg);
      }
      else
      {
        // We received a heartbeat message from the client. If there's nothing
        // else being sent or ready to be sent, send a heartbeat right back.
        if (output_queue_.empty())
        {
          output_queue_.push_back("n");
          // Signal that the output queue contains messages. Modifying the
          // expiry will wake the output actor, if it is waiting on the timer.
          non_empty_output_queue_.expires_at(boost::posix_time::neg_infin);
        }
      }
      start_read();
    }
    else
    {
      stop();
    }
  }
  void await_output()
  {
    if (stopped())
      return;
    if (output_queue_.empty())
    {
      // There are no messages that are ready to be sent. The actor goes to
      // sleep by waiting on the non_empty_output_queue_ timer. When a new
      // message is added, the timer will be modified and the actor will wake.
      non_empty_output_queue_.expires_at(boost::posix_time::pos_infin);
      non_empty_output_queue_.async_wait(
          boost::bind(&tcp_session::await_output, shared_from_this()));
    }
    else
    {
      start_write();
    }
  }
  void start_write()
  {
    // Set a deadline for the write operation.
    output_deadline_.expires_from_now(boost::posix_time::seconds(30));
    // Start an asynchronous operation to send a message.
    boost::asio::async_write(socket_,
        boost::asio::buffer(output_queue_.front()),
        boost::bind(&tcp_session::handle_write, shared_from_this(), _1));
  }
  void handle_write(const boost::system::error_code& ec)
  {
    if (stopped())
      return;
    if (!ec)
    {
      output_queue_.pop_front();
      await_output();
    }
    else
    {
      stop();
    }
  }
  void check_deadline(deadline_timer* deadline)
  {
    if (stopped())
      return;
    // Check whether the deadline has passed. We compare the deadline against
    // the current time since a new asynchronous operation may have moved the
    // deadline before this actor had a chance to run.
    if (deadline->expires_at() <= deadline_timer::traits_type::now())
    {
      // The deadline has passed. Stop the session. The other actors will
      // terminate as soon as possible.
      stop();
    }
    else
    {
      // Put the actor back to sleep.
      deadline->async_wait(
          boost::bind(&tcp_session::check_deadline,
          shared_from_this(), deadline));
    }
  }
  channel& channel_;
  tcp::socket socket_;
  boost::asio::streambuf input_buffer_;
  deadline_timer input_deadline_;
  std::deque<std::string> output_queue_;
  deadline_timer non_empty_output_queue_;
  deadline_timer output_deadline_;
};
typedef boost::shared_ptr<tcp_session> tcp_session_ptr;
//----------------------------------------------------------------------
class udp_broadcaster
  : public subscriber
{
public:
  udp_broadcaster(boost::asio::io_service& io_service,
      const udp::endpoint& broadcast_endpoint)
    : socket_(io_service)
  {
    socket_.connect(broadcast_endpoint);
  }
private:
  void deliver(const std::string& msg)
  {
    boost::system::error_code ignored_ec;
    socket_.send(boost::asio::buffer(msg), 0, ignored_ec);
  }
  udp::socket socket_;
};
//----------------------------------------------------------------------
class server
{
public:
  server(boost::asio::io_service& io_service,
      const tcp::endpoint& listen_endpoint,
      const udp::endpoint& broadcast_endpoint)
    : io_service_(io_service),
      acceptor_(io_service, listen_endpoint)
  {
    subscriber_ptr bc(new udp_broadcaster(io_service_, broadcast_endpoint));
    channel_.join(bc);
    start_accept();
  }
  void start_accept()
  {
    tcp_session_ptr new_session(new tcp_session(io_service_, channel_));
    acceptor_.async_accept(new_session->socket(),
        boost::bind(&server::handle_accept, this, new_session, _1));
  }
  void handle_accept(tcp_session_ptr session,
      const boost::system::error_code& ec)
  {
    if (!ec)
    {
      session->start();
    }
    start_accept();
  }
private:
  boost::asio::io_service& io_service_;
  tcp::acceptor acceptor_;
  channel channel_;
};
//----------------------------------------------------------------------
int main(int argc, char* argv[])
{
  try
  {
    using namespace std; // For atoi.
    if (argc != 4)
    {
      std::cerr << "Usage: server <listen_port> <bcast_address> <bcast_port>n";
      return 1;
    }
    boost::asio::io_service io_service;
    tcp::endpoint listen_endpoint(tcp::v4(), atoi(argv[1]));
    udp::endpoint broadcast_endpoint(
        boost::asio::ip::address::from_string(argv[2]), atoi(argv[3]));
    server s(io_service, listen_endpoint, broadcast_endpoint);
    io_service.run();
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "n";
  }
  return 0;
}

Asio的超时服务器示例执行以下操作:

  • 作为TCP服务器,监听listen_port,第一个命令行参数
  • 强制TCP客户端的输入和输出截止日期。
  • 当TCP客户端发送消息时,服务器将发送相同的消息给每个连接的客户端和bcast_addressbcast_port指定的UDP端点,第二个和第三个命令行参数。
  • 服务器只向UDP套接字发送数据。服务器没有从UDP套接字读取数据。

尽管代码中使用了命名约定,但它不执行UDP广播。该功能是通过socket_base::broadcast套接字选项启用的。

程序的连接图可以表示如下:

.--------------.    listen_port     .----------------.                  
|              |<---TCP--o )---+--->|  TCP Client A  |                       
|              |               |    '----------------'                        
|    Boost     |               |    .----------------.   
|    Asio      |               `--->|  TCP Client B  |     
|    Server    |                    '----------------'                      
|              | bcast_address:
|              | bcast_port         .----------------.
|              |----UDP--( o------->|  UDP Listener  |
'--------------'                    '----------------'
  • Boost Asio服务器'连接'到UDP侦听器。UDP监听器位于bcast_address上,监听bcast_port端口。
  • 泊斯德。Asio Server在端口listen_port上接受TCP连接。
  • 服务器可以向所有连接写入数据。但是,服务器只会从TCP客户端读取数据。

例如,如果TCP客户端A写入"hello",则服务器将向TCP客户端A, TCP客户端B和UDP侦听器发送"hello"。然而,如果UDP监听器写"再见",那么没有人会收到它。


下面是一些使用netcat运行超时服务器示例的终端。

启动服务器,监听TCP端口12345,发送消息到54321端口的UDP 127.0.0.1

设置<>以前美元。/。输出12345 127.0.0.1 54321

UDP侦听器(将在端口54321上侦听UDP):

$ nc -l -u 54321

启动TCP Client A:

$ nc 127.0.0.1 12345

启动TCP Client B:

$ nc 127.0.0.1 12345

发送信息

从TCP客户端A发送"hello":

$ nc 127.0.0.1 12345你好输入你好之前

TCP Client B received message:

$ nc 127.0.0.1 12345你好之前

以及UDP侦听器:

$ nc -l -u 54321你好