概述
- Spring Data’s reactive repositories
- 基于Cassandra构建reactive repository
- 基于MongoDB构建reactive repository
Spring Data’s reactive repositories
1. 认识
- Spring Data 提供了对reactive repository的支持,但是同时需要支持
reactive programming model
的数据库才可以 - 目前支持响应式编程模型的数据库有Cassandra丶MongoDB丶Couchbase丶Redis
- 目前
JPA
不支持响应式
2. 基本
Spring Data reactive简单的说就是在
reactive repositories
中的方法接收或者返回的都为reactive type类型(Mono 或者 Flux)如
Repository
中定义接口如下Flux<Ingredient> findByType(Ingredient.Type type); Flux<Taco> saveAll(Publisher<Taco> tacoPublisher);
3. reactive type 与非反应式类型转换
对于关系型数据库使用了JPA来说并不支持
reactive repository
,如果想使用reactive特性,可以通过类型转换来适配, 但是这样会破坏反应式编程模型
Mono
类型通过block方法转换为普通类型Taco taco = tacoMono.block(); tacoRepo.save(taco);
Flux
类型通过toIterable方法转换为非反应式类型Iterable<Taco> tacos = tacoFlux.toIterable(); tacoRepo.saveAll(tacos);
统一使用
subscribe
方法发布tacoFlux.subscribe(taco -> { tacoRepo.save(taco); });
基于Cassandra构建reactive repository
Cassandra 是一个分布式丶高性能丶高可用丶最终一致丶面向列丶无单点故障的NoSQL数据库
1. 下载安装
- 直接官网下载解压
- 找到安装目录下
bin/cassandra.bat
运行启动 conf/cassandra.yaml
是配置文件,可以配置数据存储位置,账号登- 默认的用户名和密码都为
cassandra
2. 准备数据
创建
keyspace
(类似于创建数据库)//打开powershell执行cqlsh命令切换到cqlsh模式 cqlsh> create keyspace tacocloud //键空间 ... with replication={'class':'SimpleStrategy', 'replication_factor':1} //复制策略:简单策略,在数据中心中有一份复制 ... and durable_writes=true; //是否写入commit log,默认为true,如果为false,则有丢失数据的风险
创建表
CREATE TABLE POP_IN_STOCK( ID int PRIMARY KEY, TASKID int, POP_CODE text, POP_NAME text, VERSION text, BRAND text, IN_STOCK_COUNT double, COMMENT text );
添加数据
INSERT INTO POP_IN_STOCK (ID,TASKID, POP_CODE, POP_NAME, BRAND, IN_STOCK_COUNT) VALUES (1,934924, 'R0B0506432', 'BBB', 'B', 1);
3. Spring boot配置
Maven依赖
不使用reactive programming
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-cassandra</artifactId> </dependency>
使用响应式编程
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-cassandra-reactive</artifactId> </dependency>
配置
spring: data: cassandra: keyspace-name: tacocloud #schema-action: recreate_drop_unused #启动时删除并重新创建表 username: cassandra #默认用户名 password: cassandra #默认密码 contact-points: - localhost #- cassandra2.com.cn #其他服务器添加 port: 9042 #默认端口
4. 映射
单主键
@Data @RequiredArgsConstructor @NoArgsConstructor(access=AccessLevel.PRIVATE, force=true) @Table("ingredients") // public class Ingredient { @PrimaryKey //主键 private final String id; private final String name; private final Type type; public static enum Type { WRAP, PROTEIN, VEGGIES, CHEESE, SAUCE } }
多主键
@Data @RestResource(rel="tacos", path="tacos") @Table("tacos") public class Taco { @PrimaryKeyColumn(type=PrimaryKeyType.PARTITIONED) //分区键 private UUID id = UUIDs.timeBased(); @NotNull @Size(min=5, message="Name must be at least 5 characters long") private String name; @PrimaryKeyColumn(type=PrimaryKeyType.CLUSTERED, ordering=Ordering.DESCENDING) //集群键:用于表中存储顺序 private Date createdAt = new Date(); @Size(min=1, message="You must choose at least 1 ingredient") @Column("ingredients") private List<IngredientUDT> ingredients; //使用自定义类型(由于集合数据是直接复制的数据然后json形式保存的,因此自定义类型仅使用所需字段保存即可) } //自定义类型:只取需要的字段 @Data @RequiredArgsConstructor @NoArgsConstructor(access=AccessLevel.PRIVATE, force=true) @UserDefinedType("ingredient") public class IngredientUDT { private final String name; private final Ingredient.Type type; }
5. Repository
- *对于Reactive : * 创建接口继承
ReactiveCrudRepository
或ReactiveCassandraRepository
,
ReactiveCassandraRepository
接口继承了ReactiveCrudRepository
- 不同之处是
ReactiveCassandraRepository
提供了insert()
方法的一些变体,对新增文档进行优化,如果有大量插入可以考虑使用- 使用
ReactiveCrudRepository
更有利于代码移植- *对于no-reactive: * 创建接口继承
CrudRepository
或CassandraRepository
public interface IngredientRepository extends ReactiveCrudRepository<Ingredient, String> {
}
public interface UserRepository extends ReactiveCassandraRepository<User, UUID> {
@AllowFiltering
Mono<User> findByUsername(String username);
}
6. 实例: 简单实体映射
domain
@Data @Table("POP_IN_STOCK") public class PopInStock implements Serializable{ @PrimaryKey //主键 private Integer id; @Column("TASKID") //数据库对应字段名 private Integer task; @Column("POP_CODE") private String code; @Column("POP_NAME") private String name; private String version; private String brand; @Column("IN_STOCK_COUNT") @JsonProperty(value = "count") private Double inStockCount; private String comment; }
repository
public interface ReactiveJpaPopRepository extends ReactiveCrudRepository<PopInStock,Integer> {}
controller
@Controller @RequestMapping("/reactive/pop") public class ReactivePopController { private ReactiveJpaPopRepository reactiveJpaPopRepository; @Autowired public ReactivePopController(ReactiveJpaPopRepository reactiveJpaPopRepository) { this.reactiveJpaPopRepository = reactiveJpaPopRepository; } @GetMapping("/{id}") @ResponseBody public Mono<PopInStock> popById(@PathVariable("id") int id){ return reactiveJpaPopRepository.findById(id); } @GetMapping("/recent") @ResponseBody public Flux<PopInStock> recent(){ return reactiveJpaPopRepository.findAll().take(12); } @GetMapping("/put/{id}") @ResponseBody public Mono<PopInStock> process(@PathVariable int id){ PopInStock pop = new PopInStock(); pop.setId(id); pop.setName("测试"); pop.setTask(123+id); pop.setCode("R0B0506432"); return reactiveJpaPopRepository.save(pop); } }
7.实例: 较复杂实体映射
domain
@Data @Table("POP_FROM") public class PopFrom { @PrimaryKey private Integer id; @Column("CREATE_BY") private String createBy; @Column("CREATE_DATE") private LocalDate createDate; //使用用户定义类型只从PopInStock中取部分数据 private List<PopInStockUDT> pops; } //用户定义类型 @Data @UserDefinedType("popInStock") public class PopInStockUDT { private String code; private String name; }
repository
public interface ReactivePopFromRepository extends ReactiveCrudRepository<PopFrom,Integer> { @AllowFiltering //由于cassandra默认情况下只允许根据主键过滤,如果想使用其他字段过滤则需要 allow filtering //等价于CQL: select * from pop_from where create_by = 'Tom1' allow filtering; Flux<PopFrom> getByCreateBy(String name); }
基于MongoDB构建reactive repository
MongoDB是基于Binary JSON文档存储的非关系型数据库
1. 概述
在Spring boot中使用方式除了
domain
映射配置不同之外,其他Repository
丶Controller
都于以上类似
2. Maven依赖
不使用reactive programming
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb</artifactId> </dependency>
使用响应式编程
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb-reactive </artifactId> </dependency>
方便开发测试的内嵌数据库版本,默认
localhost:27017
<dependency> <groupId>de.flapdoodle.embed</groupId> <artifactId>de.flapdoodle.embed.mongo</artifactId> </dependency>
3. 配置
spring:
data:
mongodb:
host: mongodb.tacocloud.com
port: 27018
username: tacocloud
password: s3cr3tp455w0rd
database: tacoclouddb #默认为test
4.映射(实体映射到文档)
- *@Id : *对应document ID //必须
- *@Document : * domain类型映射到MongoDB的document //必须
- *@Field: * 定义字段名 //非必须
@Data
@RequiredArgsConstructor
@NoArgsConstructor(access=AccessLevel.PRIVATE, force=true)
@Document //默认为类名首字母小写: ingredient
//@Document(collection="ingredients") //指定名称
public class Ingredient {
@Id //string类型id,MongoDB会自动赋值(如果为null)
private final String id;
private final String name;
@Field("CUSTOMER_TYPE")
private final Type type;
public static enum Type {
WRAP, PROTEIN, VEGGIES, CHEESE, SAUCE
}
//集合类型: 会额外复制数据直接存储,都是MongoDB自动存储
//无需额外设置,也无需像cassandra一样使用用户定义类型
private List<Taco> tacos;
}
5. Repository
- *对于Reactive : * 创建接口继承
ReactiveCrudRepository
或ReactiveMongoRepository
,
- 不同之处是
ReactiveMongoRepository
提供了额外的insert()
方法对新增文档进行优化,如果有大量插入可以考虑使用- 使用
ReactiveCrudRepository
更有利于代码移植- *对于no-reactive: * 创建接口继承
CrudRepository
或MongoRepository
@CrossOrigin(origins="*")
public interface IngredientRepository
extends ReactiveCrudRepository<Ingredient, String> {
Flux<Taco> findByOrderByCreatedAtDesc();
}
public interface UserRepository extends ReactiveMongoRepository<User, String> {
Mono<User> findByUsername(String username);
}