热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

java版gRPC实战之五:双向流

欢迎访问我的GitHubhttps:github.comzq2599blog_demos内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、De

欢迎访问我的GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;


本篇概览



  • 本文是《java版gRPC实战》系列的第五篇,目标是掌握双向流类型的服务,即请求参数是流的形式,响应的内容也是流的形式;

  • 先来看看官方资料对双向流式RPC的介绍:是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器 可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替 的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留;

  • 掌握了客户端流和服务端流两种类型的开发后,双向流类型就很好理解了,就是之前两种类型的结合体,请求和响应都按照流的方式处理即可;

  • 今天的实战,咱们来设计一个在线商城的功能:批量减扣库存,即客户端提交多个商品和数量,服务端返回每个商品减扣库存成功和失败的情况;

  • 咱们尽快进入编码环节吧,具体内容如下:



  1. 在proto文件中定义双向流类型的gRPC接口,再通过proto生成java代码

  2. 开发服务端应用

  3. 开发客户端应用

  4. 验证


源码下载



  • 本篇实战中的完整源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):



























名称链接备注
项目主页https://github.com/zq2599/blog_demos该项目在GitHub上的主页
git仓库地址(https)https://github.com/zq2599/blog_demos.git该项目源码的仓库地址,https协议
git仓库地址(ssh)git@github.com:zq2599/blog_demos.git该项目源码的仓库地址,ssh协议


  • 这个git项目中有多个文件夹,《java版gRPC实战》系列的源码在grpc-tutorials文件夹下,如下图红框所示:

在这里插入图片描述



  • grpc-tutorials文件夹下有多个目录,本篇文章对应的服务端代码在double-stream-server-side目录下,客户端代码在double-stream-client-side目录下,如下图:

在这里插入图片描述


在proto文件中定义双向流类型的gRPC接口



  • 首先要做的就是定义gRPC接口,打开mall.proto,在里面新增方法和相关的数据结构,需要重点关注的是BatchDeduct方法的入参ProductOrder和返回值DeductReply都添加了stream修饰(ProductOrder是上一章定义的),代表该方法是双向流类型:

// gRPC服务,这是个在线商城的库存服务
service StockService {
// 双向流式:批量扣减库存
rpc BatchDeduct (stream ProductOrder) returns (stream DeductReply) {}
}
// 扣减库存返回结果的数据结构
message DeductReply {
// 返回码
int32 code = 1;
// 描述信息
string message = 2;
}


  • 双击下图红框中的task即可生成java代码:

在这里插入图片描述



  • 生成下图红框中的文件,即服务端定义和返回值数据结构:

在这里插入图片描述



  • 接下来开发服务端;


开发服务端应用



  • 在父工程grpc-turtorials下面新建名为double-stream-server-side的模块,其build.gradle内容如下:

// 使用springboot插件
plugins {
id 'org.springframework.boot'
}
dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
// 作为gRPC服务提供方,需要用到此库
implementation 'net.devh:grpc-server-spring-boot-starter'
// 依赖自动生成源码的工程
implementation project(':grpc-lib')
// annotationProcessor不会传递,使用了lombok生成代码的模块,需要自己声明annotationProcessor
annotationProcessor 'org.projectlombok:lombok'
}


  • 配置文件application.yml:

spring:
application:
name: double-stream-server-side
# gRPC有关的配置,这里只需要配置服务端口号
grpc:
server:
port: 9901


  • 启动类DoubleStreamServerSideApplication.java的代码就不贴了,普通的springboot启动类而已;

  • 重点是提供grpc服务的GrpcServerService.java,咱们要做的就是给上层框架返回一个匿名类,至于里面的onNext、onCompleted方法何时被调用是上层框架决定的,另外还准备了成员变量totalCount,这样就可以记录总数了,由于请求参数是流,因此匿名类的onNext会被多次调用,并且由于返回值是流,因此onNext中调用了responseObserver.onNext方法来响应流中的每个请求,这样客户端就不断收到服务端的响应数据(即客户端的onNext方法会被多次调用):

package grpctutorials;
import com.bolingcavalry.grpctutorials.lib.DeductReply;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;
@GrpcService
@Slf4j
public class GrpcServerService extends StockServiceGrpc.StockServiceImplBase {
@Override
public StreamObserver

batchDeduct(StreamObserver responseObserver) {
// 返回匿名类,给上层框架使用
return new StreamObserver

() {
private int totalCount = 0;
@Override
public void onNext(ProductOrder value) {
log.info("正在处理商品[{}],数量为[{}]",
value.getProductId(),
value.getNumber());
// 增加总量
totalCount += value.getNumber();
int code;
String message;
// 假设单数的都有库存不足的问题
if (0 == value.getNumber() % 2) {
code = 10000;
message = String.format("商品[%d]扣减库存数[%d]成功", value.getProductId(), value.getNumber());
} else {
code = 10001;
message = String.format("商品[%d]扣减库存数[%d]失败", value.getProductId(), value.getNumber());
}
responseObserver.onNext(DeductReply.newBuilder()
.setCode(code)
.setMessage(message)
.build());
}
@Override
public void one rror(Throwable t) {
log.error("批量减扣库存异常", t);
}
@Override
public void onCompleted() {
log.info("批量减扣库存完成,共计[{}]件商品", totalCount);
responseObserver.onCompleted();
}
};
}
}

开发客户端应用



  • 在父工程grpc-turtorials下面新建名为double-stream-server-side的模块,其build.gradle内容如下:

plugins {
id 'org.springframework.boot'
}
dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'net.devh:grpc-client-spring-boot-starter'
implementation project(':grpc-lib')
}


  • 配置文件application.yml,设置自己的web端口号和服务端地址:

server:
port: 8082
spring:
application:
name: double-stream-client-side
grpc:
client:
# gRPC配置的名字,GrpcClient注解会用到
double-stream-server-side:
# gRPC服务端地址
address: 'static://127.0.0.1:9901'
enableKeepAlive: true
keepAliveWithoutCalls: true
negotiationType: plaintext


  • 启动类DoubleStreamClientSideApplication.java的代码就不贴了,普通的springboot启动类而已;



  • 正常情况下我们都是用StreamObserver处理服务端响应,这里由于是异步响应,需要额外的方法从StreamObserver中取出业务数据,于是定一个新接口,继承自StreamObserver,新增getExtra方法可以返回String对象,详细的用法稍后会看到:



package com.bolingcavalry.grpctutorials;
import io.grpc.stub.StreamObserver;
public interface ExtendResponseObserver extends StreamObserver {
String getExtra();
}


  • 重头戏来了,看看如何远程调用双向流类型的gRPC接口,代码中已经添加详细注释:

package grpctutorials;
import com.bolingcavalry.grpctutorials.lib.DeductReply;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class GrpcClientService {
@GrpcClient("double-stream-server-side")
private StockServiceGrpc.StockServiceStub stockServiceStub;
/**
* 批量减库存
* @param count
* @return
*/
public String batchDeduct(int count) {
CountDownLatch countDownLatch = new CountDownLatch(1);
// responseObserver的onNext和onCompleted会在另一个线程中被执行,
// ExtendResponseObserver继承自StreamObserver
ExtendResponseObserver respOnseObserver= new ExtendResponseObserver() {
// 用stringBuilder保存所有来自服务端的响应
private StringBuilder stringBuilder = new StringBuilder();
@Override
public String getExtra() {
return stringBuilder.toString();
}
/**
* 客户端的流式请求期间,每一笔请求都会收到服务端的一个响应,
* 对应每个响应,这里的onNext方法都会被执行一次,入参是响应内容
* @param value
*/
@Override
public void onNext(DeductReply value) {
log.info("batch deduct on next");
// 放入匿名类的成员变量中
stringBuilder.append(String.format("返回码[%d],返回信息:%s
" , value.getCode(), value.getMessage()));
}
@Override
public void one rror(Throwable t) {
log.error("batch deduct gRPC request error", t);
stringBuilder.append("batch deduct gRPC error, " + t.getMessage());
countDownLatch.countDown();
}
/**
* 服务端确认响应完成后,这里的onCompleted方法会被调用
*/
@Override
public void onCompleted() {
log.info("batch deduct on complete");
// 执行了countDown方法后,前面执行countDownLatch.await方法的线程就不再wait了,
// 会继续往下执行
countDownLatch.countDown();
}
};
// 远程调用,此时数据还没有给到服务端
StreamObserver

requestObserver = stockServiceStub.batchDeduct(responseObserver);
for(int i=0; i // 每次执行onNext都会发送一笔数据到服务端,
// 服务端的onNext方法都会被执行一次
requestObserver.onNext(build(101 + i, 1 + i));
}
// 客户端告诉服务端:数据已经发完了
requestObserver.onCompleted();
try {
// 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行,
// 那里会执行countDownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了,
// await的超时时间设置为2秒
countDownLatch.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("countDownLatch await error", e);
}
log.info("service finish");
// 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得
return responseObserver.getExtra();
}
/**
* 创建ProductOrder对象
* @param productId
* @param num
* @return
*/
private static ProductOrder build(int productId, int num) {
return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();
}
}


  • 最后做个web接口,可以通过web请求验证远程调用:

package grpctutorials;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class GrpcClientController {
@Autowired
private GrpcClientService grpcClientService;
@RequestMapping("/")
public String printMessage(@RequestParam(defaultValue = "1") int count) {
return grpcClientService.batchDeduct(count);
}
}


  • 编码完成,开始验证;


验证



  • 启动服务端DoubleStreamServerSideApplication:

在这里插入图片描述



  • 启动客户端DoubleStreamClientSideApplication:

在这里插入图片描述



  • 这里要改:浏览器输入http://localhost:8083/?count=10,响应如下,可见远程调用gRPC服务成功,流式响应的每一笔返回都被客户端收到:

在这里插入图片描述



  • 下面是服务端日志,可见逐一处理了客户端的每一笔数据:

在这里插入图片描述



  • 下面是客户端日志,可见由于CountDownLatch的作用,发起gRPC请求的线程一直等待responseObserver.onCompleted在另一个线程被执行完后,才会继续执行:

在这里插入图片描述



  • 至此,四种类型的gRPC服务及其客户端开发就完成了,一般的业务场景咱们都能应付自如,接下来的文章咱们会继续深入学习,了解复杂场景下的gRPC操作;


你不孤单,欣宸原创一路相伴



  1. Java系列

  2. Spring系列

  3. Docker系列

  4. kubernetes系列

  5. 数据库+中间件系列

  6. DevOps系列


欢迎关注公众号:程序员欣宸


微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...

https://github.com/zq2599/blog_demos




推荐阅读
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • Linux服务器密码过期策略、登录次数限制、私钥登录等配置方法
    本文介绍了在Linux服务器上进行密码过期策略、登录次数限制、私钥登录等配置的方法。通过修改配置文件中的参数,可以设置密码的有效期、最小间隔时间、最小长度,并在密码过期前进行提示。同时还介绍了如何进行公钥登录和修改默认账户用户名的操作。详细步骤和注意事项可参考本文内容。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • CF:3D City Model(小思维)问题解析和代码实现
    本文通过解析CF:3D City Model问题,介绍了问题的背景和要求,并给出了相应的代码实现。该问题涉及到在一个矩形的网格上建造城市的情景,每个网格单元可以作为建筑的基础,建筑由多个立方体叠加而成。文章详细讲解了问题的解决思路,并给出了相应的代码实现供读者参考。 ... [详细]
  • 如何在文本中运行Java程序
    本文介绍了在文本中运行Java程序的步骤,包括创建文本文档、修改后缀、打开DOS命令窗口、编译和运行程序。通过这些步骤,可以在文本中成功运行Java程序并输出结果。 ... [详细]
  • 本文介绍了一个适用于PHP应用快速接入TRX和TRC20数字资产的开发包,该开发包支持使用自有Tron区块链节点的应用场景,也支持基于Tron官方公共API服务的轻量级部署场景。提供的功能包括生成地址、验证地址、查询余额、交易转账、查询最新区块和查询交易信息等。详细信息可参考tron-php的Github地址:https://github.com/Fenguoz/tron-php。 ... [详细]
  • 图像因存在错误而无法显示 ... [详细]
  • SpringBoot整合SpringSecurity+JWT实现单点登录
    SpringBoot整合SpringSecurity+JWT实现单点登录,Go语言社区,Golang程序员人脉社 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • 解决Cydia数据库错误:could not open file /var/lib/dpkg/status 的方法
    本文介绍了解决iOS系统中Cydia数据库错误的方法。通过使用苹果电脑上的Impactor工具和NewTerm软件,以及ifunbox工具和终端命令,可以解决该问题。具体步骤包括下载所需工具、连接手机到电脑、安装NewTerm、下载ifunbox并注册Dropbox账号、下载并解压lib.zip文件、将lib文件夹拖入Books文件夹中,并将lib文件夹拷贝到/var/目录下。以上方法适用于已经越狱且出现Cydia数据库错误的iPhone手机。 ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • 关键词:Golang, Cookie, 跟踪位置, net/http/cookiejar, package main, golang.org/x/net/publicsuffix, io/ioutil, log, net/http, net/http/cookiejar ... [详细]
  • 本文介绍了如何清除Eclipse中SVN用户的设置。首先需要查看使用的SVN接口,然后根据接口类型找到相应的目录并删除相关文件。最后使用SVN更新或提交来应用更改。 ... [详细]
author-avatar
军魂永驻1971
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有