观察者模式的定义
观察者(Observer)模式: ** 指多个对象间存在一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于他的对象都得到通知并被自动更新,这种模式又称为发布-订阅模式丶模型-视图模式**,它是对象行为模式
- 在Java中Observable类和Observer接口从1.8+开始已弃用,对于可靠而有序的线程间通信可以使用java.util.concurrent.Flow
特点
1. 优点
- 降低了目标与观察者之间的耦合关系,两者之间是抽象耦合关系
- 目标与观察者之间建立了一套触发机制
2. 缺点
- 目标与观察者之间的依赖关系并没有完全解除,而且有可能出现循环引用
- 当观察者对象很多时,通知的发布会花费很多时间,影响程序效率
模式的结构与实现
1. 模式的结构
抽象主题(Subject)角色: 也叫抽象目标类,它提供了一个用于保存观察者对象的聚集类和增加、删除观察者对象的方法,以及通知所有观察者的抽象方法。
具体主题(Concrete Subject)角色: 也叫具体目标类,它实现抽象目标中的通知方法,当具体主题的内部状态发生改变时,通知所有注册过的观察者对象。
抽象观察者(Observer)角色: 它是一个抽象类或接口,它包含了一个更新自己的抽象方法,当接到具体主题的更改通知时被调用。
具体观察者(Concrete Observer)角色: 实现抽象观察者中定义的抽象方法,以便在得到目标的更改通知时更新自身的状态
2. 模式的实现
2.1 普通实现方式
//抽象目标
abstract class Subject{
protected List<Observer> observers = new ArrayList<>();
//增加观察者
public void add(Observer observer){
observers.add(observer);
}
//删除观察者
public void remove(Observer observer){
observers.remove(observer);
}
//通知观察者
abstract public void notifyObserver();
}
//具体目标
class ConcreteSubject extends Subject{
@Override
public void notifyObserver() {
System.out.println("具体目标发生改变...");
for(Observer obs : observers){
obs.response();
}
}
}
//抽象观察者
@FunctionalInterface //定义成函数式接口,可以使用Lambda表达式简化具体观察者实现
interface Observer{
void response(); //响应
}
//具体观察者1
class ConcreteObserver1 implements Observer{
@Override
public void response() {
System.out.println("具体观察者1做出响应!");
}
}
class ConcreteObserver2 implements Observer{
@Override
public void response() {
System.out.println("具体观察者2做出响应!");
}
}
//测试
public class ObserverPattern {
public static void main(String[] args) {
Subject subject=new ConcreteSubject();
Observer obs1=new ConcreteObserver1();
Observer obs2=new ConcreteObserver2();
//Lambda表达式简化创建具体观察者
Observer obs3 = () -> System.out.println("具体观察者3做出响应!");
Observer obs4 = () -> System.out.println("具体观察者4做出响应!");
subject.add(obs1);
subject.add(obs2);
subject.add(obs3);
subject.add(obs4);
subject.notifyObserver();
}
}
2.2 基于java.util.concurrent.Flow发布-订阅简单使用
//订阅者
class ConcreteSubscriber implements Flow.Subscriber<Integer>{
private String name; //订阅者名字
private long sleepTime; //订阅者获取订阅的间隔时间
private Flow.Subscription subscription;
//发布者(Publisher)注册一个新的订阅者(Subscriber)时会调用此方法 publisher.subscribe(subscriber)
@Override
public void onSubscribe(Flow.Subscription subscription) {
(this.subscription = subscription).request(1); //请求一个订阅内容
}
//获取一个订阅: 调用subscription.request(1)时会执行此方法
@Override
public void onNext(Integer item) {
System.out.println("订阅者"+name+"接收到Publisher的数据: "+ item);
try {
Thread.sleep(sleepTime); //等待一段时间在获取新的订阅内容
} catch (InterruptedException e) {
e.printStackTrace();
}
subscription.request(1); //请求一个订阅内容
}
//当发布者出现异常时调用此方法
@Override
public void onError(Throwable throwable) {
System.out.println("订阅者"+name+"从Publisher获取到错误: "+throwable.getMessage());
}
//当发布者(Publisher)执行close方法时调用此方法
@Override
public void onComplete() {
System.out.println("订阅者"+name+"收到: Publisher发布结束!");
}
public ConcreteSubscriber(String name,long sleepTime) {
this.name = name;
this.sleepTime = sleepTime;
}
public String getName() {
return name;
}
public long getSleepTime() {
return sleepTime;
}
}
public class ObserverPatternFlow {
public static void main(String[] args) throws Exception{
//创建发布者,发布内容为Integer数据,为每一个订阅者最多保4留份数据(订阅者如果没及时接收就会丢弃)
//此处容量会取最接近且大于或者等于输入值的2的幂,如参数3,实际容量为4
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(ForkJoinPool.commonPool(),3);
ConcreteSubscriber subscriber1 = new ConcreteSubscriber("Tom",100); //具体订阅者1
ConcreteSubscriber subscriber2 = new ConcreteSubscriber("jerry",1500); //具体订阅者2
//发布者注册订阅者
publisher.subscribe(subscriber1);
publisher.subscribe(subscriber2);
//发布者发布数据
IntStream.rangeClosed(1,7).forEach(num ->{
int lag = publisher.offer(
num, //发布的内容
1,//等待订阅者Subscriber获取订阅的最大时间1秒
TimeUnit.SECONDS,//时间单位
(subscriber,msg) -> {//丢弃数据时的处理
System.out.println("订阅者"+((ConcreteSubscriber)subscriber).getName()+"获取太慢,丢弃数据"+msg);
return false;
}
);
if(lag < 0) System.out.println("丢弃"+ -lag +"个数据");
else System.out.println("最慢的订阅者还有"+lag+"个数据没有拿");
});
//等待发布者发布完所有数据
while (publisher.estimateMaximumLag() > 0) {
Thread.sleep(500L);
}
publisher.close(); //发布者结束发布会调用订阅者的onComplete()方法
//等待: 确保订阅者接收到发布结束的通知
Thread.sleep(Math.max(subscriber1.getSleepTime(),subscriber2.getSleepTime()));
}
}