作者:段娜688 | 来源:互联网 | 2023-08-20 08:34
异步客户端代码绝对于同步客户端来说并没有简单多少,简略来说,就是同步rpc调用是调用完不会立即返回,而是能够异步从队列中取得返回后果,实现调用的解耦,咱们来看代码。
gRPC系列(二) 异步服务应用
相干文章:gRPC系列(一)装置和入门
异步的实现次要围绕的是grpc提供的队列:grpc::CompletionQueue
。
客户端代码
异步客户端代码绝对于同步客户端来说并没有简单多少,简略来说,就是同步rpc调用是调用完不会立即返回,而是能够异步从队列中取得返回后果,实现调用的解耦,咱们来看代码。
#include
#include
#include
#include
#include
#include "../protos/simple/simple.grpc.pb.h"
using grpc::Status;
using grpc::Channel;
using grpc::CompletionQueue;
using grpc::ClientContext;
using grpc::ClientAsyncResponseReader;
using Simple::EchoRequest;
using Simple::EchoResponse;
int main()
{
std::shared_ptr chan = grpc::CreateChannel("localhost:12345",grpc::InsecureChannelCredentials());
std::unique_ptr stub(Simple::Server::NewStub(chan));
ClientContext context;
EchoRequest req;
req.set_msg("hello world!");
EchoResponse resp;
CompletionQueue cq;
// 实现rpc调用会将tag增加到cq队列中
std::unique_ptr> rpc(stub->AsyncEcho(&context, req, &cq));
Status status;
// 第三个参数是一个上下文标签,用于帮咱们标识这个申请
// grpc框架只会将其保存起来
rpc->Finish(&resp, &status, (void*)1);
void* got_tag;
bool ok = false;
// 从队列中获取,申请的标签以及状态
cq.Next(&got_tag, &ok);
if(ok && got_tag == (void*)1){
// check一下后果
std::cout <
服务端代码
异步服务端不不便了解,能够参考:grpc应用记录(三)简略异步服务实例
这里次要波及到的类包含grpc::ServerCompletionQueue
、grpc::ServerAsyncResponseWriter
、grpc::ServerAsyncResponseWriter
、Simple::Server::AsyncService
。
次要的解决流程是,
- 注册一个申请,传入上下文内容,包含context、req、resp以及你本人定义的上下文数据对象(能够作为tag)。
- 主循环生产队列,取出一个数据对象,调用解决逻辑。如果是还未解决,则进行rpc逻辑解决,而后用
grpc::ServerAsyncResponseWriter
异步写响应,并且调配一个新的数据对象(结构的时候会调用Proceed函数注册解决的申请);如果是以及解决好的申请,开释数据对象的空间。
能够这么说,cq的作用就是寄存CallData,而后主循环不断读取CallData,而后依据其中的上下文信息,做出相应的解决。
RequestXXX()其实就是注册一个rpc申请,而后咱们会传入CallData的地址参数,是为了再承受到指定rpc之后写入数据对象到音讯队列中。
Finish()是异步写回响应,这里的传入this指针也是为了让其实现写回后讲this增加到音讯队列中。
代码如下:
#include
#include
#include
#include
#include
#include
#include
#include
#include "../protos/simple/simple.grpc.pb.h"
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;
using grpc::ServerBuilder;
using Simple::EchoRequest;
using Simple::EchoResponse;
class ServerImpl final {
public:
~ServerImpl(){
_server->Shutdown();
_cq->Shutdown();
}
void Run(){
std::string server_address("localhost:12345");
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&_service);
_cq = builder.AddCompletionQueue();
_server = builder.BuildAndStart();
std::cout <<"Serfer listening on" <RequestEcho(&_ctx, &_req,&_responder,_cq,_cq,this);
} else if (_status == PROCESS) {
// 曾经开始解决一个申请了,生成一个新对象供下一个应用
new CallData(_service, _cq); // 会调用proceed注册申请
_resp.set_msg(_req.msg());
_status = FINISH;
_responder.Finish(_resp, Status::OK, this);
} else {
GPR_ASSERT(_status == FINISH);
delete this;
}
}
private:
Simple::Server::AsyncService* _service;
ServerCompletionQueue* _cq;
ServerContext _ctx;
EchoRequest _req;
EchoResponse _resp;
ServerAsyncResponseWriter _responder;
enum CallStatus { CREATE, PROCESS, FINISH};
CallStatus _status;
};
void HandleRpcs() {
new CallData(&_service, _cq.get());
void* tag;
bool ok;
while(true){
GPR_ASSERT(_cq->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast(tag)->Proceed();
}
}
std::unique_ptr _cq;
Simple::Server::AsyncService _service;
std::unique_ptr _server;
};
int main()
{
ServerImpl server;
server.Run();
return 0;
}