使用Spring WebFlux
- *同步阻塞I/O模型: *传统的基于servlet的web框架(如Spring MVC),本质上是阻塞和多线程的,每一个请求线程对应一个工作线程去处理.在接收到请求时会从线程池中取出一个工作线程去处理,在工作线程处理完成之前,请求线程会被阻塞
- *同步非阻塞I/O模型: * 简单说就是请求线程将任务交给工作线程后,不再等待工作线程返回结果,而是去处理其他请求,设定每隔一段时间请求线程再来查询是否有结果(同步),这样可以使用更少的请求线程处理更多的请求
- *异步非阻塞I/O模型: *请求线程将任务交给工作线程后,不再等待工作线程返回结果,而是去处理其他请求,而工作线程处理完成之后通过回调等方式直接通知请求线程去处理(异步)
- Spring WebFlux是支持异步非阻塞的web框架,显然Spring WebFlux并不能使响应时间缩短,它仅仅能提升吞吐量和伸缩性
1. 引入Spring WebFlux
- Spring MVC是基于Java Servlet API,因此需要在servlet容器中运行
- Spring WebFlux与其不同, 是基于
Reactive HTTP API
,因此不需要在servelt容器中运行,它可以在任何非阻塞的web容器中运行(如 Netty丶Undertow丶Tomcat丶Jetty丶Servlet3.1+)- 使用Spring WebFlux可替换掉Spring MVC,即在spring boot 中使用
spring-boot-starter-webflux
依赖替换掉spring-boot-starter-web
- Spring WebFlux默认使用Netty来替换Tomcat
Maven依赖: 替换传统的web框架(Spring MVC)依赖
spring-boot-starter-web
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>
2. 方式一: reactive controller处理请求
- 代码编写和
spring mvc
类似,只是使用了reactive type类型的参数和返回值(spring mvc也可以使用)- 不同的是使用webFlux之后,是异步非阻塞的执行
2.1 使用Flux代替model对象作为返回值
controller中创建Flux返回
@GetMapping("/recent") @ResponseBody public Flux<Taco> recentTacos() { return Flux.fromIterable(tacoRepo.findAll()).take(12); }
Repository中直接返回Flux类型: 继承ReactiveCrudRepository
@GetMapping("/recent") @ResponseBody public Flux<Taco> recentTacos() { return tacoRepo.findAll().take(12); }
public interface TacoRepository extends ReactiveCrudRepository<Taco, Long> { }
2.2 使用Mono单一对象作为返回值
@GetMapping("/{id}")
@ResponseBody
public Mono<Taco> tacoById(@PathVariable("id") Long id) {
return tacoRepo.findById(id);
}
2.3 使用RxJava类型作为返回值
//Observable: 类似于Flux
@GetMapping("/recent")
@ResponseBody
public Observable<Taco> recentTacos() {
return tacoService.getRecentTacos();
}
//Single: 类似于Mono
@GetMapping("/{id}")
@ResponseBody
public Single<Taco> tacoById(@PathVariable("id") Long id) {
return tacoService.lookupTaco(id);
}
2.4 使用reactive type作为输入参数
@PostMapping(consumes="application/json")
@ResponseStatus(HttpStatus.CREATED)
@ResponseBody
public Mono<Taco> postTaco(@RequestBody Mono<Taco> tacoMono) {
return tacoRepo.saveAll(tacoMono).next();
}
3. 方式二: 使用Spring函数式编程处理请求(functional request handlers)
Spring 提供的另一种方式(函数式编程)来处理请求:
Spring's functional programming
3.1 四大函数式接口
- *RequestPredicate: * 声明将要处理的请求的类型,可使用
RequestPredicates
工具类创建 - *RouterFunction: * 声明如何将匹配的请求路由到处理代码,可使用
RouterFunctions
工具类创建对象 - *ServerRequest: * 表示一个HTTP请求,包含头部和请求体内容
- *ServerResponse: * 表示一个HTTP响应,包含头部和响应体信息
//实例: 可使用静态导入方式减少代码书写
@Configuration
public class ReactiveFunctionalHandlerConfiguration {
@Bean
public RouterFunction<?> routerFunction(){
return RouterFunctions.route(RequestPredicates.GET("/helloWorld"),
serverRequest -> ServerResponse.ok().body(Mono.just("Hello World!"))) //处理get请求/hello返回字符串Hello World!
.andRoute(RequestPredicates.POST("/bye"),
serverRequest -> ServerResponse.ok().body(Mono.just("Bye"), new ParameterizedTypeReference<Mono<? super String>>(){}));
}
}
4. 实例
*Repository: * ReactiveJpaPopRepository.java
public interface ReactiveJpaPopRepository extends ReactiveCrudRepository<PopInStock,Integer> { }
4.1 reactive controller方式处理请求
controller
@Controller @RequestMapping("/reactive/pop") public class ReactivePopController { private ReactiveJpaPopRepository reactiveJpaPopRepository; @Autowired public ReactivePopController(ReactiveJpaPopRepository reactiveJpaPopRepository) { this.reactiveJpaPopRepository = reactiveJpaPopRepository; } @PostMapping("/{id}") @ResponseBody public Mono<PopInStock> popById(@PathVariable("id") int id){ return reactiveJpaPopRepository.findById(id); } @GetMapping("/recent") @ResponseBody public Flux<PopInStock> recent(){ return reactiveJpaPopRepository.findAll().take(12); } }
4.2 Spring 函数式编程方式处理请求
Configuration
@Configuration public class ReactiveFunctionalHandlerConfiguration { @Bean public RouterFunction<?> routerFunction(){ return RouterFunctions.route(RequestPredicates.GET("/helloWorld"), serverRequest -> ServerResponse.ok().body(Mono.just("Hello World!"))) .andRoute(RequestPredicates.POST("/bye"), serverRequest -> ServerResponse.ok().body(Mono.just("Bye"), new ParameterizedTypeReference<Mono<? super String>>(){})); } @Bean public RouterFunction<?> popRouterFunction(){ return RouterFunctions.route(RequestPredicates.GET("/reactive/pop/{id}"),this::popById) .andRoute(RequestPredicates.POST("/reactive/pop/recent"),this::recent); } @Autowired private ReactiveJpaPopRepository reactiveJpaPopRepository; public ServerResponse popById(ServerRequest serverRequest) throws Exception { int id = Integer.parseInt(serverRequest.pathVariable("id")); return ServerResponse.ok().body(ServerResponse.ok().body(reactiveJpaPopRepository.findById(id))); } public ServerResponse recent(ServerRequest serverRequest) throws Exception { return ServerResponse.ok().body(reactiveJpaPopRepository.findAll().take(12)); } }
5. 测试
5.1 方式一(无服务器模式): GET/POST请求测试
@Test
public void testRecent() throws JsonProcessingException {
PopInStock[] pops = new PopInStock[12];
for(int i=0;i<pops.length;++i){
pops[i] = getPop(i+1);
}
ReactiveJpaPopRepository reactiveJpaPopRepository = Mockito.mock(ReactiveJpaPopRepository.class);
//当执行findAll方法时返回封装pops的flux对象
Mockito.when(reactiveJpaPopRepository.findAll()).thenReturn(Flux.fromArray(pops));
WebTestClient testClient = WebTestClient.bindToController(new ReactivePopController(reactiveJpaPopRepository))
.build();
//GET请求测试:
testClient.get().uri("/reactive/pop/recent")
.accept(MediaType.APPLICATION_JSON) //接收的类型
.exchange()//发送请求
.expectStatus().isOk()
.expectBody()//获取到响应体内容
.jsonPath("$").isNotEmpty()
.jsonPath("$").isArray()
.jsonPath("$[0].id").isEqualTo(pops[0].getId()) //验证单个属性
.json(new ObjectMapper().writeValueAsString(pops)); //直接对比json字符串是否相同
//POST请求测试
int id = 1;
PopInStock popInStock = getPop(id);
Mockito.when(reactiveJpaPopRepository.findById(id)).thenReturn(Mono.just(popInStock));
testClient.post().uri("/reactive/pop/"+id)
.accept(MediaType.APPLICATION_JSON)
.exchange()
.expectStatus().isOk()
.expectBodyList(PopInStock.class)
.contains(popInStock);
}
private PopInStock getPop(int id){
PopInStock pop = new PopInStock();
pop.setId(id);
pop.setName("PopInStock#" + id);
return pop;
}
5.2 方式二(有服务器): 请求测试
需要使用支持reactive特性的数据库操作,由于H2不支持reactive特性报错:
Reactive Repositories are not supported by JPA
,因此改写还使用原始的jpa操作方便进行测试
Controller
@Controller @RequestMapping("/reactive/pop") public class ReactiveController { @Autowired private JpaPopRepository jpaPopRepository; @GetMapping("/recent") @ResponseBody public Flux<PopInStock> recent(){ return Flux.fromIterable(jpaPopRepository.findAll(PageRequest.of(0,12))); } }
test
@RunWith(SpringRunner.class) @SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.RANDOM_PORT) public class ReactiveTest { @Autowired private WebTestClient testClient; @Autowired private JpaPopRepository jpaPopRepository; @Test public void testRecent() throws JsonProcessingException { List<PopInStock> pops = jpaPopRepository.findAll(PageRequest.of(0,12)); String json = new ObjectMapper().writeValueAsString(pops); testClient.get().uri("/reactive/pop/recent") .accept(MediaType.APPLICATION_JSON) .exchange() .expectStatus().isOk() .expectBody() //.expectBodyList(PopInStock.class) //.isEqualTo(pops); .jsonPath("$[?(@.id == 1)].code").isEqualTo("R0B0506432") .json(json); } }
6. reactive(响应式): 使用REST APIS
RestTemplate
可以发送请求并对响应结果进行封装,但是都是封装为普通的类型,不能封装为reactive typeWebClient:
对于想使用reactive特性操作响应结果的情况,可以使用spring 5 提供的WebClient
替代WebTestClient
是用于请求结果进行验证,WebClient
作用是对请求结果进行封装为reactive type
6.1 使用WebTestClient基本步骤
创建WebClient实例(或者注入bean)
指定HTTP请求类型
指定URI和headers属性
提交请求
消费响应的结果
6.2 GET请求
直接创建WebClient实例并使用
@RunWith(SpringRunner.class) @SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.DEFINED_PORT) public class ReactiveLiveServerTest { @Test public void testReactiveGetById(){ Mono<PopInStock> popMono = WebClient.create() .get() .uri("http://localhost:9090/api/popInStocks/{id}",115) .retrieve() .bodyToMono(PopInStock.class); popMono.subscribe(System.out::println);//消费 } }
根据基础URI创建WebClient再使用
//创建WebClient的bean,只在测试环境下使用 @TestConfiguration public class ReactiveTestConfiguration { @Bean public WebClient webClient(){ return WebClient.create("http://localhost:9090"); } }
//使用创建的WebClient bean对象 @RunWith(SpringRunner.class) @SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.DEFINED_PORT) @Import(ReactiveTestConfiguration.class) //引入测试使用的bean配置类 public class ReactiveLiveServerTest { @Autowired private WebClient webClient; @Test public void testReactiveGetById(){ //创建基本路径的WebClient Flux<PopInStock> popFlux = webClient.get() .uri("/reactive/live/recent") .retrieve() .bodyToFlux(PopInStock.class); popFlux.timeout(Duration.ofSeconds(1)) //设置超时等待时间 .subscribe( System.out::println ,error -> { //处理超时异常 System.out.println("超时!"); } ); } }
6.3 POST/PUT/DELETE请求
@TestConfiguration
public class ReactiveTestConfiguration {
@Bean
public WebClient webClient(){
return WebClient.create("http://localhost:9090");
}
}
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
@Import(ReactiveTestConfiguration.class)
public class ReactiveWebClientTest {
@Autowired
private WebClient webClient;
@Test
public void sendTest(){
PopInStock popInStock = new PopInStock();
popInStock.setId(120);
popInStock.setName("AAAAAAA");
Mono<PopInStock> popMono = Mono.just(popInStock);
//1. 使用Mono对象创建
webClient.post()
.uri("/api/popInStocks")
.body(popMono, PopInStock.class)
.retrieve()
.toBodilessEntity()
.subscribe(e -> System.out.println(e.getStatusCode()));
//2. 使用原始对象创建
webClient.post()
.uri("/api/popInStocks")
.bodyValue(popInStock)
.retrieve()
.toBodilessEntity()
.subscribe(e -> System.out.println(e.getStatusCode()));
//3. 使用PUT请求
webClient.put()
.uri("/api/popInStocks/{id}",1)
.body(popMono, PopInStock.class)
.retrieve()
.bodyToMono(Void.class)
.subscribe();
//4. 使用DELETE请求
webClient.delete()
.uri("/api/popInStocks/{id}",1)
.retrieve()
.bodyToMono(Void.class)
.subscribe();
}
}
6.4 处理异常
如果只在消费
subscribe
方法中处理异常,那么会统一返回WebClientResponseException
无法知道更详细的错误信息,因此更好的方式是在WebClient
发送请求时使用onStatus
方法自定义详细的异常,然后在消费时subscribe
方法中就能返回自定义的异常
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.DEFINED_PORT)
@Import(ReactiveTestConfiguration.class)
public class ReactiveLiveServerTest {
@Autowired
private WebClient webClient;
@Test
public void testReactiveError(){
Mono<PopInStock> popMono = webClient.get()
.uri("/api/popInStocks/{id}",120)
.retrieve()
.onStatus(HttpStatus::is5xxServerError, //或status -> status == HttpStatus.NOT_FOUND
response -> Mono.just(new Exception("错误!")))
.bodyToMono(PopInStock.class);
//消费
popMono.subscribe(
System.out::println, //e -> {}
Throwable::printStackTrace //error -> {}
);
}
}
6.5 使用WebClient的exchange方法
retrieve:
方法返回的是ResponseSpec
对象,该对象提供了处理异常和响应体内容封装等简单的功能,可以满足一些常见的情况,但是如果要使用详细的返回信息,如响应头headers等则需要使用exchange
方法exchange:
方法返回的是Mono<ClientResponse>
包含了完整的响应信息,该对象提供了可以处理响应头headers,cookies的方法
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.DEFINED_PORT)
@Import(ReactiveTestConfiguration.class)
public class ReactiveLiveServerTest {
@Autowired
private WebClient webClient;
@Test
public void testReactiveExchange(){
//1. 等价于retrieve方法使用
Mono<PopInStock> popMono = webClient.get()
.uri("/api/popInStocks/{id}",1)
.exchange()
.flatMap(e -> e.bodyToMono(PopInStock.class));
popMono.subscribe(System.out::println);
//2. 不同与retrieve使用,可以处理响应头headers,cookies等
Mono<PopInStock> popInStockMono = webClient.get()
.uri("/api/popInStocks/{id}",2)
.exchange()
.flatMap(e -> {
if(e.headers().header("X_UNAVAILABLE").contains("true")) //如果响应头存在此内容则返回空Mono对象
return Mono.empty();
return Mono.just(e);//否则返回Mono<ClientResponse>
})
.flatMap(e -> e.bodyToMono(PopInStock.class));
//消费
popInStockMono.subscribe(System.out::println);
}
}
7. reactive(响应式): 使用Security
- 对于servlet-based Spring MVC的Security可以使用servlet filter-based Spring Security
- 由于Spring WebFlux不基于servelt所以不能使用servlet filter-based Spring Security方式
- 从Spring 5开始Spring Security使用Spring’s WebFilter进行安全验证,因此对于Spring MVC和Spring WebFlux均可以配置权限
- 不同之处就是配置Configuration的不同
7.1 权限配置
@Configuration
@EnableWebFluxSecurity
public class SecurityConfiguration{
@Bean
public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http){
return http.authorizeExchange()
//.pathMatchers("/home").hasAnyAuthority("USER_ROLE")
//.anyExchange()
//.permitAll()
.pathMatchers("/reactive/**")
.permitAll()
.and()
.build();
}
}
7.2 配置用户认证服务
@Service
public ReactiveUserDetailsService userDetailsService(UserRepository userRepo) {
return new ReactiveUserDetailsService() {
@Override
public Mono<UserDetails> findByUsername(String username) {
return userRepo.findByUsername(username)
.map(user -> {return user.toUserDetails();});
}
};
}