作者:liu100897 | 来源:互联网 | 2023-05-17 10:00
欢迎回来!2.请求流接口(客户端可以源源不断的给服务端传参数,服务端会源源不断的接受服务端的参数,最后在客户端完成请求的时候,服务端返回一个结果) 在.proto文件中新
欢迎回来!
2.请求流接口
(客户端可以源源不断的给服务端传参数,服务端会源源不断的接受服务端的参数,最后在客户端完成请求的时候,服务端返回一个结果)
在.proto文件中新加一个方法,这个方法的参数被 stream 关键字修饰
rpc methodRequestStream(stream Request) returns (Result) {}
然后用maven,清理一下缓存,重新编译一下
2.1.服务端
重新编译之后,实现刚刚新加的方法
@Override
public StreamObserver methodRequestStream(StreamObserver responseObserver) {
return new StreamObserver() {
@Override
public void onNext(Request request) {
System.out.print("收到了请求 \n");
}
@Override
public void one rror(Throwable throwable) {
}
@Override
public void onCompleted() {
Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build();
responseObserver.onNext(result);
responseObserver.onCompleted();
}
};
}
(友情提示,如果 StreamObserver 的的泛型是Result 我们就叫 返回流观察者,如果是 Request 就叫请求流观察者,这样好描述一些)
这个和普通的有点不一样,直接返回了一个 请求流观察者 的接口实现,而且方法的参数还是一个 返回流观察者 ,好像搞反了一样,至于为什么,一会在客户端那里 统一说
2.2.客户端
请求流式异步调用,普通的是同步调用,我们在普通的方法里创建的实例 也是同步的,所以我们要在 JavaGrpcClient 中新加一个 异步调用的方法,添加一个异步的实例
public Result runAsync(Functional functional)
{
TestServiceGrpc.TestServiceStub testServiceStub =
TestServiceGrpc.newStub(channel);
return functional.run(testServiceStub);
}TestServiceGrpc.newStub 返回的是一个异步的实例
再加一个测试
@Test
public void contextLoads2() {
Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build();
StreamObserver respOnseObserver= new StreamObserver() {
@Override
public void onNext(Result result) {
System.out.print("返回了结果 \n");
}
@Override
public void one rror(Throwable throwable) {
}
@Override
public void onCompleted() {
}
};
StreamObserver result = javaGrpcClient.runAsync(o -> o.methodRequestStream(responseObserver));
result.onNext(request);
result.onNext(request);
result.onNext(request);
result.onCompleted();
try {
Thread.sleep(600000);
}
catch (Exception ex){}
}
这里我们实现了一个 返回流观察者
StreamObserver respOnseObserver= new StreamObserver() {
@Override
public void onNext(Result result) {
System.out.print("返回了结果 \n");
}
@Override
public void one rror(Throwable throwable) {
}
@Override
public void onCompleted() {
}
};
调用方法的时候,将我们实现的 返回流观察者 传进去,返回给我们一个 请求流观察者
StreamObserver result = javaGrpcClient.runAsync(o -> o.methodRequestStream(responseObserver));
其实这里返回的 请求流观察者 就是服务端那里返回给我们的内个实现,服务端那里 返回流观察者 是我们实现的 传给他的
由于是异步调用,最后暂停一下,要不测试跑完,程序结束 开没开始就结束了
try {
Thread.sleep(600000);
}
catch (Exception ex){}
运行起来看结果
服务端的打印
客户端的打印
这里我们发送了三次参数过去
result.onNext(request);
result.onNext(request);
result.onNext(request);
就相当于 服务端 那边返回的 请求流观察者 被调用了 三次 ,所以就打印了三句话
发送完参数结束请求
result.onCompleted();
服务端那里的结束请求中调用了一次我们传给他的 返回流观察者 中的 onNext 方法
所以客户端就打印了一次
这里会有人问 这里不能返回 多个吗
不能,虽然 这两个观察者 看上去一样 都是 StreamObserver 接口,但是,这个方法只是请求流调用,在grpc的内部 最后返回的时候 只返回第一个指定的返回只,不管返回了多少个,在客户端那边只会收到 第一个返回的结果
3.响应流接口(和请求流接口完全相反,请求流是异步,响应流是同步,请求流是接受多个请求返回一个结果,响应流是接受一个请求返回多个结果)
我们在.proto文件中再增加一个方法,这回这个方法的返回值被 stream 关键字修饰
rpc methodResultStream(Request) returns (stream Result){}
清缓存,重新编译
3.1.服务端
实现刚刚新加的方法
@Override
public void methodResultStream(Request request, StreamObserver responseObserver) {
System.out.print("收到了请求 \n");
Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build();
responseObserver.onNext(result);
responseObserver.onNext(result);
try {
Thread.sleep(2000);
}
catch (Exception ex){}
responseObserver.onNext(result);
responseObserver.onCompleted();
}
这里跟普通的差不多,只是我们返回了三次结果
responseObserver.onNext(result);
responseObserver.onNext(result);
try {
Thread.sleep(2000);
}
catch (Exception ex){}
responseObserver.onNext(result);
3.2.客户端
没啥好加的了,直接上测试
@Test
public void contextLoads3() {
Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build();
Iterator result = javaGrpcClient.run(o -> o.methodResultStream(request));
result.forEachRemaining(o ->
{
System.out.print("返回了结果 \n");
});
System.out.print("结束 \n");
}
返回流请求是同步的,所以要调同步的方法,返回了一个迭代器
Iterator result = javaGrpcClient.run(o -> o.methodResultStream(request));
迭代器中有服务端的所有返回结果
result.forEachRemaining(o ->
{
System.out.print("返回了结果 \n");
});
运行结果
服务端结果
客户端结果
由于是同步调用,在forEach中会等待服务端的每一个返回结果
4.双向流接口 --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
歇会,抽根烟!
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
在.proto文件中再加一个方法
rpc methodDoubleStream(stream Request) returns (stream Result){}
实现她
双向流的服务端和请求流的没啥区别,只是在接收到请求的时候没有立刻结束请求
@Override
public StreamObserver methodDoubleStream(StreamObserver responseObserver) {
return new StreamObserver() {
@Override
public void onNext(Request value) {
System.out.print("收到了请求 \n");
Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build();
responseObserver.onNext(result);
}
@Override
public void one rror(Throwable t) {
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
客户端也没啥区别
@Test
public void contextLoads4() {
Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build();
StreamObserver respOnseObserver= new StreamObserver() {
@Override
public void onNext(Result result) {
System.out.print("返回了结果 \n");
}
@Override
public void one rror(Throwable throwable) {
}
@Override
public void onCompleted() {
}
};
StreamObserver result = javaGrpcClient.runAsync(o -> o.methodDoubleStream(responseObserver));
result.onNext(request);
result.onNext(request);
result.onNext(request);
result.onCompleted();
try {
Thread.sleep(600000);
}
catch (Exception ex){}
}
双向流也是异步的,所以要等待
try {
Thread.sleep(600000);
}
catch (Exception ex){}
服务端结果
客户端结果
完结!撒花!