Spring Framework 5带来了新的Reactive Stack非阻塞式Web框架:Spring WebFlux。作为与Spring MVC并行使用的Web框架,Spring WebFlux依赖了反应式流适配器(Reactive Streams Adapter),在Netty和Servlet3.1的容器下,可以提供非阻塞式的Web服务,充分发挥下一代多核处理器的优势,支撑海量的并发访问。
以上是官网的介绍,事实上在基于Spring Boot 2强大的微服务架构帮助下,WebFlux和Spring MVC一起,成为Java应用开发的两大选择,可以让我们迅速地搭建起反应式的Web应用。本文拟通过模拟一个简单的微博应用,实战通过Spring Boot 2+ Spring WebFlux + MongoDB 开发一个Web应用。
Spring WebFlux及其编程范式
Spring WebFlux通过核心库Reactor提供反应式支持,Reactor实现了Reactive Streams,后者是一个带非阻塞式背压的异步流处理器。
Reactor包含两个重要的成员变量Flux和Mono,它们都实现了Reactive Streams提供的
Publisher接口
. Flux
是一个代表了0..N元素的流,Mono
是代表了一个0..1元素的流。虽然WebFlux使用Reactor作为它的核心依赖,它在应用层面,它也同时支持RxJava。
Spring WebFlux支持两种类型的编程范式:
-
传统的基于注解的方式,如@Controller、@RequestMapping等沿用了Spring MVC的模式.
-
基于Java8的Lambda函数式编程模式
本文主要是使用基于注解的方式,今后另文补充基于函数式编程的范式。
基于Spring Boot 2+ Spring WebFlux + MongoDB的轻量级微博应用
以下展示如何搭建一个轻量级的微博应用,这个应用只包括一个domain类Tweet,使用基于MongoDB的在线MongoDB数据库mLab作为存储,并且使用异步的RESTful API提供基本的增删查改功能。
此外还会用到Spring Test组件,通过使用Maven的插件功能,实现对微服务应用的测试。
1. 新建项目
- 点击http://start.spring.io
- 选择2.x以上的Spring Boot版本
- 输入artifact的值,比如webflux-demo
- 选择Reactive Web和Reactive MongoDB依赖
- 点击Generate Project,生成并下载一个微服务框架到本地,并解压
- 使用IDE,比如eclipse,导入解压出来的项目文件
2. 注册mLab账户,并新建一个MongoDB数据库
MongoDB数据库是常用的文档类型数据库,广泛用于社交网站、电商等引用中。而mLab是一个在线MongoDB数据库平台,提供MongoDB的在线服务。这个应用使用到它。
- 前往https://mlab.com
- 根据要求注册账户
- 网站会有免费和收费的服务选择,选择AWS的免费MongoDB服务
- 服务选择完毕,平台会提供一个数据库镜像,可以点击数据库前往管理页面。
- 在User标签下,新建数据库的登录名和密码。
完成以上步骤,数据库就可以开始使用了。你会看到如下图所示的页面:
3. 在项目中配置MongoDB数据库
前往IDE中的项目资源文件夹,找到application.properties。添加你在mLad的MongoDB URI
spring.data.mongodb.uri=mongodb://username:password@ds063439.mlab.com:63439/springdb
在应用启动的时候,Springboot会自动读取该配置文件。
4. 编写应用各模块
WebFlux可以认为是基于Spring的Web开发的一个新的模式或选择,因此它既有Spring MVC有的模块如Domain、Controller、Service,也有新增的如Handler、Router等。下面分别编写各模块。
4.1 Domain包
Domain包只包括一个domain类Tweet.java,因为使用了文档数据库,因此使用@Document注解修饰类,并且使用@Id修饰成员变量id。代码如下:
1 package com.example.webfluxdemo.model; 2 3 import java.util.Date; 4 5 import javax.validation.constraints.NotBlank; 6 import javax.validation.constraints.NotNull; 7 import javax.validation.constraints.Size; 8 9 import org.springframework.data.annotation.Id; 10 import org.springframework.data.mongodb.core.mapping.Document; 11 12 @Document(collection = "tweets") 13 public class Tweet { 14 @Id 15 private String id; 16 17 @NotBlank 18 @Size(max = 140) 19 private String text; 20 21 @NotNull 22 private Date createAt = new Date(); 23 24 public Tweet() { 25 26 } 27 28 public Tweet(String text) { 29 this.text = text; 30 } 31 32 public String getId() { 33 return id; 34 } 35 36 public void setId(String id) { 37 this.id = id; 38 } 39 40 public String getText() { 41 return text; 42 } 43 44 public void setText(String text) { 45 this.text = text; 46 } 47 48 public Date getCreateAt() { 49 return createAt; 50 } 51 52 public void setCreateAt(Date createAt) { 53 this.createAt = createAt; 54 } 55 56 }
4.2 Repository
Repository接口是DAO,继承了ReactiveMongoRepository接口用于连接MongoDB数据库做数据持久化,
1 package com.example.webfluxdemo.repository; 2 3 import org.springframework.data.mongodb.repository.ReactiveMongoRepository; 4 import org.springframework.stereotype.Repository; 5 6 import com.example.webfluxdemo.model.Tweet; 7 8 @Repository 9 public interface TweetRepository extends ReactiveMongoRepository{ 10 11 }
通过查看源码可知,父接口ReactiveMongoRepository包含对MongoDB数据库基本的增删改查方法。在运行时,Spring Boot会自动实现一个SimpleReactiveMongoRepository类,用于执行增删改查方法。这样极大地节省了程序员持久化的精力,可以专注于业务开发。
4.3 Controller
Controller是WebFlux的核心类,代码如下:
1 package com.example.webfluxdemo.controller; 2 3 import javax.validation.Valid; 4 5 import org.springframework.beans.factory.annotation.Autowired; 6 import org.springframework.http.HttpStatus; 7 import org.springframework.http.MediaType; 8 import org.springframework.http.ResponseEntity; 9 import org.springframework.web.bind.annotation.DeleteMapping; 10 import org.springframework.web.bind.annotation.GetMapping; 11 import org.springframework.web.bind.annotation.PathVariable; 12 import org.springframework.web.bind.annotation.PostMapping; 13 import org.springframework.web.bind.annotation.PutMapping; 14 import org.springframework.web.bind.annotation.RequestBody; 15 import org.springframework.web.bind.annotation.RestController; 16 17 import com.example.webfluxdemo.model.Tweet; 18 import com.example.webfluxdemo.repository.TweetRepository; 19 20 import reactor.core.publisher.Flux; 21 import reactor.core.publisher.Mono; 22 23 @RestController 24 public class TweetController { 25 26 @Autowired 27 private TweetRepository tweetRepository; 28 29 @GetMapping("/tweets") 30 public FluxgetAllTweets(){ 31 return tweetRepository.findAll(); 32 } 33 34 @PostMapping("/tweets") 35 public Mono createTweets(@Valid @RequestBody Tweet tweet){ 36 return tweetRepository.save(tweet); 37 } 38 39 @GetMapping("/tweets/{id}") 40 public Mono > getTweetById(@PathVariable(value = "id") String tweetId) { 41 return tweetRepository.findById(tweetId) 42 .map(savedTweet -> ResponseEntity.ok(savedTweet)) 43 .defaultIfEmpty(ResponseEntity.notFound().build()); 44 } 45 46 @PutMapping("/tweets/{id}") 47 public Mono > updateTweet(@PathVariable(value = "id") String tweetId, 48 @Valid @RequestBody Tweet tweet) { 49 return tweetRepository.findById(tweetId) 50 .flatMap(existingTweet -> { 51 existingTweet.setText(tweet.getText()); 52 return tweetRepository.save(existingTweet); 53 }) 54 .map(updatedTweet -> new ResponseEntity<>(updatedTweet, HttpStatus.OK)) 55 .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND)); 56 } 57 58 @DeleteMapping("/tweets/{id}") 59 public Mono > deleteTweet(@PathVariable(value = "id") String tweetId) { 60 61 return tweetRepository.findById(tweetId) 62 .flatMap(existingTweet -> 63 tweetRepository.delete(existingTweet) 64 .then(Mono.just(new ResponseEntity (HttpStatus.OK))) 65 ) 66 .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND)); 67 } 68 69 // 基于反应式流发送微博至客户端 70 @GetMapping(value = "/stream/tweets", produces = MediaType.TEXT_EVENT_STREAM_VALUE) 71 public Flux streamAllTweets() { 72 return tweetRepository.findAll(); 73 } 74 75 }
Controller使用Flux或Mono作为对象,返回给不同的请求。反应式编码主要在最后一个方法:
// 基于反应式流发送微博至客户端 @GetMapping(value = "/stream/tweets", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public FluxstreamAllTweets() { return tweetRepository.findAll(); }
这个方法和getAllTweet方法一样,会返回一个JSON流到客户端,区别在于streamAllTweets以Server-send-event的方式返回一个Json流到浏览器,这种流可以被浏览器识别和使用。
使用WebTestClient测试应用
WebTestClient是Spring 5提供的一个异步反应式Http客户端,可以用于测试反应式的RestFul微服务应用。在IDE的测试文件夹中,可以找到测试类,编写代码如下:
1 package com.example.webfluxdemo; 2 3 import java.util.Collections; 4 5 import org.assertj.core.api.Assertions; 6 import org.junit.Test; 7 import org.junit.runner.RunWith; 8 import org.springframework.beans.factory.annotation.Autowired; 9 import org.springframework.boot.test.context.SpringBootTest; 10 import org.springframework.http.MediaType; 11 import org.springframework.test.context.junit4.SpringRunner; 12 import org.springframework.test.web.reactive.server.WebTestClient; 13 14 import com.example.webfluxdemo.model.Tweet; 15 import com.example.webfluxdemo.repository.TweetRepository; 16 17 import reactor.core.publisher.Mono; 18 19 @RunWith(SpringRunner.class) 20 @SpringBootTest(webEnvirOnment= SpringBootTest.WebEnvironment.RANDOM_PORT) 21 public class WebfluxDemoApplicationTests { 22 23 @Autowired 24 private WebTestClient webTestClient; 25 26 @Autowired 27 TweetRepository tweetRepository; 28 29 @Test 30 public void testCreateTweet() { 31 Tweet tweet = new Tweet("这是一条测试微博"); 32 33 webTestClient.post().uri("/tweets") 34 .contentType(MediaType.APPLICATION_JSON_UTF8) 35 .accept(MediaType.APPLICATION_JSON_UTF8) 36 .body(Mono.just(tweet), Tweet.class) 37 .exchange() 38 .expectStatus().isOk() 39 .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8) 40 .expectBody() 41 .jsonPath("$.id").isNotEmpty() 42 .jsonPath("$.text").isEqualTo("这是一条测试微博"); 43 } 44 45 @Test 46 public void testGetAllTweets() { 47 webTestClient.get().uri("/tweets") 48 .accept(MediaType.APPLICATION_JSON_UTF8) 49 .exchange() 50 .expectStatus().isOk() 51 .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8) 52 .expectBodyList(Tweet.class); 53 } 54 55 @Test 56 public void testGetSingleTweet() { 57 Tweet tweet = tweetRepository.save(new Tweet("Hello, World!")).block(); 58 59 webTestClient.get() 60 .uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId())) 61 .exchange() 62 .expectStatus().isOk() 63 .expectBody() 64 .consumeWith(response -> 65 Assertions.assertThat(response.getResponseBody()).isNotNull()); 66 } 67 68 @Test 69 public void testUpdateTweet() { 70 Tweet tweet = tweetRepository.save(new Tweet("Initial Tweet")).block(); 71 72 Tweet newTweetData = new Tweet("更新微博"); 73 74 webTestClient.put() 75 .uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId())) 76 .contentType(MediaType.APPLICATION_JSON_UTF8) 77 .accept(MediaType.APPLICATION_JSON_UTF8) 78 .body(Mono.just(newTweetData), Tweet.class) 79 .exchange() 80 .expectStatus().isOk() 81 .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8) 82 .expectBody() 83 .jsonPath("$.text").isEqualTo("更新微博"); 84 } 85 86 @Test 87 public void testDeleteTweet() { 88 Tweet tweet = tweetRepository.save(new Tweet("将要被删除的微博")).block(); 89 90 webTestClient.delete() 91 .uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId())) 92 .exchange() 93 .expectStatus().isOk(); 94 } 95 96 }
使用mvn test命令,测试所有的测试类。结果如下:
查看mLab的数据库,数据被成功添加: