Spring-Reactive Spring:Spring Data Reactive


概述

  • Spring Data’s reactive repositories
  • 基于Cassandra构建reactive repository
  • 基于MongoDB构建reactive repository

Spring Data’s reactive repositories

1. 认识

  • Spring Data 提供了对reactive repository的支持,但是同时需要支持reactive programming model的数据库才可以
  • 目前支持响应式编程模型的数据库有Cassandra丶MongoDB丶Couchbase丶Redis
  • 目前JPA不支持响应式

2. 基本

  • Spring Data reactive简单的说就是在reactive repositories中的方法接收或者返回的都为reactive type类型(Mono 或者 Flux)

  • Repository中定义接口如下

    Flux<Ingredient> findByType(Ingredient.Type type);
    Flux<Taco> saveAll(Publisher<Taco> tacoPublisher);

3. reactive type 与非反应式类型转换

对于关系型数据库使用了JPA来说并不支持reactive repository ,如果想使用reactive特性,可以通过类型转换来适配, 但是这样会破坏反应式编程模型

  • Mono类型通过block方法转换为普通类型

    Taco taco = tacoMono.block();
    tacoRepo.save(taco);
  • Flux类型通过toIterable方法转换为非反应式类型

    Iterable<Taco> tacos = tacoFlux.toIterable();
    tacoRepo.saveAll(tacos);
  • 统一使用subscribe方法发布

    tacoFlux.subscribe(taco -> {
     tacoRepo.save(taco);
    });

基于Cassandra构建reactive repository

Cassandra 是一个分布式丶高性能丶高可用丶最终一致丶面向列丶无单点故障的NoSQL数据库

1. 下载安装

  • 直接官网下载解压
  • 找到安装目录下bin/cassandra.bat运行启动
  • conf/cassandra.yaml是配置文件,可以配置数据存储位置,账号登
  • 默认的用户名和密码都为cassandra

2. 准备数据

  • 创建keyspace(类似于创建数据库)

    //打开powershell执行cqlsh命令切换到cqlsh模式
    cqlsh> create keyspace tacocloud //键空间
     ... with replication={'class':'SimpleStrategy', 'replication_factor':1} //复制策略:简单策略,在数据中心中有一份复制
     ... and durable_writes=true; //是否写入commit log,默认为true,如果为false,则有丢失数据的风险
  • 创建表

    CREATE TABLE POP_IN_STOCK(
        ID int PRIMARY KEY,
        TASKID int,
        POP_CODE text,
        POP_NAME text,
        VERSION text,
        BRAND text,
        IN_STOCK_COUNT double,
        COMMENT text
    );
  • 添加数据

    INSERT INTO POP_IN_STOCK (ID,TASKID, POP_CODE, POP_NAME, BRAND, IN_STOCK_COUNT) VALUES (1,934924, 'R0B0506432', 'BBB', 'B', 1);

3. Spring boot配置

  • Maven依赖

    • 不使用reactive programming

      <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-data-cassandra</artifactId>
      </dependency>
    • 使用响应式编程

      <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-data-cassandra-reactive</artifactId>
      </dependency>
  • 配置

    spring:
      data:
        cassandra:
          keyspace-name: tacocloud
          #schema-action: recreate_drop_unused #启动时删除并重新创建表
          username: cassandra #默认用户名
          password: cassandra #默认密码
          contact-points:
            - localhost
            #- cassandra2.com.cn #其他服务器添加
          port: 9042 #默认端口

4. 映射

  • 单主键

    @Data
    @RequiredArgsConstructor
    @NoArgsConstructor(access=AccessLevel.PRIVATE, force=true)
    @Table("ingredients") //
    public class Ingredient {
     @PrimaryKey //主键
     private final String id;
     private final String name;
     private final Type type;
     public static enum Type {
     WRAP, PROTEIN, VEGGIES, CHEESE, SAUCE
     }
    }
  • 多主键

    @Data
    @RestResource(rel="tacos", path="tacos")
    @Table("tacos")
    public class Taco {
     @PrimaryKeyColumn(type=PrimaryKeyType.PARTITIONED) //分区键
     private UUID id = UUIDs.timeBased();
     @NotNull
     @Size(min=5, message="Name must be at least 5 characters long")
     private String name;
     @PrimaryKeyColumn(type=PrimaryKeyType.CLUSTERED,
     ordering=Ordering.DESCENDING) //集群键:用于表中存储顺序
     private Date createdAt = new Date();
     @Size(min=1, message="You must choose at least 1 ingredient")
     @Column("ingredients")
     private List<IngredientUDT> ingredients; //使用自定义类型(由于集合数据是直接复制的数据然后json形式保存的,因此自定义类型仅使用所需字段保存即可)
    }
    
    //自定义类型:只取需要的字段
    @Data
    @RequiredArgsConstructor
    @NoArgsConstructor(access=AccessLevel.PRIVATE, force=true)
    @UserDefinedType("ingredient")
    public class IngredientUDT {
     private final String name;
     private final Ingredient.Type type;
    }

5. Repository

  • *对于Reactive : * 创建接口继承ReactiveCrudRepositoryReactiveCassandraRepository,
    • ReactiveCassandraRepository 接口继承了ReactiveCrudRepository
    • 不同之处是ReactiveCassandraRepository提供了insert()方法的一些变体,对新增文档进行优化,如果有大量插入可以考虑使用
    • 使用ReactiveCrudRepository更有利于代码移植
  • *对于no-reactive: * 创建接口继承CrudRepositoryCassandraRepository
public interface IngredientRepository extends ReactiveCrudRepository<Ingredient, String> {
}
public interface UserRepository extends ReactiveCassandraRepository<User, UUID> {
 @AllowFiltering
 Mono<User> findByUsername(String username);
}

6. 实例: 简单实体映射

  • domain

    @Data
    @Table("POP_IN_STOCK")
    public class PopInStock implements Serializable{
        @PrimaryKey //主键
        private Integer id;
        @Column("TASKID") //数据库对应字段名
        private Integer task;
        @Column("POP_CODE")
        private String code;
        @Column("POP_NAME")
        private String name;
        private String version;
        private String brand;
        @Column("IN_STOCK_COUNT")
        @JsonProperty(value = "count")
        private Double inStockCount;
        private String comment;
    }
  • repository

    public interface ReactiveJpaPopRepository extends ReactiveCrudRepository<PopInStock,Integer> {}
  • controller

    @Controller
    @RequestMapping("/reactive/pop")
    public class ReactivePopController {
        private ReactiveJpaPopRepository reactiveJpaPopRepository;
        @Autowired
        public ReactivePopController(ReactiveJpaPopRepository reactiveJpaPopRepository) {
            this.reactiveJpaPopRepository = reactiveJpaPopRepository;
        }
    
        @GetMapping("/{id}")
        @ResponseBody
        public Mono<PopInStock> popById(@PathVariable("id") int id){
            return reactiveJpaPopRepository.findById(id);
        }
        @GetMapping("/recent")
        @ResponseBody
        public Flux<PopInStock> recent(){
            return reactiveJpaPopRepository.findAll().take(12);
        }
        @GetMapping("/put/{id}")
        @ResponseBody
        public Mono<PopInStock> process(@PathVariable int id){
            PopInStock pop = new PopInStock();
            pop.setId(id);
            pop.setName("测试");
            pop.setTask(123+id);
            pop.setCode("R0B0506432");
            return reactiveJpaPopRepository.save(pop);
        }
    }

7.实例: 较复杂实体映射

  • domain

    @Data
    @Table("POP_FROM")
    public class PopFrom {
        @PrimaryKey
        private Integer id;
        @Column("CREATE_BY")
        private String createBy;
        @Column("CREATE_DATE")
        private LocalDate createDate;
        //使用用户定义类型只从PopInStock中取部分数据
        private List<PopInStockUDT> pops;
    }
    //用户定义类型
    @Data
    @UserDefinedType("popInStock")
    public class PopInStockUDT {
        private String code;
        private String name;
    }
  • repository

    public interface ReactivePopFromRepository extends ReactiveCrudRepository<PopFrom,Integer> {
        @AllowFiltering //由于cassandra默认情况下只允许根据主键过滤,如果想使用其他字段过滤则需要 allow filtering
        //等价于CQL: select * from pop_from where create_by = 'Tom1' allow filtering;
        Flux<PopFrom> getByCreateBy(String name);
    }

基于MongoDB构建reactive repository

MongoDB是基于Binary JSON文档存储的非关系型数据库

1. 概述

在Spring boot中使用方式除了domain映射配置不同之外,其他RepositoryController都于以上类似

2. Maven依赖

  • 不使用reactive programming

    <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-data-mongodb</artifactId>
    </dependency>
  • 使用响应式编程

    <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-data-mongodb-reactive </artifactId>
    </dependency>
  • 方便开发测试的内嵌数据库版本,默认localhost:27017

    <dependency>
     <groupId>de.flapdoodle.embed</groupId>
     <artifactId>de.flapdoodle.embed.mongo</artifactId>
    </dependency>

3. 配置

spring:
 data:
 mongodb:
 host: mongodb.tacocloud.com
 port: 27018
 username: tacocloud
 password: s3cr3tp455w0rd
 database: tacoclouddb #默认为test

4.映射(实体映射到文档)

  • *@Id : *对应document ID //必须
  • *@Document : * domain类型映射到MongoDB的document //必须
  • *@Field: * 定义字段名 //非必须
@Data
@RequiredArgsConstructor
@NoArgsConstructor(access=AccessLevel.PRIVATE, force=true)
@Document //默认为类名首字母小写: ingredient
//@Document(collection="ingredients") //指定名称
public class Ingredient {
 @Id //string类型id,MongoDB会自动赋值(如果为null)
 private final String id;
 private final String name;
 @Field("CUSTOMER_TYPE")
 private final Type type;
 public static enum Type {
 WRAP, PROTEIN, VEGGIES, CHEESE, SAUCE
 }
 //集合类型: 会额外复制数据直接存储,都是MongoDB自动存储
 //无需额外设置,也无需像cassandra一样使用用户定义类型
  private List<Taco> tacos;
}

5. Repository

  • *对于Reactive : * 创建接口继承ReactiveCrudRepositoryReactiveMongoRepository,
    • 不同之处是ReactiveMongoRepository提供了额外的insert()方法对新增文档进行优化,如果有大量插入可以考虑使用
    • 使用ReactiveCrudRepository更有利于代码移植
  • *对于no-reactive: * 创建接口继承CrudRepositoryMongoRepository
@CrossOrigin(origins="*")
public interface IngredientRepository
 extends ReactiveCrudRepository<Ingredient, String> {
    Flux<Taco> findByOrderByCreatedAtDesc();
}
public interface UserRepository extends ReactiveMongoRepository<User, String> {
 Mono<User> findByUsername(String username);
}

文章作者: Bryson
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Bryson !
评论
 上一篇
下一篇 
Spring-Reactive Spring:开发reactive APIS Spring-Reactive Spring:开发reactive APIS
使用Spring WebFlux *同步阻塞I/O模型: *传统的基于servlet的web框架(如Spring MVC),本质上是阻塞和多线程的,每一个请求线程对应一个工作线程去处理.在接收到请求时会从线程池中取出一个工作线程去处理,在
2020-07-22
  目录