简介
RSocket是在华盛顿特区举行的SpringOne平台会议上宣布的,是一种新的第7层语言无关的应用网络协议。它是一种基于Reactive Streams背压的双向,多路复用,基于消息的二进制协议。它由Facebook,Netifi和Pivotal等工程师开发,提供Java,Javascript,C ++和Kotlin等实现。
开源 RSocket 专为服务而设计。它是一种面向连接的消息驱动协议,在应用程序级别具有内置流控制。它既可以在浏览器中同样使用,也可以在 服务器上使用。这意味着您可以流式传输数据或执行Pub / Sub而无需设置应用程序队列。它也是二进制的。它对文本和二进制数据同样有效,并且对传输的内容负荷进行分段。
该协议专门设计用于与Reactive风格应用配合使用,这些应用程序基本上是非阻塞的,并且通常(但不总是)与异步行为配对。使所谓Reactive背压: 即发布者无法向订户发送数据直到该订户已经准备就绪的想法,这是与“异步”的关键区别。反应式编程(响应式reactive)是 Java 中高效应用的下一个前沿。但有两个主要障碍 - 数据访问和网络。RSocket旨在解决后一个问题,而R2DBC旨在解决前者问题。
HTTP vs RSocket
HTTP的一个重要问题是,它强迫客户端负担起所有责任来处理不同类型的错误上,包括重试逻辑,超时,断路器等。使用Reactive架构构建的应用程序可以提高效率并扩展。
RSocket与HTTP的不同之处在于它定义了四种交互模型:
- Fire-and-Forget:优化请求/响应,在不需要响应时非常有用,例如非关键事件日志记录。
- 请求/响应:当您发送一个请求并收到一个响应时,就像HTTP一样。即使在这里,该协议也具有优于HTTP的优点,因为它是异步和多路复用的。
- 请求/流:类似于返回集合的请求/响应,集合被回送而不是查询直到完成,因此例如发送银行帐号,用实时的帐户事务流进行响应。
- 频道:允许任意交互模型的双向消息流。
基于消息意味着协议可以支持单个连接上的多路复用。此外,与TCP一样,它是真正的双向,因此一旦客户端启动与服务器的连接,连接中的双方就变得彼此等同 - 实质上,服务器可以从客户端请求数据。
RSocket 特性
RSocket还支持基于每个消息的流量控制
RSocket还支持基于每个消息的流量控制。在主题演讲中,Facebook工程师Steve Gury表示:
当您发送消息时,您还要指定您能够满足多少响应,并且服务器必须满足该约束,但是当我完成处理这些响应后,我可以要求更多。RSocket也可以在链中工作,因此如果链接多个RSocket连接,流控制将是端到端地工作。
实质上,RSocket解决的问题是跨流进程的背压,即网络上的背压。
当必须在服务网格中调用另一个微服务时会发生什么,我如何保证它不会出现调用一大堆数据,并且它不会尝试向我客户端发送所有这些数据?
RSocket与传输无关,支持TCP,WebSocket和Aeron UDP,支持混合传输协议,不会造成语义损失 - 背压和流量控制都将继续工作。
它还支持连接恢复。建立RSocket连接时,您可以指定先前连接的ID,如果服务器仍在内存中有流,则可以恢复流的消耗。
它是一种消息驱动的二进制协议,在指定网络连接的情况下,请求者 - 响应者交互被分解为一组离散的帧,这些帧中的每一个都封装了某种消息。
框架是二进制的,而不是人类可读的,如 JSON 或XML,为机器到机器的通信提供了显着的效率。与所有消息传递协议一样,消息传递的内容有效负荷只是字节流,因此可以是您想要的任何内容,包括 XML 或JSON。
在Facebook,RSocket用于名为LiveServer的服务,该服务负责响应可被视为GraphQL订阅的实时查询。服务器响应数据,但也响应未来的更新流
RSocket 实例
Demo Java Server:
RSocketFactory.receive().frameDecoder(Frame::retain).acceptor(new PingHandler()).transport(TcpServerTransport.create(7878)).start().block().onClose();
Demo Java Client:
Mono client =RSocketFactory.connect().frameDecoder(Frame::retain).transport(TcpClientTransport.create(7878)).start();PingClient pingClient = new PingClient(client);Recorder recorder = pingClient.startTracker(Duration.ofSeconds(1));int count = 1_000;pingClient.startPingPong(count, recorder).doOnTerminate(() -> System.out.println("Sent " + count + " messages.")).blockLast();
参考:响应式应用新协议RSocket