Writing a simple C++ protobuf streaming client/server

Keywords: client, server, protobuf · Updated: 2023-10-16

I want to use protobuf to send messages back and forth between a client and a server. In my case, I want to send an arbitrary number of protobuf messages from the server to the client. How can I set this up quickly in C++?

Note: I wrote this question, and answered it on Stack Overflow, after piecing together a very helpful answer from Kenton Varda and another from Fulkerson. Others have asked similar questions and hit similar roadblocks — see here, here, and here.

I was new to both protobuf and Asio.

First, the C++ protobuf API lacks built-in support for sending multiple protobuf messages over a single stream/connection. The Java API has it, but it hasn't been added to the C++ version. Kenton Varda (creator of protobuf v2) was kind enough to post a C++ version of the delimited read/write functions. So you need that code to get support for multiple messages on a single connection.

Then you can use boost::asio to create the client/server. Don't try to use the istream/ostream-style interface Asio provides; wrapping it in the stream types protobuf needs (ZeroCopyInputStream/ZeroCopyOutputStream) looks easier, but it doesn't work. I don't fully understand why, but this answer from Fulkerson discusses the fragile nature of attempting it. It also provides the sample code below to adapt raw sockets to the types we need.

Putting all of this together with the basic boost::asio tutorials, here are the client and server, followed by the supporting code. We're sending multiple instances of a simple protobuf class called persistence::MyMessage, defined in MyMessage.pb.h. Substitute your own.
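The original post doesn't show the .proto file; a minimal hypothetical definition matching the accessors used below could look like this (note that protoc lowercases field names in the generated C++ accessors, e.g. set_mystring()/set_myint()):

```protobuf
// MyMessage.proto — hypothetical schema; substitute your own fields.
syntax = "proto3";

package persistence;

message MyMessage {
  string myString = 1;  // generates set_mystring() in C++
  int32 myInt = 2;      // generates set_myint() in C++
}
```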

Client:

#include <boost/asio.hpp>
#include "ProtobufHelpers.h"
#include "AsioAdapting.h"
#include "MyMessage.pb.h"
using boost::asio::ip::tcp;
int main()
{
    const char* hostname = "127.0.0.1";
    const char* port = "27015";
    boost::asio::io_service io_service;
    tcp::resolver resolver(io_service);
    tcp::resolver::query query(hostname, port);
    tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
    tcp::socket socket(io_service);
    boost::asio::connect(socket, endpoint_iterator);
    AsioInputStream<tcp::socket> ais(socket);
    CopyingInputStreamAdaptor cis_adp(&ais);
    for (;;)
    {
        persistence::MyMessage myMessage;
        // Stop when the stream ends or a message fails to parse.
        if (!google::protobuf::io::readDelimitedFrom(&cis_adp, &myMessage))
            break;
    }
    return 0;
}

Server:

#include <boost/asio.hpp>
#include "ProtobufHelpers.h"
#include "AsioAdapting.h"
#include "MyMessage.pb.h"
using boost::asio::ip::tcp;
int main()
{
    boost::asio::io_service io_service;
    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 27015));
    for (;;)
    {
        tcp::socket socket(io_service);
        acceptor.accept(socket);
        AsioOutputStream<tcp::socket> aos(socket);
        CopyingOutputStreamAdaptor cos_adp(&aos);
        int i = 0;
        do {
            ++i;
            persistence::MyMessage myMessage;
            myMessage.set_mystring("hello world");
            myMessage.set_myint(i);
            google::protobuf::io::writeDelimitedTo(myMessage, &cos_adp);
            // Now we have to flush, otherwise the write to the socket won't happen until enough bytes accumulate
            cos_adp.Flush(); 
        } while (true);
    }
    return 0;
}

Here are the supporting files, provided by Kenton Varda:

ProtobufHelpers.h

#pragma once
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/message_lite.h>
namespace google {
    namespace protobuf {
        namespace io {
            bool writeDelimitedTo(
                const google::protobuf::MessageLite& message,
                google::protobuf::io::ZeroCopyOutputStream* rawOutput);
            bool readDelimitedFrom(
                google::protobuf::io::ZeroCopyInputStream* rawInput,
                google::protobuf::MessageLite* message);
        }
    }
}

ProtobufHelpers.cpp

#include "ProtobufHelpers.h"
namespace google {
    namespace protobuf {
        namespace io {
            bool writeDelimitedTo(
                const google::protobuf::MessageLite& message,
                google::protobuf::io::ZeroCopyOutputStream* rawOutput) {
                // We create a new coded stream for each message.  Don't worry, this is fast.
                google::protobuf::io::CodedOutputStream output(rawOutput);
                // Write the size.
                const int size = message.ByteSize();
                output.WriteVarint32(size);
                uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
                if (buffer != NULL) {
                    // Optimization:  The message fits in one buffer, so use the faster
                    // direct-to-array serialization path.
                    message.SerializeWithCachedSizesToArray(buffer);
                }
                else {
                    // Slightly-slower path when the message is multiple buffers.
                    message.SerializeWithCachedSizes(&output);
                    if (output.HadError()) return false;
                }
                return true;
            }
            bool readDelimitedFrom(
                google::protobuf::io::ZeroCopyInputStream* rawInput,
                google::protobuf::MessageLite* message) {
                // We create a new coded stream for each message.  Don't worry, this is fast,
                // and it makes sure the 64MB total size limit is imposed per-message rather
                // than on the whole stream.  (See the CodedInputStream interface for more
                // info on this limit.)
                google::protobuf::io::CodedInputStream input(rawInput);
                // Read the size.
                uint32_t size;
                if (!input.ReadVarint32(&size)) return false;
                // Tell the stream not to read beyond that size.
                google::protobuf::io::CodedInputStream::Limit limit =
                    input.PushLimit(size);
                // Parse the message.
                if (!message->MergeFromCodedStream(&input)) return false;
                if (!input.ConsumedEntireMessage()) return false;
                // Release the limit.
                input.PopLimit(limit);
                return true;
            }
        }
    }
}

And the socket adapters, provided by Fulkerson:

AsioAdapting.h

#pragma once
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
using namespace google::protobuf::io;

template <typename SyncReadStream>
class AsioInputStream : public CopyingInputStream {
public:
    AsioInputStream(SyncReadStream& sock);
    int Read(void* buffer, int size);
private:
    SyncReadStream& m_Socket;
};

template <typename SyncReadStream>
AsioInputStream<SyncReadStream>::AsioInputStream(SyncReadStream& sock) :
    m_Socket(sock) {}

template <typename SyncReadStream>
int
AsioInputStream<SyncReadStream>::Read(void* buffer, int size)
{
    std::size_t bytes_read;
    boost::system::error_code ec;
    bytes_read = m_Socket.read_some(boost::asio::buffer(buffer, size), ec);
    if (!ec) {
        return bytes_read;
    }
    else if (ec == boost::asio::error::eof) {
        return 0;
    }
    else {
        return -1;
    }
}

template <typename SyncWriteStream>
class AsioOutputStream : public CopyingOutputStream {
public:
    AsioOutputStream(SyncWriteStream& sock);
    bool Write(const void* buffer, int size);
private:
    SyncWriteStream& m_Socket;
};

template <typename SyncWriteStream>
AsioOutputStream<SyncWriteStream>::AsioOutputStream(SyncWriteStream& sock) :
    m_Socket(sock) {}

template <typename SyncWriteStream>
bool
AsioOutputStream<SyncWriteStream>::Write(const void* buffer, int size)
{
    boost::system::error_code ec;
    m_Socket.write_some(boost::asio::buffer(buffer, size), ec);
    return !ec;
}

I recommend using gRPC instead. It supports "streaming" requests, where the client and server can each send multiple messages over time as part of a single logical request, which should suit your needs. With gRPC, a lot of the nitty-gritty setup is handled for you, there is plenty of documentation and tutorials, TLS encryption is built in, and you get cross-language support, streaming, and more.
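For comparison, a server-streaming RPC in gRPC is declared directly in the .proto file; a hypothetical service for the use case above (service and method names are my own invention) might look like:

```protobuf
syntax = "proto3";

package persistence;

message SubscribeRequest {
  string topic = 1;
}

message MyMessage {
  string myString = 1;
  int32 myInt = 2;
}

service MessageFeed {
  // The `stream` keyword on the response lets the server send any
  // number of MyMessage replies for a single request — the framing,
  // flushing, and connection handling are all done by gRPC.
  rpc Subscribe(SubscribeRequest) returns (stream MyMessage);
}
```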