热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

go实现grpc四种数据流模式

这篇文章主要为大家介绍了go实现grpc四种数据流模式,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步早日升职加薪

1. 什么是数据流

grpc中的stream,srteam顾名思义就是一种流,可以源源不断的推送数据,很适合传输一些大数据,或者服务端和客户端长时间数据交互,比如客户端可以向服务端订阅一个数据,服务端就可以利用stream,源源不断地推送数据。

底层还原成socket编程

2. grpc的四种数据流

1.简单模式

2.服务端数据流模式(Server-side streaming RPC)

3.客户端数据流模式(Client-side streaming RPC)

4.双向数据流模式(Bidirectional streaming RPC)

2.1 简单模式

  这种模式最为传统,即客户端发起一次请求,服务端响应一个数据,这和大家平时熟悉的RPC没有什么大的区别,上两篇中介绍此模式。

2.2 服务端数据流模式

  这种模式是客户端发起一次请求,服务端返回一段连续的数据流。典型的例子是客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端

2.3 客户端数据流模式

  与服务端数据流模式相反,这次是客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应。典型的例子是物联网终端向服务器报送数据。

2.4 双向数据流

  顾名思义,这是客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,也就是可以实现实时交互。典型的例子是聊天机器人。

3. 上代码

3.1 代码目录

3.2 编写stream.proto文件

stream是常量,写在哪一边,哪一边就是数据流

syntax = "proto3";
option go_package = "./;proto";
service Greeter {
    // 定义方法,stream是常量,流模式
    rpc ServerStream (StreamRequestData) returns (stream StreamResponseData);      //服务端流模式,拉消息
    rpc ClientStream (stream StreamRequestData) returns (StreamResponseData);      //客户端流模式,推消息
    rpc AllStream (stream StreamRequestData) returns (stream StreamResponseData);  //双向流模式,能推能拉
}
message StreamRequestData {
    string data = 1; //编号
}
message StreamResponseData {
    string data = 1; //编号
}

 生成go的protobuf文件命令:

cd到proto目录下

命令:protoc -I . hello.proto   --go_out=plugins=grpc:.

3.3 编写server文件

package main
import (
	"file_test/grpc_go_stream/proto"
	"fmt"
	"net"
	"sync"
	"time"
	"google.golang.org/grpc"
)
const port = 8082
type server struct{}
func (s *server) ServerStream(req *proto.StreamRequestData, res proto.Greeter_ServerStreamServer) error {
	i := 0
	for {
		i++
		//业务代码
		_ = res.Send(&proto.StreamResponseData{
			Data: fmt.Sprintf("这是发给%s的数据流", req.Data),
		})
		time.Sleep(time.Second * 1)
		if i > 10 {
			break
		}
	}
	return nil
}
func (s *server) ClientStream(cliStr proto.Greeter_ClientStreamServer) error {
	for {
		//业务代码
		res, err := cliStr.Recv()
		if err != nil {
			fmt.Println("本次客户端流数据发送完了:",err)
			break
		}
		fmt.Println("客户端发来消息:",res.Data)
	}
	return nil
}
func (s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {
	wg:=sync.WaitGroup{}
	wg.Add(2)
	//接受客户端消息的协程
	go func() {
		defer wg.Done()
		for  {
			//业务代码
			res, err := allStr.Recv()
			if err != nil {
				fmt.Println("本次客户端流数据发送完了:",err)
				break
			}
			fmt.Println("收到客户端发来消息:",res.Data)
		}
	}()
	//发送消息给客户端的协程
	go func() {
		defer wg.Done()
		i := 0
		for {
			i++
			//业务代码
			_ = allStr.Send(&proto.StreamResponseData{
				Data: fmt.Sprintf("这是发给客户端的数据流"),
			})
			time.Sleep(time.Second * 1)
			if i > 10 {
				break
			}
		}
	}()
	wg.Wait()
	return nil
}
// 启动
func start() {
	// 1.实例化server
	g := grpc.NewServer()
	// 2.注册逻辑到server中
	proto.RegisterGreeterServer(g, &server{})
	// 3.启动server
	lis, err := net.Listen("tcp", "127.0.0.1:8082")
	if err != nil {
		panic("监听错误:" + err.Error())
	}
	err = g.Serve(lis)
	if err != nil {
		panic("启动错误:" + err.Error())
	}
}
func main() {
	start()
}

3.4 编写client文件

package main
import (
	"context"
	"file_test/grpc_go_stream/proto"
	"fmt"
	"sync"
	"time"

	"google.golang.org/grpc"
)
var rpc proto.GreeterClient
func serverStreamDemo()  {
	//服务端流模式
	res,err:=rpc.ServerStream(context.Background(),&proto.StreamRequestData{Data: "jeff"})
	if err != nil {
		panic("rpc请求错误:"+err.Error())
	}
	for  {
		data,err:=res.Recv() //
		if err != nil {
			fmt.Println("客户端发送完了:",err)
			return
		}
		fmt.Println("客户端返回数据流值:",data.Data)
	}
}
func clientStreamDemo()  {
	//客户端流模式
	cliStr, err := rpc.ClientStream(context.Background())
	if err != nil {
		panic("rpc请求错误:" + err.Error())
	}
	i := 0
	for {
		i++
		_ = cliStr.Send(&proto.StreamRequestData{
			Data: "jeff",
		})
		time.Sleep(time.Second * 1)
		if i > 10 {
			break
		}
	}
}
func clientAndServerStreamDemo()  {
	//双向流模式
	allStr, _ := rpc.AllStream(context.Background())
	wg := sync.WaitGroup{}
	wg.Add(1)
	//接受服务端消息的协程
	go func() {
		defer wg.Done()
		for {
			//业务代码
			res, err := allStr.Recv()
			if err != nil {
				fmt.Println("本次服务端流数据发送完了:", err)
				break
			}
			fmt.Println("收到服务端发来消息:", res.Data)
		}
	}()
	//发送消息给服务端的协程
	go func() {
		defer wg.Done()
		i := 0
		for {
			i++
			//业务代码
			_ = allStr.Send(&proto.StreamRequestData{
				Data: fmt.Sprintf("这是发给服务端的数据流"),
			})
			time.Sleep(time.Second * 1)
			if i > 10 {
				break
			}
		}
	}()
	wg.Wait()
}
// 启动
func start() {
	conn, err := grpc.Dial("127.0.0.1:8082", grpc.WithInsecure())
	if err != nil {
		panic("rpc连接错误:" + err.Error())
	}
	defer conn.Close()
	rpc = proto.NewGreeterClient(conn) //初始化
	serverStreamDemo() //服务端流模式
	clientStreamDemo()  //客户端流模式
	clientAndServerStreamDemo() // 双向流模式
}
func main() {
	start()
}

以上就是go实现grpc四种数据流模式的详细内容,更多关于go实现grpc流模式的资料请关注其它相关文章!


推荐阅读
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • Android Studio Bumblebee | 2021.1.1(大黄蜂版本使用介绍)
    本文介绍了Android Studio Bumblebee | 2021.1.1(大黄蜂版本)的使用方法和相关知识,包括Gradle的介绍、设备管理器的配置、无线调试、新版本问题等内容。同时还提供了更新版本的下载地址和启动页面截图。 ... [详细]
  • 本文介绍了Hyperledger Fabric外部链码构建与运行的相关知识,包括在Hyperledger Fabric 2.0版本之前链码构建和运行的困难性,外部构建模式的实现原理以及外部构建和运行API的使用方法。通过本文的介绍,读者可以了解到如何利用外部构建和运行的方式来实现链码的构建和运行,并且不再受限于特定的语言和部署环境。 ... [详细]
  • ZSI.generate.Wsdl2PythonError: unsupported local simpleType restriction ... [详细]
  • 利用Visual Basic开发SAP接口程序初探的方法与原理
    本文介绍了利用Visual Basic开发SAP接口程序的方法与原理,以及SAP R/3系统的特点和二次开发平台ABAP的使用。通过程序接口自动读取SAP R/3的数据表或视图,在外部进行处理和利用水晶报表等工具生成符合中国人习惯的报表样式。具体介绍了RFC调用的原理和模型,并强调本文主要不讨论SAP R/3函数的开发,而是针对使用SAP的公司的非ABAP开发人员提供了初步的接口程序开发指导。 ... [详细]
  • 本文介绍了在mac环境下使用nginx配置nodejs代理服务器的步骤,包括安装nginx、创建目录和文件、配置代理的域名和日志记录等。 ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • 本文介绍了Windows操作系统的版本及其特点,包括Windows 7系统的6个版本:Starter、Home Basic、Home Premium、Professional、Enterprise、Ultimate。Windows操作系统是微软公司研发的一套操作系统,具有人机操作性优异、支持的应用软件较多、对硬件支持良好等优点。Windows 7 Starter是功能最少的版本,缺乏Aero特效功能,没有64位支持,最初设计不能同时运行三个以上应用程序。 ... [详细]
  • phpcomposer 那个中文镜像是不是凉了 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • centos安装Mysql的方法及步骤详解
    本文介绍了centos安装Mysql的两种方式:rpm方式和绿色方式安装,详细介绍了安装所需的软件包以及安装过程中的注意事项,包括检查是否安装成功的方法。通过本文,读者可以了解到在centos系统上如何正确安装Mysql。 ... [详细]
  • Centos7.6安装Gitlab教程及注意事项
    本文介绍了在Centos7.6系统下安装Gitlab的详细教程,并提供了一些注意事项。教程包括查看系统版本、安装必要的软件包、配置防火墙等步骤。同时,还强调了使用阿里云服务器时的特殊配置需求,以及建议至少4GB的可用RAM来运行GitLab。 ... [详细]
  • 如何使用Java获取服务器硬件信息和磁盘负载率
    本文介绍了使用Java编程语言获取服务器硬件信息和磁盘负载率的方法。首先在远程服务器上搭建一个支持服务端语言的HTTP服务,并获取服务器的磁盘信息,并将结果输出。然后在本地使用JS编写一个AJAX脚本,远程请求服务端的程序,得到结果并展示给用户。其中还介绍了如何提取硬盘序列号的方法。 ... [详细]
  • 图解redis的持久化存储机制RDB和AOF的原理和优缺点
    本文通过图解的方式介绍了redis的持久化存储机制RDB和AOF的原理和优缺点。RDB是将redis内存中的数据保存为快照文件,恢复速度较快但不支持拉链式快照。AOF是将操作日志保存到磁盘,实时存储数据但恢复速度较慢。文章详细分析了两种机制的优缺点,帮助读者更好地理解redis的持久化存储策略。 ... [详细]
author-avatar
yhonmen
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有