2019独角兽企业重金招聘Python工程师标准>>>
作者的其他文章还不错 !
原文:
http://maoyidao.iteye.com/blog/1636923
最近团队在开发基于移动互联网的项目&#xff0c;又一次涉及到post service&#xff0c;即在服务器集群之间投递消息。是的&#xff0c;又是一个RPC服务。RPC实现方式从笨重的CORBA&#xff0c;SOAP over HTTP&#xff0c;XMPP over TCP&#xff0c;到轻量级的protobuf&#xff0c;scribe和Avro。这里不想比较各自的应用场景&#xff08;另外后面三种RPC方式极为接近&#xff0c;都是通过提供Object <-> 二进制映射来提高高效的传输&#xff09;&#xff0c;本文的目的是给大家一点可以实际操作的代码&#xff1a;java如何用protobuf 实现rpc
和protobuf-socket-rpc的区别
protobuf-socket-rpc&#xff08;code.google.com/p/protobuf-socket-rpc/&#xff09;是googlecode为rpc写的简单实现。本文介绍的代码和googlecode不同之处在于&#xff1a;
1&#xff0c;基于NIO
2&#xff0c;增加了校验码
高性能RPC over google protobuf
Google&#39;s protocol buffer library makes writing rpc services easy, but it does not contain a rpc implementation. The transport details are left up to the user to implement.
google把这问题留给了我们&#xff0c;那么看看应该怎么实现。hellow world伪代码应该是这样的&#xff1a;
- MessageLite message &#61; getMessage(); // get a proto message object by proto file
- OutputStream out &#61; getOutputStream();
- InputStream in &#61; getInputStream();
- message.writeDelimitedTo(out); // Like writeTo(OutputStream), but writes the size of the message as a varint before writing the data
- messageBuilder.mergeDelimitedFrom(in);
好了&#xff0c;这样就实现了序列化和反序列化。在真正的内容之前加入内容长度&#xff0c;这是一种最简单的实现。为了能可靠的进行传输&#xff0c;我在消息长度前加入了2个byte的验证码&#xff0c;下面就开始逐步构建我的rpc代码。
定义你的proto文件&#xff0c;为传输多种消息&#xff0c;需要有“命令”字段&#xff1a;比如&#xff1a;Maoyidao.proto
List 1&#xff1a;
- package com.maoyidao.rpc;
- message MaoyidaoPacket {
- required int32 cmd &#61; 1;
- required int32 subcmd &#61; 2;
- optional bytes content &#61; 3;
- }
OK&#xff0c;compile it to Java class: protoc -I&#61;$SRC_DIR --java_out&#61;$DST_DIR $SRC_DIR/addressbook.proto
你会得到一个MaoyidaoPacket 类&#xff0c;然后你需要这样获得实例&#xff1a;
List 2&#xff1a;
- Maoyidao.MaoyidaoPacket packet &#61; Maoyidao.MaoyidaoPacket.newBuilder()
- .setCmd(mycmd)
- .setSubcmd(mysubcmd)
- .setContent(ByteString.copyFromUtf8("some message")).build();
我们先不讨论怎么基于MINA创建一个NIO&#xff0c;先假设我们获得了一个OutputStream&#xff0c;看看怎么把消息写出去&#xff08;其中的关键是用一些特殊的字符来区分你的消息&#xff0c;这是RPC over TCP的基本要求&#xff09;&#xff1a;
List 3&#xff1a;
- private final void writeObject(OutputStream os, Maoyidao.MaoyidaoPacket packet) {
- ByteArrayOutputStream baos &#61; new ByteArrayOutputStream();
- com.google.protobuf.CodedOutputStream cos &#61; com.google.protobuf.CodedOutputStream.newInstance(baos);
- cos.writeRawVarint32(3);
- cos.writeRawVarint32(7);
- cos.writeRawVarint32(packet.getSerializedSize());
- vpacket.writeTo(cos);
- cos.flush();
- os.write(baos.toByteArray());
- baos.close();
- }
- }
注意我不仅写了分隔符&#xff0c;还写了content长度。
读进来的时候要用相同的方式解析&#xff0c;假设我们得到了一个Bytebuffer&#xff0c;熟悉NIO的同学知道&#xff0c;你总是会从ByteBuffer中读取数据。同时我需要用到com.google.protobuf.CodedInputStream&#xff1a;Reads and decodes protocol message fields. This class contains two kinds of methods: methods that read specific protocol message constructs and field types (e.g. readTag() and readInt32()) and methods that read low-level values (e.g. readRawVarint32() and readRawBytes(int)).&#xff09;这样我就可以从inputstream中读到校验码&#xff1a;
- ByteBuffer in &#61; getByteBuffer();
- CodedInputStream cis &#61; CodedInputStream.newInstance(in);
- int flag1 &#61; cis.readRawVarint32();
- int flag2 &#61; cis.readRawVarint32();
- if(flag1 !&#61; 3 || flag2 !&#61; 7){
- // find some error
- }
- int contentLength &#61; cis.readRawVarint32();
- int contentLength0 &#61; contentLength &#43; CodedOutputStream.computeRawVarint32Size(contentLength);
- if(in.remaining() >&#61; contentLength0){
- try {
- Maoyidao.MaoyidaoPacket.Builder builder &#61; Maoyidao.MaoyidaoPacket.newBuilder();
- CodedInputStream.newInstance(getBytesFromIn(in,contentLength0)).readMessage(
- builder, ExtensionRegistry.getEmptyRegistry());
- out.write(builder.build());
- in.position(in.position() &#43; protocolLength);
- return true;
- } catch (Exception e) {
- //
- }
- }
- // ByteBuffer没有足够的数据&#xff0c;等待下一次
- // do something
截止目前&#xff0c;我们完成了带校验码的基于protobuf的消息序列化和反序列。在这个实现中&#xff0c;我更偏向把protobuf当做一个序列化工具来使用&#xff0c;整体还是依赖MINA本身提供的架构&#xff0c;这部分将在本系列的下一篇中详细阐述。
本文系maoyidao原创&#xff0c;转载请引用原链接&#xff1a;