作者:uigrdg更好_154 | 来源:互联网 | 2023-08-15 09:19
最近开发一个小程序,有一些功能需要用到微信的token比如分享海报,发送模板消息等。官方建议:建议开发者使用中控服务器统一获取和刷新access_token,其他业务逻辑服务器所
最近开发一个小程序,有一些功能需要用到微信的token 比如分享海报,发送模板消息等。官方建议:
建议开发者使用中控服务器统一获取和刷新 access_token,其他业务逻辑服务器所使用的 access_token 均来自于该中控服务器,不应该各自去刷新,否则容易造成冲突,导致 access_token 覆盖而影响业务;
因为之前 一直用 nameko开发rpc ,所以这次准备尝试点新的东西。准备工作
pip install grpcio== 1.19.0 pip install grpcio-tools== 1.19.0 pip install protobuf== 3.12.2
环境已经搭建好了,创建 .proto 结尾的文件。 .proto 文件主要用来 定义接口和数据类型,我定义的 名字叫 grpcdatabase.proto
syntax = “proto3”; //proto3语法 package grpcServer; service Greeter { rpc GetContent (Request) returns (Return) {} //定义要调用的函数(GetContent)+(Request)接受的 } message Request { //传参数据类型 string appid = 1;// 字符串 微信小程序的、appid string secret =2; // 字符串 微信小程序的、secret } message Return { //返回数据类型 string message = 1;//字符串 int32 code=2; // int 类型 string results = 4;// }
客户端调用传参 就是2个 appid 和 secret 服务端 返回 值 是三个 message code results
接下来生成客户端 服务端的代码
python -m grpc_tools.protoc -I. --python_out= ./ --grpc_python_out= ./ grpcdatabase.proto // 在当前目录下生成 服务端和客户端代码 运行后如图
编辑客户端 与 服务段的业务代码
import sysimport requestssys. path. append( 'grpc_base_models' ) import grpcimport timefrom concurrent import futuresimport grpcdatabase_pb2import grpcdatabase_pb2_grpcimport redisimport jsonr_cache = redis. StrictRedis( host= '127.0.0.1' , port= 6379 , db= 6 ) _SLEEP_TIME = 60 _HOST = "0.0.0.0" _PORT = "19999" class RpcServer ( grpcdatabase_pb2_grpc. GreeterServicer) : def GetContent ( self, request, context) : ''':param request::param context::return:''' appid = request. appidsecret= request. secrettoken_tmpl = r_cache. get( "access_token" ) if not token_tmpl: result = requests. get( "https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={0}&secret={1}" . format ( appid, secret) ) . json( ) token = result. get( 'access_token' ) expires_in_seconds = int ( result. get( 'expires_in' ) * 0.9 ) r_cache. set ( name= "access_token" , value= token, ex= expires_in_seconds ) else : token = token_tmpl. decode( "utf8" ) print ( token) _content = "success" code = 200 results = tokenreturn grpcdatabase_pb2. Return( message= _content, results= results, code= code) def server ( ) : print ( sys. argv. __len__( ) ) if sys. argv. __len__( ) >= 2 : _PORT = sys. argv[ 1 ] else : _PORT = "19999" grpcServer = grpc. server( futures. ThreadPoolExecutor( max_workers= 10 ) ) grpcdatabase_pb2_grpc. add_GreeterServicer_to_server( RpcServer( ) , grpcServer) grpcServer. add_insecure_port( "{0}:{1}" . format ( _HOST, _PORT) ) grpcServer. start( ) try : while True : time. sleep( _SLEEP_TIME) except KeyboardInterrupt: grpcServer. stop( 0 ) if __name__ == '__main__' : server( )
客户端代码
这样就简单的实现了我们的功能,后期会优化。添加新功能,准备把项目中的定时任务也用grpc 来封装下。