Boost ASIO async_read 没有从服务器读取数据

Boost ASIO async_read is not reading data from the server

本文关键字:服务器 读取 数据 ASIO async read Boost      更新时间:2023-10-16

我有一个服务器/客户端应用程序,其中"客户端写入、服务器读取"这一方向已经可以正常工作。

服务器在函数read_async_1中接收完数据后,在末尾写入一个简单的字符串"Response"。

但是,客户端收不到这条响应。在客户端代码中,StartHandlingServer 负责发起异步读取,然而其中的完成处理程序(handler)始终没有被调用。

有人可以看看这个吗?感谢您的反馈。

服务器代码

#include <unistd.h>

#include <atomic>
#include <chrono>
#include <exception>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/serialization/vector.hpp>
#include <boost/tuple/tuple.hpp>

#include "../stocks.hpp"
using namespace boost;

class Service {
public:
    Service(){}
    void StartHandligClient(boost::shared_ptr<asio::ip::tcp::socket> sock)
    {
        std::cout << "StartHandligClient : sock.use_count : " << sock.use_count() << "n";
        read_async_1(sock);
        return;
    }
private:
    void read_async_1(boost::shared_ptr<asio::ip::tcp::socket> sock)
    {
        if(!(*sock.get()).is_open())
        {
            std::cout << getpid() << " : Socket closed in sync_read n" << std::flush;
            return ;
        }
        std::cout << "haha_1n" << std::flush;
        boost::asio::async_read( (*sock.get()), boost::asio::buffer(inbound_header_),
                [this, sock](boost::system::error_code ec,
                    size_t bytesRead)
                {
                int headerBytesReceived = bytesRead;
                std::cout << "nn headerBytesReceived : " << headerBytesReceived << "n" << std::flush ;
                if (!ec)
                {
                // Determine the length of the serialized data.
                std::istringstream is(std::string(inbound_header_, header_length));
                std::cout << "is : +" << is.str() << "+, inbound_header_ : +" << inbound_header_ << "+n";
                std::size_t inbound_data_size = 0;
                if (!(is >> std::hex >> inbound_data_size))
                {
                // Header doesn't seem to be valid. Inform the caller.
                // boost::system::error_code error(boost::asio::error::invalid_argument);
                // boost::get<0>(handler)(error);
                    std::cout << "RET-1 n";
                    return;
                }
                std::cout << "inbound_data_size : " << inbound_data_size << "n" << std::flush;
                // Start an asynchronous call to receive the data.
                inbound_data_.resize(inbound_data_size);
                std::cout << "inbound_data_.size() : " << inbound_data_.size() << "n" << std::flush;
                int bytesReceived = asio::read( *sock.get(), boost::asio::buffer(inbound_data_) );
                std::string archive_data(&inbound_data_[0], inbound_data_.size());
                std::istringstream archive_stream(archive_data);
                boost::archive::text_iarchive archive(archive_stream);
                archive >> stocks_;

                std::cout << "bytesReceived : " << bytesReceived << " , stocks_.size() : " << stocks_.size() << "n";
                // Print out the data that was received.
                for (std::size_t i = 0; i < stocks_.size(); ++i)
                {
                    std::cout << "Stock number " << i << "n";
                    std::cout << "  code: " << stocks_[i].code << "n";
                    std::cout << "  name: " << stocks_[i].name << "n";
                    std::cout << "  open_price: " << stocks_[i].open_price << "n";
                    std::cout << "  high_price: " << stocks_[i].high_price << "n";
                    std::cout << "  low_price: " << stocks_[i].low_price << "n";
                    std::cout << "  last_price: " << stocks_[i].last_price << "n";
                    std::cout << "  buy_price: " << stocks_[i].buy_price << "n";
                    std::cout << "  buy_quantity: " << stocks_[i].buy_quantity << "n";
                    std::cout << "  sell_price: " << stocks_[i].sell_price << "n";
                    std::cout << "  sell_quantity: " << stocks_[i].sell_quantity << "n";
                }            
                sleep(1);
                // Sending response.
                std::string response = "Responsen";
                asio::write(*sock.get(), asio::buffer(response));

                this->read_async_1(sock);
                }
                else
                {
                    // Terminate connection ?
                    if(ec == boost::asio::error::eof)
                    {
                            std::cout << getpid() << " : ** sync_read : Connection lost  : boost::asio::error::eof ** n";
                    }
                        std::cout << "Error occured in async_read! Error code = " << ec.value() << ". Message: " << ec.message() << "n" << std::flush;
                    return ;
                }
                return ;
                }
        );
        std::cout << getpid() << " : final return from async_read n" << std::flush;
        return ;
    }
/// The size of a fixed length header.
enum { header_length = 8 };
/// Holds an outbound header.
std::string outbound_header_;
/// Holds the outbound data.
std::string outbound_data_;
/// Holds an inbound header.
char inbound_header_[header_length];
/// Holds the inbound data.
std::vector<char> inbound_data_;
    std::vector<stock> stocks_;
};
class Acceptor {
public:
    Acceptor(asio::io_service& ios, unsigned short port_num) :
        m_ios(ios),
        m_acceptor(m_ios,
        asio::ip::tcp::endpoint(
        asio::ip::address_v4::any(),
        port_num))
    {
        m_acceptor.listen();
    }
    void Accept() {
        std::cout << "Server Accept() n" << std::flush;
        boost::shared_ptr<asio::ip::tcp::socket> sock(new asio::ip::tcp::socket(m_ios));
        m_acceptor.accept(*sock.get());
        (new Service)->StartHandligClient(sock);
        std::cout << "Accept : sock.use_count : " << sock.use_count() << "n";
    }
private:
    asio::io_service& m_ios;
    asio::ip::tcp::acceptor m_acceptor;
};
/// Runs the accept/serve loop on a background thread.
class Server {
public:
    Server() : m_stop(false) {}

    /// Launches the background thread that serves clients on `port_num`.
    void Start(unsigned short port_num) {
        m_thread.reset(new std::thread([this, port_num]() {
            // An exception leaving a thread function calls std::terminate,
            // and the try/catch in the launching thread cannot see it, so
            // failures are reported here instead.
            try {
                Run(port_num);
            }
            catch (const std::exception& e) {
                std::cout << "Server thread stopped by error: " << e.what() << "\n" << std::flush;
            }
        }));
    }

    /// Requests shutdown and waits for the background thread to finish.
    /// NOTE(review): the loop only checks the flag between clients, so this
    /// waits until the blocking accept() and the current session return.
    void Stop() {
        std::cout << "STOPPING \n";
        m_stop.store(true);
        if (m_thread && m_thread->joinable()) {
            m_thread->join();
        }
    }

private:
    /// Accepts one client at a time and runs the io_service until that
    /// client's asynchronous work is exhausted.
    void Run(unsigned short port_num) {
        Acceptor acc(m_ios, port_num);
        while (!m_stop.load())
        {
            std::cout << "Server accept\n" << std::flush;
            acc.Accept();
            // Once run() has returned for lack of work the io_service is in
            // the stopped state; without reset() every later run() would
            // return immediately and no handler would ever be invoked.
            m_ios.reset();
            m_ios.run();
        }
    }

    std::unique_ptr<std::thread> m_thread;
    std::atomic<bool> m_stop;
    asio::io_service m_ios;
};
int main()
{
    unsigned short port_num = 3333;
    try {
        Server srv;
        srv.Start(port_num);
        std::this_thread::sleep_for(std::chrono::seconds(100));
        std::cout << "Stopping server n";
        srv.Stop();
    }
    catch (system::system_error &e) {
        std::cout << "Error occured! Error code = "
            << e.code() << ". Message: "
            << e.what();
    }
    return 0;
}

客户端代码

#include <boost/asio.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/bind.hpp>
#include <boost/serialization/vector.hpp>
#include <iostream>
#include <boost/shared_ptr.hpp>
#include "../stocks.hpp"
using namespace boost;
/// TCP socket that logs a message when it is destroyed.
class mysock : public boost::asio::ip::tcp::socket
{
public:
    /// Creates an unopened socket bound to `serv`.
    explicit mysock(asio::io_service& serv) : boost::asio::ip::tcp::socket(serv)
    {
    }

    ~mysock()
    {
        std::cout << "Inside destructor for mysock \n";
    }
};
class SyncTCPClient {
public:
    SyncTCPClient(const std::string& raw_ip_address,
        unsigned short port_num) :
        socket_((new mysock(m_ios))),
        m_ep(asio::ip::address::from_string(raw_ip_address), port_num)
    {
        (*socket_.get()).open(m_ep.protocol());
        connect();
        StartHandlingServer(socket_);
    }
    mysock& socket()
    {
        return *socket_.get();
    }
    void connect() {
        (*socket_.get()).connect(m_ep);
        m_ios.run();
    }
    void StartHandlingServer(boost::shared_ptr<mysock>  sock)
    {
        if(!(*sock.get()).is_open())
        {
            std::cout << getpid() << " : Socket closed in sync_read n" << std::flush;
            return ;
        }
        std::cout << "Start StartHandlingServern" << std::flush;
        char inbound_header_[4];;
        try
        {
        boost::asio::async_read( (*sock.get()), boost::asio::buffer(inbound_header_),
                [this, sock](boost::system::error_code ec,
                    size_t bytesRead)
                {
                int headerBytesReceived = bytesRead;
                std::cout << "nn headerBytesReceived : " << headerBytesReceived << "n" << std::flush ;
                if (!ec)
                {
                }
                else
                {
                    if(ec == boost::asio::error::eof)
                    {
                            std::cout << getpid() << " : ** sync_read : Connection lost  : boost::asio::error::eof ** n";
                    }
                        std::cout << "Error occured in async_read! Error code = " << ec.value() << ". Message: " << ec.message() << "n" << std::flush;
                    return ;
                }
                }
                );
        }
        catch (std::exception& e)
        {
            std::cerr << e.what() << std::endl;
        }
        std::cout << "Done StartHandlingServern" << std::flush;
    }
    void close() {
        (*socket_.get()).shutdown(
            boost::asio::ip::tcp::socket::shutdown_both);
        (*socket_.get()).close();
    }
    std::string emulateLongComputationOp(
        unsigned int duration_sec) {
        std::string request = "EMULATE_LONG_COMP_OP "
            + std::to_string(duration_sec)
            + "n";
        sendRequest(request);
        /*
        sleep(2);
        sendRequest(request);
        sleep(2);
        sendRequest(request);
        */
        return receiveResponse();
    };
private:
    void sendRequest(const std::string& request) {
        std::vector<stock> stocks_;
        // Create the data to be sent to each client.
        stock s;
        s.code = "ABC";
        s.name = "A Big Company";
        s.open_price = 4.56;
        s.high_price = 5.12;
        s.low_price = 4.33;
        s.last_price = 4.98;
        s.buy_price = 4.96;
        s.buy_quantity = 1000;
        s.sell_price = 4.99;
        s.sell_quantity = 2000;
        stocks_.push_back(s);
        // Serialize the data first so we know how large it is.
        std::ostringstream archive_stream;
        boost::archive::text_oarchive archive(archive_stream);
        archive << stocks_;
        outbound_data_ = archive_stream.str();
        std::cout << "outbound_data_ : " << outbound_data_ << "n" << std::flush;
        std::cout << "outbound_data_.size() : " << outbound_data_.size() << "n" << std::flush;
        // Format the header.
        std::ostringstream header_stream;
        header_stream << std::setw(header_length)  << std::hex << outbound_data_.size();
        std::cout << "header_stream.str() : " << header_stream.str() << "n" << std::flush;
        std::cout << "header_stream.str().size() : " << header_stream.str().size() << "n" << std::flush;
        if (!header_stream || header_stream.str().size() != header_length)
        {
            // Something went wrong, inform the caller.
            // boost::system::error_code error(boost::asio::error::invalid_argument);
            // socket_.get_io_service().post(boost::bind(handler, error));
            return;
        }
        outbound_header_ = header_stream.str();
        std::cout << "outbound_header_ : +" << outbound_header_ << "+n" << std::flush;
        // Write the serialized data to the socket. We use "gather-write" to send
        // both the header and the data in a single write operation.
        /*
        std::vector<boost::asio::const_buffer> buffers;
        buffers.push_back(boost::asio::buffer(outbound_header_));
        buffers.push_back(boost::asio::buffer(outbound_data_));
        */
        std::size_t headerSize = asio::write(*socket_.get(), boost::asio::buffer(outbound_header_));
        std::size_t dataSize   = asio::write(*socket_.get(), boost::asio::buffer(outbound_data_));
        std::cout << "headerSize : " << headerSize << " , dataSize : " << dataSize;
    }
    std::string receiveResponse() {
        std::string response;
        /*
        asio::streambuf buf;
        asio::read_until(*socket_.get(), buf, 'n');
        std::istream input(&buf);
        std::getline(input, response);
        */
        return response;
    }
private:
    asio::io_service m_ios;
    boost::shared_ptr<mysock> socket_;
    asio::ip::tcp::endpoint m_ep;
    enum { header_length = 8 };
    std::string outbound_data_;
    std::string outbound_header_;
};
int main()
{
    const std::string raw_ip_address = "127.0.0.1";
    const unsigned short port_num = 3333;
    try {
        SyncTCPClient client(raw_ip_address, port_num);
        std::cout << "Sending request to the server... n"<< std::endl;
        std::string response = client.emulateLongComputationOp(10);
        std::cout << "nResponse received: " << response << std::endl;
        sleep(10);
        std::cout << "nn Closing client connection nn";
        // Close the connection and free resources.
        client.close();
    }
    catch (system::system_error &e) {
        std::cout << "Client Error occured! Error code = " << e.code()
            << ". Message: " << e.what();
        return e.code().value();
    }
    return 0;
}

包含的头文件 stocks.hpp

// The guard name avoids a leading underscore followed by an upper-case
// letter, which is reserved for the implementation.
#ifndef STOCKS_HPP
#define STOCKS_HPP

#include <string>

/// Plain data record for one stock, serializable with any archive type that
/// provides operator& for std::string, double and int.
struct stock
{
  std::string code;
  std::string name;
  // Numeric fields are zero-initialized so a default-constructed record
  // never carries indeterminate values.
  double open_price = 0.0;
  double high_price = 0.0;
  double low_price = 0.0;
  double last_price = 0.0;
  double buy_price = 0.0;
  int buy_quantity = 0;
  double sell_price = 0.0;
  int sell_quantity = 0;

  /// Passes every field to `ar` in declaration order. The class version
  /// argument is accepted but not used.
  template <typename Archive>
  void serialize(Archive& ar, const unsigned int /*version*/)
  {
    ar & code;
    ar & name;
    ar & open_price;
    ar & high_price;
    ar & low_price;
    ar & last_price;
    ar & buy_price;
    ar & buy_quantity;
    ar & sell_price;
    ar & sell_quantity;
  }
};

#endif // STOCKS_HPP

io_service::run 函数将一直运行,直到没有更多事件。

在客户端中,您只调用了一次 run,而且是在还没有任何待处理的异步操作时调用的,所以它会立即返回。

由于 io_service 并未处于"运行"状态,之后的任何事件都不会被处理。

您需要像服务器那样,在循环中调用 run()(或 poll())。