Spring-Reactive Spring:Reactor介绍


Reactor介绍

1. 理解reactive programming

reactive programming(反应式编程):

  • 是一种基于data stream(数据流)和propagation of change(变化传递)的declarative(声明式)的编程范式
  • 是一种非阻塞事件驱动数据流的开发方案,使用函数式编程的概念来操作数据流,系统中某部分的数据变动后会自动更新其他部分,而且成本极低
  • 是观察者模式(推模式)的一种延伸,不同于传统的命令编程方式(Imperative programming)同步拉取数据的方式(如迭代器模式),而是采用数据发布者同步或者异步地进行推送
  • 实现上,屏蔽了并发实现的复杂细节,提供数据流的有序操作,实现更加简单,代码可读性高

2. Reactive Streams

Reactive Streams可以归纳为四种接口定义

  • *Publisher(发布者): * 发布数据到Subscriber
  • *Subscriber(订阅者): * 消费数据
  • Subscription(订阅): ** 定义了订阅的规则(一次订阅多少数据),会根据订阅的规则推送数据,这也是backpressure**(反压)产生的原因,可以有效阻止发布者推送超出订阅者处理能力的数据
  • *Processor(处理器): * 继承了SubscriberProcessor ,因此可以作为订阅者接收并处理数据,并且能作为发布者将处理后的数据推送给订阅者
//发布者
public interface Publisher<T> {
 void subscribe(Subscriber<? super T> subscriber); //指定数据将推送给哪些订阅者
}
//订阅者: 可以接受来自发布者的事件
public interface Subscriber<T> {
 void onSubscribe(Subscription sub); //指定订阅规则
 void onNext(T item); //执行后将订阅数据传递给订阅者
 void onError(Throwable ex);//产生任何错误时调用
 void onComplete();//发布完成: 发布者没有数据并且将不再产生数据
}
//订阅
public interface Subscription {
 void request(long n); //请求订阅:参数为订阅规则,一次可以订阅多少数据,执行后发布者发布数据进入流中
 void cancel(); //表示订阅者不在需要订阅数据
}
//处理器
public interface Processor<T, R> 
    extends Subscriber<T>, Publisher<R> {}

3. 添加Reactor依赖

  • reactor-core

    <dependency>
     <groupId>io.projectreactor</groupId>
     <artifactId>reactor-core</artifactId>
    </dependency>
  • reactor-test 如果需要测试支持则引入此依赖

    <dependency>
     <groupId>io.projectreactor</groupId>
     <artifactId>reactor-test</artifactId>
     <scope>test</scope>
    </dependency>
  • 如果不是使用的Spring Boot项目,则需要额外引入Reactor's BOM

    <dependencyManagement>
     <dependencies>
     <dependency>
     <groupId>io.projectreactor</groupId>
     <artifactId>reactor-bom</artifactId>
     <version>Bismuth-RELEASE</version>
     <type>pom</type>
     <scope>import</scope>
     </dependency>
     </dependencies>
    </dependencyManagement>

4. 使用常见的reactive operations(反应式操作)

Reactor提供了两个最基本的构件Flux(多数据)和Mono(单数据)

两者所包含的操作比较相似,一共可以分为四个大类

  • creation: 创建操作
  • combination : 组合操作
  • transformation: 转换操作
  • logic : 逻辑操作
4.1 创建
  • 通过对象创建
        @Test
        public void createFromObject(){
            Flux<String> stringFlux = Flux.just("Monday","Tuesday","Wednesday","Thursday","Friday","Saturday","Sunday");
            stringFlux.subscribe(System.out::println);
            //推荐验证方式: 验证Flux数据是否符合预期
            StepVerifier.create(stringFlux)
                    .expectNext("Monday")
                    .expectNext("Tuesday")
                    .expectNext("Wednesday")
                    .expectNext("Thursday")
                    .expectNext("Friday")
                    .expectNext("Saturday")
                    .expectNext("Sunday")
                    .verifyComplete();
        }
  • 通过集合创建
    • array

          @Test
          public void createFromArray(){
              String[] weeks = new String[]{"Monday","Tuesday","Wednesday","Thursday","Friday","Saturday","Sunday"};
              Flux<String> stringFlux = Flux.fromArray(weeks); //创建
      
              //推荐验证方式: 验证Flux数据是否符合预期
              StepVerifier.create(stringFlux)
                      .expectNext("Monday")
                      .expectNext("Tuesday")
                      .expectNext("Wednesday")
                      .expectNext("Thursday")
                      .expectNext("Friday")
                      .expectNext("Saturday")
                      .expectNext("Sunday")
                      .verifyComplete();
          }
    • iterable

          //实现Iterable接口的类即可(如Collection)
          @Test
          public void createFromIterable(){
              List<String> weeksList = List.of("Monday","Tuesday","Wednesday","Thursday","Friday","Saturday","Sunday");
              Flux<String> stringFlux = Flux.fromIterable(weeksList); //创建
          }
    • stream

          @Test
          public void createFromStream(){
              Stream<String> weeksStream = Stream.of("Monday","Tuesday","Wednesday","Thursday","Friday","Saturday","Sunday");
              Flux<String> stringFlux = Flux.fromStream(weeksStream); //创建
          }
  • 创建并自动生成数据
    • 使用整数序列填充Flux

          //创建时复制整数序列[1,10]
          @Test
          public void createFluxOfRange(){
              Flux<Integer> integerFlux = Flux.range(1,10);//创建并赋值从1开始(包含1)递增生成10个数字[1,10]
              integerFlux.subscribe(System.out::println);
          }
    • 使用定时创建数据填充Flux

          @Test
          public void createFluxOfInterval(){
              //从0开始每秒生成一个long值,生成5个结束: 0L,1L,2L,3L,4L
              Flux<Long> integerFlux = Flux.interval(Duration.ofSeconds(1)).take(5);
              integerFlux.subscribe(System.out::println);
          }
4.2 组合

可以将一个Flux(Mono)分为多个,或者将多个合并为一个

  • 归并操作
        @Test
        public void mergeFluxes(){
            Flux<Integer> flux1 = Flux.just(1,3,5,7,9).delayElements(Duration.ofMillis(500));
            Flux<Integer> flux2 = Flux.just(2,4,6,8,10)
                    .delaySubscription(Duration.ofMillis(250)) //延迟250ms准备开始订阅
                    .delayElements(Duration.ofMillis(200)); //开始订阅后,每次延迟200ms执行订阅操作Subscriber.onNext(T)
            Flux<Integer> mergeFlux = flux1.mergeWith(flux2); //归并操作: 按照订阅顺序归并
        }
  • 压缩操作(等待所有源都发布一个元素,然后将这些元素合并为一个元素放入新的Flux(Mono)中)
        @Test
        public void zipFluxes(){
            Flux<Integer> flux1 = Flux.just(1,3,5,7,9);
            Flux<Integer> flux2 = Flux.just(2,4,6,8,10);
            Flux<String> zipFlux = Flux.zip(flux1,flux2,(x,y) -> x + "#" + y);
            zipFlux.subscribe(System.out::println); //输出: 1#2 3#4 ... 9#10
        }
  • first(): (所有源发布一个元素,选取其中第一个放入新的Flux(Mono)中)
        @Test
        public void firstFluxes(){
            Flux<Integer> flux1 = Flux.just(1,3,5,7,9).delaySubscription(Duration.ofMillis(100));
            Flux<Integer> flux2 = Flux.just(2,4,6,8,10);
            Flux<Integer> zipFlux = Flux.first(flux1,flux2);
            zipFlux.subscribe(System.out::println); //输出: 2 4 6 8 10
        }
4.3 转换和过滤操作
  • skip: 可以跳过前n个数据,或者跳过前n时间段内的数据
        @Test
        public void skipFluxes(){
            //1秒发送一个
            Flux<Integer> integerFlux = Flux.just(1,2,3,4,5,6,7,8,9,10).delayElements(Duration.ofSeconds(1));
    
            Flux<Integer> skipFlux = integerFlux.skip(5);//跳过前5个数据
            StepVerifier.create(skipFlux)
                    .expectNext(6,7,8,9,10)
                    .verifyComplete();
    
            Flux<Integer> skipDurationFlux = integerFlux.skip(Duration.ofSeconds(6)); //跳过前6秒的数据
            StepVerifier.create(skipDurationFlux)
                    .expectNext(6,7,8,9,10)
                    .verifyComplete();
        }
  • take: 与skip相反,只获取前几项,或者前n时间段内的数据
        @Test
        public void takeFluxes(){
            //1秒发送一个
            Flux<Integer> integerFlux = Flux.just(1,2,3,4,5,6,7,8,9,10).delayElements(Duration.ofSeconds(1));
    
            Flux<Integer> takeFlux = integerFlux.take(5);//获取前5个数据
            StepVerifier.create(takeFlux)
                    .expectNext(1,2,3,4,5)
                    .verifyComplete();
    
            Flux<Integer> takeDurationFlux = integerFlux.take(Duration.ofSeconds(6)); //获取前6秒的数据
            StepVerifier.create(takeDurationFlux)
                    .expectNext(1,2,3,4,5)
                    .verifyComplete();
        }
  • filter: 过滤
        @Test
        public void filterFluxes(){
            Flux<Integer> integerFlux = Flux.just(1,2,3,4,5,6,7,8,9,10);
            Flux<Integer> filterFlux = integerFlux.filter(e -> (e & 1) == 1); //奇数
    
            StepVerifier.create(filterFlux)
                    .expectNext(1,3,5,7,9)
                    .verifyComplete();
        }
  • distinct: 去重复

        @Test
        public void distinctFluxes(){
            Flux<Integer> integerFlux = Flux.just(1,1,2,3,4,5,5,5,6,6,6).distinct(); //去重复
    
            StepVerifier.create(integerFlux)
                    .expectNext(1,2,3,4,5,6)
                    .verifyComplete();
        }
  • 映射
    • map: 一对一映射: 将一个对象转换为另一个对象
    • flatMap: 一对多映射: 将一个对象转换为多个对象放入Publisher中,最终将所有Publisher中的元素合并为一个Publisher
    • flatMap可以和subscribeOn搭配实现并发处理映射任务
        @Test
        public void mapFluxes(){
            Flux<Integer> integerFlux = Flux.just(1,2,3,4,5,6,7);
            String[] weeks = new String[]{"Monday","Tuesday","Wednesday","Thursday","Friday","Saturday","Sunday"};
            Flux<String> weeksFlux = integerFlux.map(e -> weeks[e-1]);
    
            StepVerifier.create(weeksFlux)
                    .expectNext(weeks)
                    .verifyComplete();
            //flatMap
            Flux<String> intWeeksFlux = integerFlux.flatMap(e -> Flux.just(e.toString(),weeks[e-1]));
            StepVerifier.create(intWeeksFlux)
                    .expectNext("1","Monday","2","Tuesday","3","Wednesday","4","Thursday","5","Friday","6","Saturday","7","Sunday")
                    .verifyComplete();
        }
  • subscribeOn: 定义执行订阅的模式Schedulers method(如并行等)
    • 实例

          @Test
          public void subscribeOnFluxes(){
              Flux<Integer> integerFlux = Flux.just(1,2,3,4,5,6,7);
              String[] weeks = new String[]{"Monday","Tuesday","Wednesday","Thursday","Friday","Saturday","Sunday"};
              integerFlux.flatMap(e -> Mono.just(e) //映射会并行执行
                      .map(es -> weeks[es-1])
                      .subscribeOn(Schedulers.parallel())
              ).subscribe(System.out::println); //结果乱序输出
          }
    • 可选模式

      Schedulers method 描述
      Schedulers.immediate() 在当前线程中执行订阅
      .single() 使所有调用者在单个可重用线程中执行订阅
      .newSingle() 在每次调用的专用线程中执行订阅
      .elastic() 通过一个无界的工作者线程池,使用其中的线程执行订阅
      .parallel() 通过一个固定的线程池(CPU核心数),使用其中的线程执行订阅
  • buffer: 缓冲区,使用缓冲区将流转变成一块一块的较大的数据集(List集合指定大小)进行传播
        @Test
        public void bufferFluxes(){
            Flux<Integer> integerFlux = Flux.just(1,2,3,4,5,6,7,8,9,10);
            integerFlux.buffer(3) //每三个元素组成一个List集合
                    .flatMap(x -> Flux.fromIterable(x)
                            .subscribeOn(Schedulers.parallel())
                            .log()) //打印日志
                    .subscribe();
    
            //无参buffer方法: 无参数时将所有元素收集到一个List中然后放入Flux中返回
            Flux<List<Integer>> list = integerFlux.buffer();
            //collectList方法: 收集到一个List中,但是会将List放入Mono返回
            Mono<List<Integer>> monoList = integerFlux.collectList();
        }
  • collectionList: 收集到一个List中,但是会将List放入Mono中返回
    //collectList方法: 收集到一个List中,但是会将List放入Mono返回
    Mono<List<Integer>> monoList = integerFlux.collectList();
    //无参buffer方法: 无参数时将所有元素收集到一个List中然后放入Flux中返回
    Flux<List<Integer>> list = integerFlux.buffer();
  • collectMap: 收集到Map中,然后将Map放入Mono中返回
        @Test
        public void collectionMapFluxes(){
            Flux<Integer> integerFlux = Flux.just(1,2,3,4,5,6,7);
            String[] weeks = new String[]{"Monday","Tuesday","Wednesday","Thursday","Friday","Saturday","Sunday"};
            Mono<Map<Integer,String>> mapMono =integerFlux.collectMap(__->__, e->weeks[e-1]); //两个参数定义map的key,value
            //collectMap只有一个参数时为key,value为integerFlux中的值
            mapMono.subscribe(e -> e.entrySet().forEach(es -> System.out.println(es.getKey()+" : " + es.getValue())));
        }
4.4 逻辑操作

判断是否匹配指定的某些规则

  • all: 是否所有元素都符合指定规则
        @Test
        public void allFluxes(){
            Flux<Integer> integerFlux = Flux.just(1,2,3,4,5,6,7);
            Mono<Boolean> allMono = integerFlux.all(e -> e > 0); //都大于0时返回true
            //验证
            StepVerifier.create(allMono)
                    .expectNext(true)
                    .verifyComplete();
        }
  • any: 是否有元素符合指定规则
        @Test
        public void anyFluxes(){
            Flux<Integer> integerFlux = Flux.just(1,2,3,4,5,6,7);
            Mono<Boolean> anyMono = integerFlux.any(e -> e == 2); //存在一个元素等于2时返回true
            //验证
            StepVerifier.create(anyMono)
                    .expectNext(true)
                    .verifyComplete();
        }

文章作者: Bryson
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Bryson !
评论
 上一篇
Spring-Reactive Spring:开发reactive APIS Spring-Reactive Spring:开发reactive APIS
使用Spring WebFlux *同步阻塞I/O模型: *传统的基于servlet的web框架(如Spring MVC),本质上是阻塞和多线程的,每一个请求线程对应一个工作线程去处理.在接收到请求时会从线程池中取出一个工作线程去处理,在
2020-07-22
下一篇 
Spring-Integration-flow Spring-Integration-flow
概述 Integration flow 通俗的说就是定义了一个管道流连接两个端点(输入端,与输出端), 输入端和输出端代表着不同的组件,通过这个管道流可以更加简单的实现messages在不同组件之间的传输 在传输中可以轻松的实现对me
2020-07-06
  目录