如何使用 GRPC c++ 读取异步服务器端流

How to read async server side streaming using GRPC c++

本文关键字:异步 服务器端 读取 c++ 何使用 GRPC      更新时间:2023-10-16

我正在尝试使用C++实现异步服务器端流。 但我找不到任何好的例子。我很难异步读取流。

服务器代码

class ServerImpl final
{
public:
~ServerImpl()
{
server_->Shutdown();
cq_->Shutdown();
}
void Run()
{
std::string server_address("0.0.0.0:50051");
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service_);
cq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;
HandleRpcs();
}
private:
class CallData
{
public:
CallData(MultiGreeter::AsyncService* service, ServerCompletionQueue* cq)
: service_(service)
, cq_(cq)
, responder_(&ctx_)
, status_(CREATE)
, times_(0)
{
Proceed();
}
void Proceed()
{
if (status_ == CREATE)
{
status_ = PROCESS;
service_->RequestsayHello(&ctx_, &request_, &responder_, cq_, cq_, this);
}
else if (status_ == PROCESS)
{                   
if (times_ == 0)
{
new CallData(service_, cq_);
}    
if (times_++ >= 3)
{
status_ = FINISH;
responder_.Finish(Status::OK, this);
}
else
{
std::string prefix("Hello ");
reply_.set_message(prefix + request_.name() + ", no " + request_.num_greetings());    
responder_.Write(reply_, this);
}
}
else
{
GPR_ASSERT(status_ == FINISH);
delete this;
}
}    
private:
MultiGreeter::AsyncService* service_;
ServerCompletionQueue* cq_;
ServerContext ctx_;  
HelloRequest request_;
HelloReply reply_; 
ServerAsyncWriter<HelloReply> responder_;    
int times_;   
enum CallStatus
{
CREATE,
PROCESS,
FINISH
};
CallStatus status_;
};
void HandleRpcs()
{
new CallData(&service_, cq_.get());
void* tag; 
bool ok;
while (true)
{
GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
}
std::unique_ptr<ServerCompletionQueue> cq_;
MultiGreeter::AsyncService service_;
std::unique_ptr<Server> server_;
};

客户端代码

class GreeterClient
{
public:
GreeterClient(std::shared_ptr<Channel> channel)
: stub_(MultiGreeter::NewStub(channel))
{}
void SayHello(const std::string& user, const std::string& num_greetings)
{
HelloRequest request;
request.set_name(user);
request.set_num_greetings(num_greetings);
ClientContext context;
CompletionQueue cq;
void* got_tag = (void*)1;
bool ok = false;
std::unique_ptr<ClientAsyncReader<HelloReply>>       reader(stub_>PrepareAsyncSayHello(&context,request, &cq));    
std::cout << "Got reply: " << reply.message() << std::endl;          
reader->Read(&reply,got_tag);
Status status;
reader->Finish(&status, (void*)1);
GPR_ASSERT(cq.Next(&got_tag, &ok));
GPR_ASSERT(ok);   
if (status.ok()) 
{
std::cout << "sayHello rpc succeeded." << std::endl;
} 
else 
{
std::cout << "sayHello rpc failed." << std::endl;
std::cout << status.error_code() << ": " << status.error_message() << std::endl;
}
}
private:
std::unique_ptr<MultiGreeter::Stub> stub_;
};

我的原型服务

rpc SayHello (HelloRequest) returns (stream HelloReply) {}

当我使用客户端阅读器以同步方式读取流时,它工作正常。 但是当我使用客户端异步阅读器时,我收到此运行时错误。

Assertion failed: (started_), function Finish, file /usr/local/include/grpcpp/impl/codegen/async_stream_impl.h, line 250.
23:25:40: The program has unexpectedly finished.

提前谢谢。

错误消息Assertion failed: (started_), function Finish,指向在使用Read初始化异步读取操作之前调用ClientAsyncReaderFinish函数的问题。这会破坏异步操作的预期顺序。

问题是在Read操作完成之前正在使用Finish函数。当你使用ClientAsyncReader时,你应该先做Read,等待它完成,然后做Finish

在您的情况下,请尝试这样的事情:

...
reader->Read(&reply, got_tag);
// Wait for the read operation to complete
void* got_tag_read;
bool ok_read;
GPR_ASSERT(cq.Next(&got_tag_read, &ok_read));
GPR_ASSERT(ok_read);
Status status;
reader->Finish(&status, (void*)1);
// Wait for the Finish operation to complete
void* got_tag_finish;
bool ok_finish;
GPR_ASSERT(cq.Next(&got_tag_finish, &ok_finish));
GPR_ASSERT(ok_finish);
...