Reactor介绍
1. 理解reactive programming
reactive programming(反应式编程):
- 是一种基于data stream(数据流)和propagation of change(变化传递)的declarative(声明式)的编程范式
- 是一种非阻塞丶事件驱动数据流的开发方案,使用函数式编程的概念来操作数据流,系统中某部分的数据变动后会自动更新其他部分,而且成本极低
- 是观察者模式(推模式)的一种延伸,不同于传统的命令编程方式(Imperative programming)同步拉取数据的方式(如迭代器模式),而是采用数据发布者同步或者异步地进行推送
- 实现上,屏蔽了并发实现的复杂细节,提供数据流的有序操作,实现更加简单,代码可读性高
2. Reactive Streams
Reactive Streams可以归纳为四种接口定义
- *Publisher(发布者): * 发布数据到Subscriber
- *Subscriber(订阅者): * 消费数据
- Subscription(订阅): ** 定义了订阅的规则(一次订阅多少数据),会根据订阅的规则推送数据,这也是backpressure**(反压)产生的原因,可以有效阻止发布者推送超出订阅者处理能力的数据
- *Processor(处理器): * 继承了
Subscriber
和Processor
,因此可以作为订阅者接收并处理数据,并且能作为发布者将处理后的数据推送给订阅者
//发布者
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber); //指定数据将推送给哪些订阅者
}
//订阅者: 可以接受来自发布者的事件
public interface Subscriber<T> {
void onSubscribe(Subscription sub); //指定订阅规则
void onNext(T item); //执行后将订阅数据传递给订阅者
void onError(Throwable ex);//产生任何错误时调用
void onComplete();//发布完成: 发布者没有数据并且将不再产生数据
}
//订阅
public interface Subscription {
void request(long n); //请求订阅:参数为订阅规则,一次可以订阅多少数据,执行后发布者发布数据进入流中
void cancel(); //表示订阅者不在需要订阅数据
}
//处理器
public interface Processor<T, R>
extends Subscriber<T>, Publisher<R> {}
3. 添加Reactor依赖
reactor-core
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency>
reactor-test
如果需要测试支持则引入此依赖<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency>
如果不是使用的Spring Boot项目,则需要额外引入
Reactor's BOM
<dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>Bismuth-RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
4. 使用常见的reactive operations(反应式操作)
Reactor提供了两个最基本的构件
Flux
(多数据)和Mono
(单数据)两者所包含的操作比较相似,一共可以分为四个大类
creation
: 创建操作combination
: 组合操作transformation
: 转换操作logic
: 逻辑操作
4.1 创建
通过对象创建
@Test public void createFromObject(){ Flux<String> stringFlux = Flux.just("Monday","Tuesday","Wednesday","Thursday","Friday","Saturday","Sunday"); stringFlux.subscribe(System.out::println); //推荐验证方式: 验证Flux数据是否符合预期 StepVerifier.create(stringFlux) .expectNext("Monday") .expectNext("Tuesday") .expectNext("Wednesday") .expectNext("Thursday") .expectNext("Friday") .expectNext("Saturday") .expectNext("Sunday") .verifyComplete(); }
通过集合创建
array
@Test public void createFromArray(){ String[] weeks = new String[]{"Monday","Tuesday","Wednesday","Thursday","Friday","Saturday","Sunday"}; Flux<String> stringFlux = Flux.fromArray(weeks); //创建 //推荐验证方式: 验证Flux数据是否符合预期 StepVerifier.create(stringFlux) .expectNext("Monday") .expectNext("Tuesday") .expectNext("Wednesday") .expectNext("Thursday") .expectNext("Friday") .expectNext("Saturday") .expectNext("Sunday") .verifyComplete(); }
iterable
//实现Iterable接口的类即可(如Collection) @Test public void createFromIterable(){ List<String> weeksList = List.of("Monday","Tuesday","Wednesday","Thursday","Friday","Saturday","Sunday"); Flux<String> stringFlux = Flux.fromIterable(weeksList); //创建 }
stream
@Test public void createFromStream(){ Stream<String> weeksStream = Stream.of("Monday","Tuesday","Wednesday","Thursday","Friday","Saturday","Sunday"); Flux<String> stringFlux = Flux.fromStream(weeksStream); //创建 }
创建并自动生成数据
使用整数序列填充Flux
//创建时复制整数序列[1,10] @Test public void createFluxOfRange(){ Flux<Integer> integerFlux = Flux.range(1,10);//创建并赋值从1开始(包含1)递增生成10个数字[1,10] integerFlux.subscribe(System.out::println); }
使用定时创建数据填充Flux
@Test public void createFluxOfInterval(){ //从0开始每秒生成一个long值,生成5个结束: 0L,1L,2L,3L,4L Flux<Long> integerFlux = Flux.interval(Duration.ofSeconds(1)).take(5); integerFlux.subscribe(System.out::println); }
4.2 组合
可以将一个Flux(Mono)分为多个,或者将多个合并为一个
归并操作
@Test public void mergeFluxes(){ Flux<Integer> flux1 = Flux.just(1,3,5,7,9).delayElements(Duration.ofMillis(500)); Flux<Integer> flux2 = Flux.just(2,4,6,8,10) .delaySubscription(Duration.ofMillis(250)) //延迟250ms准备开始订阅 .delayElements(Duration.ofMillis(200)); //开始订阅后,每次延迟200ms执行订阅操作Subscriber.onNext(T) Flux<Integer> mergeFlux = flux1.mergeWith(flux2); //归并操作: 按照订阅顺序归并 }
压缩操作(等待所有源都发布一个元素,然后将这些元素合并为一个元素放入新的Flux(Mono)中)
@Test public void zipFluxes(){ Flux<Integer> flux1 = Flux.just(1,3,5,7,9); Flux<Integer> flux2 = Flux.just(2,4,6,8,10); Flux<String> zipFlux = Flux.zip(flux1,flux2,(x,y) -> x + "#" + y); zipFlux.subscribe(System.out::println); //输出: 1#2 3#4 ... 9#10 }
first(): (所有源发布一个元素,选取其中第一个放入新的Flux(Mono)中)
@Test public void firstFluxes(){ Flux<Integer> flux1 = Flux.just(1,3,5,7,9).delaySubscription(Duration.ofMillis(100)); Flux<Integer> flux2 = Flux.just(2,4,6,8,10); Flux<Integer> zipFlux = Flux.first(flux1,flux2); zipFlux.subscribe(System.out::println); //输出: 2 4 6 8 10 }
4.3 转换和过滤操作
skip: 可以跳过前n个数据,或者跳过前n时间段内的数据
@Test public void skipFluxes(){ //1秒发送一个 Flux<Integer> integerFlux = Flux.just(1,2,3,4,5,6,7,8,9,10).delayElements(Duration.ofSeconds(1)); Flux<Integer> skipFlux = integerFlux.skip(5);//跳过前5个数据 StepVerifier.create(skipFlux) .expectNext(6,7,8,9,10) .verifyComplete(); Flux<Integer> skipDurationFlux = integerFlux.skip(Duration.ofSeconds(6)); //跳过前6秒的数据 StepVerifier.create(skipDurationFlux) .expectNext(6,7,8,9,10) .verifyComplete(); }
take: 与skip相反,只获取前几项,或者前n时间段内的数据
@Test public void takeFluxes(){ //1秒发送一个 Flux<Integer> integerFlux = Flux.just(1,2,3,4,5,6,7,8,9,10).delayElements(Duration.ofSeconds(1)); Flux<Integer> takeFlux = integerFlux.take(5);//获取前5个数据 StepVerifier.create(takeFlux) .expectNext(1,2,3,4,5) .verifyComplete(); Flux<Integer> takeDurationFlux = integerFlux.take(Duration.ofSeconds(6)); //获取前6秒的数据 StepVerifier.create(takeDurationFlux) .expectNext(1,2,3,4,5) .verifyComplete(); }
filter: 过滤
@Test public void filterFluxes(){ Flux<Integer> integerFlux = Flux.just(1,2,3,4,5,6,7,8,9,10); Flux<Integer> filterFlux = integerFlux.filter(e -> (e & 1) == 1); //奇数 StepVerifier.create(filterFlux) .expectNext(1,3,5,7,9) .verifyComplete(); }
distinct: 去重复
@Test public void distinctFluxes(){ Flux<Integer> integerFlux = Flux.just(1,1,2,3,4,5,5,5,6,6,6).distinct(); //去重复 StepVerifier.create(integerFlux) .expectNext(1,2,3,4,5,6) .verifyComplete(); }
映射
- map: 一对一映射: 将一个对象转换为另一个对象
- flatMap: 一对多映射: 将一个对象转换为多个对象放入Publisher中,最终将所有Publisher中的元素合并为一个Publisher
- flatMap可以和subscribeOn搭配实现并发处理映射任务
@Test public void mapFluxes(){ Flux<Integer> integerFlux = Flux.just(1,2,3,4,5,6,7); String[] weeks = new String[]{"Monday","Tuesday","Wednesday","Thursday","Friday","Saturday","Sunday"}; Flux<String> weeksFlux = integerFlux.map(e -> weeks[e-1]); StepVerifier.create(weeksFlux) .expectNext(weeks) .verifyComplete(); //flatMap Flux<String> intWeeksFlux = integerFlux.flatMap(e -> Flux.just(e.toString(),weeks[e-1])); StepVerifier.create(intWeeksFlux) .expectNext("1","Monday","2","Tuesday","3","Wednesday","4","Thursday","5","Friday","6","Saturday","7","Sunday") .verifyComplete(); }
subscribeOn: 定义执行订阅的模式
Schedulers method
(如并行等)实例
@Test public void subscribeOnFluxes(){ Flux<Integer> integerFlux = Flux.just(1,2,3,4,5,6,7); String[] weeks = new String[]{"Monday","Tuesday","Wednesday","Thursday","Friday","Saturday","Sunday"}; integerFlux.flatMap(e -> Mono.just(e) //映射会并行执行 .map(es -> weeks[es-1]) .subscribeOn(Schedulers.parallel()) ).subscribe(System.out::println); //结果乱序输出 }
可选模式
Schedulers method 描述 Schedulers.immediate() 在当前线程中执行订阅 .single() 使所有调用者在单个可重用线程中执行订阅 .newSingle() 在每次调用的专用线程中执行订阅 .elastic() 通过一个无界的工作者线程池,使用其中的线程执行订阅 .parallel() 通过一个固定的线程池(CPU核心数),使用其中的线程执行订阅
buffer: 缓冲区,使用缓冲区将流转变成一块一块的较大的数据集(List集合指定大小)进行传播
@Test public void bufferFluxes(){ Flux<Integer> integerFlux = Flux.just(1,2,3,4,5,6,7,8,9,10); integerFlux.buffer(3) //每三个元素组成一个List集合 .flatMap(x -> Flux.fromIterable(x) .subscribeOn(Schedulers.parallel()) .log()) //打印日志 .subscribe(); //无参buffer方法: 无参数时将所有元素收集到一个List中然后放入Flux中返回 Flux<List<Integer>> list = integerFlux.buffer(); //collectList方法: 收集到一个List中,但是会将List放入Mono返回 Mono<List<Integer>> monoList = integerFlux.collectList(); }
collectionList: 收集到一个List中,但是会将List放入Mono中返回
//collectList方法: 收集到一个List中,但是会将List放入Mono返回 Mono<List<Integer>> monoList = integerFlux.collectList(); //无参buffer方法: 无参数时将所有元素收集到一个List中然后放入Flux中返回 Flux<List<Integer>> list = integerFlux.buffer();
collectMap: 收集到Map中,然后将Map放入Mono中返回
@Test public void collectionMapFluxes(){ Flux<Integer> integerFlux = Flux.just(1,2,3,4,5,6,7); String[] weeks = new String[]{"Monday","Tuesday","Wednesday","Thursday","Friday","Saturday","Sunday"}; Mono<Map<Integer,String>> mapMono =integerFlux.collectMap(__->__, e->weeks[e-1]); //两个参数定义map的key,value //collectMap只有一个参数时为key,value为integerFlux中的值 mapMono.subscribe(e -> e.entrySet().forEach(es -> System.out.println(es.getKey()+" : " + es.getValue()))); }
4.4 逻辑操作
判断是否匹配指定的某些规则
all: 是否所有元素都符合指定规则
@Test public void allFluxes(){ Flux<Integer> integerFlux = Flux.just(1,2,3,4,5,6,7); Mono<Boolean> allMono = integerFlux.all(e -> e > 0); //都大于0时返回true //验证 StepVerifier.create(allMono) .expectNext(true) .verifyComplete(); }
any: 是否有元素符合指定规则
@Test public void anyFluxes(){ Flux<Integer> integerFlux = Flux.just(1,2,3,4,5,6,7); Mono<Boolean> anyMono = integerFlux.any(e -> e == 2); //存在一个元素等于2时返回true //验证 StepVerifier.create(anyMono) .expectNext(true) .verifyComplete(); }