Spring-Reactive Spring:开发reactive APIS


使用Spring WebFlux

  • *同步阻塞I/O模型: *传统的基于servlet的web框架(如Spring MVC),本质上是阻塞和多线程的,每一个请求线程对应一个工作线程去处理.在接收到请求时会从线程池中取出一个工作线程去处理,在工作线程处理完成之前,请求线程会被阻塞
  • *同步非阻塞I/O模型: * 简单说就是请求线程将任务交给工作线程后,不再等待工作线程返回结果,而是去处理其他请求,设定每隔一段时间请求线程再来查询是否有结果(同步),这样可以使用更少的请求线程处理更多的请求
  • *异步非阻塞I/O模型: *请求线程将任务交给工作线程后,不再等待工作线程返回结果,而是去处理其他请求,而工作线程处理完成之后通过回调等方式直接通知请求线程去处理(异步)
  • Spring WebFlux是支持异步非阻塞的web框架,显然Spring WebFlux并不能使响应时间缩短,它仅仅能提升吞吐量和伸缩性

1. 引入Spring WebFlux

  • Spring MVC是基于Java Servlet API,因此需要在servlet容器中运行
  • Spring WebFlux与其不同, 是基于Reactive HTTP API ,因此不需要在servelt容器中运行,它可以在任何非阻塞的web容器中运行(如 Netty丶Undertow丶Tomcat丶Jetty丶Servlet3.1+)
  • 使用Spring WebFlux可替换掉Spring MVC,即在spring boot 中使用spring-boot-starter-webflux依赖替换掉spring-boot-starter-web
  • Spring WebFlux默认使用Netty来替换Tomcat
  • Maven依赖: 替换传统的web框架(Spring MVC)依赖spring-boot-starter-web

    <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

2. 方式一: reactive controller处理请求

  • 代码编写和spring mvc类似,只是使用了reactive type类型的参数和返回值(spring mvc也可以使用)
  • 不同的是使用webFlux之后,是异步非阻塞的执行
2.1 使用Flux代替model对象作为返回值
  • controller中创建Flux返回

    @GetMapping("/recent")
    @ResponseBody
    public Flux<Taco> recentTacos() {
     return Flux.fromIterable(tacoRepo.findAll()).take(12);
    }
  • Repository中直接返回Flux类型: 继承ReactiveCrudRepository

    @GetMapping("/recent")
    @ResponseBody
    public Flux<Taco> recentTacos() {
     return tacoRepo.findAll().take(12);
    }
    public interface TacoRepository
     extends ReactiveCrudRepository<Taco, Long> {
    }
2.2 使用Mono单一对象作为返回值
@GetMapping("/{id}")
@ResponseBody
public Mono<Taco> tacoById(@PathVariable("id") Long id) {
 return tacoRepo.findById(id);
}
2.3 使用RxJava类型作为返回值
//Observable: 类似于Flux
@GetMapping("/recent")
@ResponseBody
public Observable<Taco> recentTacos() {
 return tacoService.getRecentTacos();
}
//Single: 类似于Mono
@GetMapping("/{id}")
@ResponseBody
public Single<Taco> tacoById(@PathVariable("id") Long id) {
 return tacoService.lookupTaco(id);
}
2.4 使用reactive type作为输入参数
@PostMapping(consumes="application/json")
@ResponseStatus(HttpStatus.CREATED)
@ResponseBody
public Mono<Taco> postTaco(@RequestBody Mono<Taco> tacoMono) {
 return tacoRepo.saveAll(tacoMono).next();
}

3. 方式二: 使用Spring函数式编程处理请求(functional request handlers)

Spring 提供的另一种方式(函数式编程)来处理请求: Spring's functional programming

3.1 四大函数式接口
  • *RequestPredicate: * 声明将要处理的请求的类型,可使用RequestPredicates工具类创建
  • *RouterFunction: * 声明如何将匹配的请求路由到处理代码,可使用RouterFunctions工具类创建对象
  • *ServerRequest: * 表示一个HTTP请求,包含头部和请求体内容
  • *ServerResponse: * 表示一个HTTP响应,包含头部和响应体信息
//实例: 可使用静态导入方式减少代码书写
@Configuration
public class ReactiveFunctionalHandlerConfiguration {
    @Bean
    public RouterFunction<?> routerFunction(){
        return RouterFunctions.route(RequestPredicates.GET("/helloWorld"),
                        serverRequest -> ServerResponse.ok().body(Mono.just("Hello World!"))) //处理get请求/hello返回字符串Hello World!
                .andRoute(RequestPredicates.POST("/bye"),
                        serverRequest -> ServerResponse.ok().body(Mono.just("Bye"), new ParameterizedTypeReference<Mono<? super String>>(){}));
    }
}

4. 实例

  • *Repository: * ReactiveJpaPopRepository.java

    public interface ReactiveJpaPopRepository extends ReactiveCrudRepository<PopInStock,Integer> {
    }
4.1 reactive controller方式处理请求
  • controller

    @Controller
    @RequestMapping("/reactive/pop")
    public class ReactivePopController {
        private ReactiveJpaPopRepository reactiveJpaPopRepository;
        @Autowired
        public ReactivePopController(ReactiveJpaPopRepository reactiveJpaPopRepository) {
            this.reactiveJpaPopRepository = reactiveJpaPopRepository;
        }
        @PostMapping("/{id}")
        @ResponseBody
        public Mono<PopInStock> popById(@PathVariable("id") int id){
            return reactiveJpaPopRepository.findById(id);
        }
        @GetMapping("/recent")
        @ResponseBody
        public Flux<PopInStock> recent(){
            return reactiveJpaPopRepository.findAll().take(12);
        }
    }
4.2 Spring 函数式编程方式处理请求
  • Configuration

    @Configuration
    public class ReactiveFunctionalHandlerConfiguration {
        @Bean
        public RouterFunction<?> routerFunction(){
            return RouterFunctions.route(RequestPredicates.GET("/helloWorld"),
                            serverRequest -> ServerResponse.ok().body(Mono.just("Hello World!")))
                    .andRoute(RequestPredicates.POST("/bye"),
                            serverRequest -> ServerResponse.ok().body(Mono.just("Bye"), new ParameterizedTypeReference<Mono<? super String>>(){}));
        }
        @Bean
        public RouterFunction<?> popRouterFunction(){
            return RouterFunctions.route(RequestPredicates.GET("/reactive/pop/{id}"),this::popById)
                    .andRoute(RequestPredicates.POST("/reactive/pop/recent"),this::recent);
        }
        @Autowired
        private ReactiveJpaPopRepository reactiveJpaPopRepository;
        public ServerResponse popById(ServerRequest serverRequest) throws Exception {
            int id = Integer.parseInt(serverRequest.pathVariable("id"));
            return ServerResponse.ok().body(ServerResponse.ok().body(reactiveJpaPopRepository.findById(id)));
        }
        public ServerResponse recent(ServerRequest serverRequest) throws Exception {
            return ServerResponse.ok().body(reactiveJpaPopRepository.findAll().take(12));
        }
    }

5. 测试

5.1 方式一(无服务器模式): GET/POST请求测试
    @Test
    public void testRecent() throws JsonProcessingException {
        PopInStock[] pops = new PopInStock[12];
        for(int i=0;i<pops.length;++i){
            pops[i] = getPop(i+1);
        }
        ReactiveJpaPopRepository reactiveJpaPopRepository = Mockito.mock(ReactiveJpaPopRepository.class);
        //当执行findAll方法时返回封装pops的flux对象
        Mockito.when(reactiveJpaPopRepository.findAll()).thenReturn(Flux.fromArray(pops));
        WebTestClient testClient = WebTestClient.bindToController(new ReactivePopController(reactiveJpaPopRepository))
                .build();
        //GET请求测试:
        testClient.get().uri("/reactive/pop/recent")
                .accept(MediaType.APPLICATION_JSON) //接收的类型
                .exchange()//发送请求
                .expectStatus().isOk()
                .expectBody()//获取到响应体内容
                .jsonPath("$").isNotEmpty()
                .jsonPath("$").isArray()
                .jsonPath("$[0].id").isEqualTo(pops[0].getId()) //验证单个属性
                .json(new ObjectMapper().writeValueAsString(pops)); //直接对比json字符串是否相同
        //POST请求测试
        int id = 1;
        PopInStock popInStock = getPop(id);
        Mockito.when(reactiveJpaPopRepository.findById(id)).thenReturn(Mono.just(popInStock));
        testClient.post().uri("/reactive/pop/"+id)
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .expectStatus().isOk()
                .expectBodyList(PopInStock.class)
                .contains(popInStock);
    }
    private PopInStock getPop(int id){
        PopInStock pop = new PopInStock();
        pop.setId(id);
        pop.setName("PopInStock#" + id);
        return pop;
    }
5.2 方式二(有服务器): 请求测试

需要使用支持reactive特性的数据库操作,由于H2不支持reactive特性报错: Reactive Repositories are not supported by JPA,因此改写还使用原始的jpa操作方便进行测试

  • Controller

    @Controller
    @RequestMapping("/reactive/pop")
    public class ReactiveController {
        @Autowired
        private JpaPopRepository jpaPopRepository;
        @GetMapping("/recent")
        @ResponseBody
        public Flux<PopInStock> recent(){
            return Flux.fromIterable(jpaPopRepository.findAll(PageRequest.of(0,12)));
        }
    }
  • test

    @RunWith(SpringRunner.class)
    @SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.RANDOM_PORT)
    public class ReactiveTest {
        @Autowired
        private WebTestClient testClient;
        @Autowired
        private JpaPopRepository jpaPopRepository;
        @Test
        public void testRecent() throws JsonProcessingException {
            List<PopInStock> pops = jpaPopRepository.findAll(PageRequest.of(0,12));
            String json = new ObjectMapper().writeValueAsString(pops);
            testClient.get().uri("/reactive/pop/recent")
                    .accept(MediaType.APPLICATION_JSON)
                    .exchange()
                    .expectStatus().isOk()
                    .expectBody()
                    //.expectBodyList(PopInStock.class)
                    //.isEqualTo(pops);
                    .jsonPath("$[?(@.id == 1)].code").isEqualTo("R0B0506432")
                    .json(json);
        }
    }

6. reactive(响应式): 使用REST APIS

  • RestTemplate可以发送请求并对响应结果进行封装,但是都是封装为普通的类型,不能封装为reactive type
  • WebClient: 对于想使用reactive特性操作响应结果的情况,可以使用spring 5 提供的WebClient替代
  • WebTestClient是用于请求结果进行验证,WebClient作用是对请求结果进行封装为reactive type
6.1 使用WebTestClient基本步骤
  1. 创建WebClient实例(或者注入bean)

  2. 指定HTTP请求类型

  3. 指定URI和headers属性

  4. 提交请求

  5. 消费响应的结果

6.2 GET请求
  • 直接创建WebClient实例并使用

    @RunWith(SpringRunner.class)
    @SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.DEFINED_PORT)
    public class ReactiveLiveServerTest {
        @Test
        public void testReactiveGetById(){
            Mono<PopInStock> popMono = WebClient.create()
                    .get()
                    .uri("http://localhost:9090/api/popInStocks/{id}",115)
                    .retrieve()
                    .bodyToMono(PopInStock.class);
            popMono.subscribe(System.out::println);//消费
        }
    }
  • 根据基础URI创建WebClient再使用

    //创建WebClient的bean,只在测试环境下使用
    @TestConfiguration
    public class ReactiveTestConfiguration {
        @Bean
        public WebClient webClient(){
            return WebClient.create("http://localhost:9090");
        }
    }
    //使用创建的WebClient bean对象
    @RunWith(SpringRunner.class)
    @SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.DEFINED_PORT)
    @Import(ReactiveTestConfiguration.class) //引入测试使用的bean配置类
    public class ReactiveLiveServerTest {
        @Autowired
        private WebClient webClient;
        @Test
        public void testReactiveGetById(){
            //创建基本路径的WebClient
            Flux<PopInStock> popFlux =
                    webClient.get()
                    .uri("/reactive/live/recent")
                    .retrieve()
                    .bodyToFlux(PopInStock.class);
            popFlux.timeout(Duration.ofSeconds(1)) //设置超时等待时间
                    .subscribe(
                            System.out::println
                            ,error -> { //处理超时异常
                               System.out.println("超时!");
                            }
                    );
        }
    }
6.3 POST/PUT/DELETE请求
@TestConfiguration
public class ReactiveTestConfiguration {
    @Bean
    public WebClient webClient(){
        return WebClient.create("http://localhost:9090");
    }
}
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
@Import(ReactiveTestConfiguration.class)
public class ReactiveWebClientTest {
    @Autowired
    private WebClient webClient;
    @Test
    public void sendTest(){
        PopInStock popInStock = new PopInStock();
        popInStock.setId(120);
        popInStock.setName("AAAAAAA");
        Mono<PopInStock> popMono = Mono.just(popInStock);

        //1. 使用Mono对象创建
        webClient.post()
                .uri("/api/popInStocks")
                .body(popMono, PopInStock.class)
                .retrieve()
                .toBodilessEntity()
                .subscribe(e -> System.out.println(e.getStatusCode()));

        //2. 使用原始对象创建
        webClient.post()
                .uri("/api/popInStocks")
                .bodyValue(popInStock)
                .retrieve()
                .toBodilessEntity()
                .subscribe(e -> System.out.println(e.getStatusCode()));

        //3. 使用PUT请求
        webClient.put()
                .uri("/api/popInStocks/{id}",1)
                .body(popMono, PopInStock.class)
                .retrieve()
                .bodyToMono(Void.class)
                .subscribe();

        //4. 使用DELETE请求
        webClient.delete()
                .uri("/api/popInStocks/{id}",1)
                .retrieve()
                .bodyToMono(Void.class)
                .subscribe();
    }
}
6.4 处理异常

如果只在消费subscribe方法中处理异常,那么会统一返回WebClientResponseException无法知道更详细的错误信息,因此更好的方式是在WebClient发送请求时使用onStatus方法自定义详细的异常,然后在消费时subscribe方法中就能返回自定义的异常

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.DEFINED_PORT)
@Import(ReactiveTestConfiguration.class)
public class ReactiveLiveServerTest {
    @Autowired
    private WebClient webClient;
    @Test
    public void testReactiveError(){
        Mono<PopInStock> popMono = webClient.get()
                .uri("/api/popInStocks/{id}",120)
                .retrieve()
                .onStatus(HttpStatus::is5xxServerError, //或status -> status == HttpStatus.NOT_FOUND
                        response -> Mono.just(new Exception("错误!")))
                .bodyToMono(PopInStock.class);

        //消费
        popMono.subscribe(
                System.out::println, //e -> {}
                Throwable::printStackTrace //error -> {}
        );
    }
}
6.5 使用WebClient的exchange方法
  • retrieve:方法返回的是ResponseSpec对象,该对象提供了处理异常和响应体内容封装等简单的功能,可以满足一些常见的情况,但是如果要使用详细的返回信息,如响应头headers等则需要使用exchange方法
  • exchange: 方法返回的是Mono<ClientResponse>包含了完整的响应信息,该对象提供了可以处理响应头headers,cookies的方法
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.DEFINED_PORT)
@Import(ReactiveTestConfiguration.class)
public class ReactiveLiveServerTest {
    @Autowired
    private WebClient webClient;
    @Test
    public void testReactiveExchange(){
        //1. 等价于retrieve方法使用
        Mono<PopInStock> popMono = webClient.get()
                .uri("/api/popInStocks/{id}",1)
                .exchange()
                .flatMap(e -> e.bodyToMono(PopInStock.class));
        popMono.subscribe(System.out::println);

        //2. 不同与retrieve使用,可以处理响应头headers,cookies等
        Mono<PopInStock> popInStockMono = webClient.get()
                .uri("/api/popInStocks/{id}",2)
                .exchange()
                .flatMap(e -> {
                    if(e.headers().header("X_UNAVAILABLE").contains("true")) //如果响应头存在此内容则返回空Mono对象
                        return Mono.empty();
                    return Mono.just(e);//否则返回Mono<ClientResponse>
                })
                .flatMap(e -> e.bodyToMono(PopInStock.class));

        //消费
        popInStockMono.subscribe(System.out::println);
    }
}

7. reactive(响应式): 使用Security

  • 对于servlet-based Spring MVC的Security可以使用servlet filter-based Spring Security
  • 由于Spring WebFlux不基于servelt所以不能使用servlet filter-based Spring Security方式
  • 从Spring 5开始Spring Security使用Spring’s WebFilter进行安全验证,因此对于Spring MVC和Spring WebFlux均可以配置权限
  • 不同之处就是配置Configuration的不同
7.1 权限配置
@Configuration
@EnableWebFluxSecurity
public class SecurityConfiguration{
    @Bean
    public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http){
        return http.authorizeExchange()
                //.pathMatchers("/home").hasAnyAuthority("USER_ROLE")
                //.anyExchange()
                //.permitAll()
                .pathMatchers("/reactive/**")
                .permitAll()
                .and()
                .build();
    }
}
7.2 配置用户认证服务
@Service
public ReactiveUserDetailsService userDetailsService(UserRepository userRepo) {
    return new ReactiveUserDetailsService() {
         @Override
         public Mono<UserDetails> findByUsername(String username) {
         return userRepo.findByUsername(username)
             .map(user -> {return user.toUserDetails();});
         }
     };
}

文章作者: Bryson
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Bryson !
评论
 上一篇
下一篇 
  目录