0%

行为型-观察者模式

定义

观察者模式(Observer Design Pattern)也被称为发布订阅模式(Publish-Subscribe Design Pattern)。在 GoF 的《设计模式》一书中,它的定义是这样的:

Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.

一般情况下,被依赖的对象叫作被观察者(Observable),依赖的对象叫作观察者(Observer)。不过,在实际的项目开发中,这两种对象的称呼是比较灵活的,有各种不同的叫法,比如:Subject-Observer、Publisher-Subscriber、Producer-Consumer、EventEmitter-EventListener、Dispatcher-Listener。不管怎么称呼,只要应用场景符合刚刚给出的定义,都可以看作观察者模式。

基于不同场景的不同实现方式

小到代码层面的解耦,大到架构层面的系统解耦,再或者一些产品的设计思路,都有这种模式的影子,比如,邮件订阅、RSS Feeds,本质上都是观察者模式。不同的应用场景和需求下,这个模式也有截然不同的实现方式,有同步阻塞的实现方式,也有异步非阻塞的实现方式;有进程内的实现方式,也有跨进程的实现方式。

对比生产者-消费者模式

  • 发布-订阅模型,是一对多的关系,可以以同步的方式实现,也可以以异步的方式实现。
  • 生产-消费模型,是多对多的关系,一般以异步的方式实现

两者都可以达到解耦的作用

应用

  1. jdk自带实现

jdk自带的观察者模式实现比较简单,通过实现Observer接口实现观察者,Observable类作为观察机制的控制中心,负责观察者的生命周期管理及被观察者变化的通知。

核心类有两个:Observer接口和Observable类,简略代码如下(省略了部分非核心代码及注释)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public interface Observer {
void update(Observable o, Object arg);
}
public class Observable {
// 核心变量,标记被观察者是否发生了变化
private boolean changed = false;
// 核心变量,观察者集合
private Vector<Observer> obs;

public Observable() {
obs = new Vector<>();
}

/**
* 注册观察者
**/
public synchronized void addObserver(Observer o) {
if (o == null)
throw new NullPointerException();
if (!obs.contains(o)) {
obs.addElement(o);
}
}

/**
* 删除观察者
**/
public synchronized void deleteObserver(Observer o) {
obs.removeElement(o);
}

/**
* 通知变化
**/
public void notifyObservers(Object arg) {
Object[] arrLocal;

synchronized (this) {
if (!changed)
return;
arrLocal = obs.toArray();
clearChanged();
}

for (int i = arrLocal.length-1; i>=0; i--)
((Observer)arrLocal[i]).update(this, arg);
}

protected synchronized void setChanged() {
changed = true;
}

protected synchronized void clearChanged() {
changed = false;
}

public synchronized boolean hasChanged() {
return changed;
}

public synchronized int countObservers() {
return obs.size();
}

}

使用demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 状态变更watcher
**/
public class StatusWatcher extends Observable {
public void setStatus(String status){
System.out.println("status change~~~~~~~~~~");
setChanged();
notifyObservers(status);
}
}

/**
* 状态变更observer
**/
public class StatusObserverImpl implements Observer {
@Override
public void update(Observable o, Object arg) {
System.out.println("notify~~~ arg:" + arg);
}
}

// 1、通过继承的方式实现观察者控制中心
// 2、实现Observer接口实现自己的观察者
// 3、调用setChanged()设置变更状态
// 4、调用notifyObservers(Obj)通知观察者
public static void main(String[] args) {
StatusWatcher watcher = new StatusWatcher();
watcher.addObserver(new StatusObserverImpl());

watcher.setStatus("gggg");
watcher.setStatus("oooo");

System.out.println("~~~~~~end");

}
  1. guava实现

guava相对于jdk的实现来说功能更加丰富了一些,支持根据不同的入参唤醒不同的观察者,支持异步非阻塞的通知机制。

核心类:EventBus(事件总线)、Subscriber(观察者)、Dispatcher(事件分发器),核心代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
@Beta
public class EventBus {

private static final Logger logger = Logger.getLogger(EventBus.class.getName());

private final String identifier;
private final Executor executor;
private final SubscriberExceptionHandler exceptionHandler;

private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
private final Dispatcher dispatcher;

EventBus(
String identifier,
Executor executor,
Dispatcher dispatcher,
SubscriberExceptionHandler exceptionHandler) {
this.identifier = checkNotNull(identifier);
this.executor = checkNotNull(executor);
this.dispatcher = checkNotNull(dispatcher);
this.exceptionHandler = checkNotNull(exceptionHandler);
}

public void register(Object object) {
subscribers.register(object);
}

public void unregister(Object object) {
subscribers.unregister(object);
}

public void post(Object event) {
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
class Subscriber {
static Subscriber create(EventBus bus, Object listener, Method method) {
return isDeclaredThreadSafe(method)
? new Subscriber(bus, listener, method)
: new SynchronizedSubscriber(bus, listener, method);
}

/** The event bus this subscriber belongs to. */
@Weak private EventBus bus;

/** The object with the subscriber method. */
@VisibleForTesting final Object target;

/** Subscriber method. */
private final Method method;

/** Executor to use for dispatching events to this subscriber. */
private final Executor executor;

private Subscriber(EventBus bus, Object target, Method method) {
this.bus = bus;
this.target = checkNotNull(target);
this.method = method;
method.setAccessible(true);

this.executor = bus.executor();
}

/** Dispatches {@code event} to this subscriber using the proper executor. */
final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}

/**
* Invokes the subscriber method. This method can be overridden to make the invocation
* synchronized.
*/
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
}
abstract class Dispatcher {

static Dispatcher perThreadDispatchQueue() {
return new PerThreadQueuedDispatcher();
}

static Dispatcher legacyAsync() {
return new LegacyAsyncDispatcher();
}

static Dispatcher immediate() {
return ImmediateDispatcher.INSTANCE;
}

abstract void dispatch(Object event, Iterator<Subscriber> subscribers);

private static final class PerThreadQueuedDispatcher extends Dispatcher {

private final ThreadLocal<Queue<Event>> queue =
new ThreadLocal<Queue<Event>>() {
@Override
protected Queue<Event> initialValue() {
return Queues.newArrayDeque();
}
};

private final ThreadLocal<Boolean> dispatching =
new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};

@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));

if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
}

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class OtherSubscriber {
@Subscribe
public void printStatus(Integer status){
System.out.println("subscriber success + status : " + status);
}
}

public class StatusSubscriber {
@Subscribe
public void printStatus(String status){
System.out.println("subscriber success + status : " + status);
}
}
// guava的观察者机制相对于jdk来说使用起来更加简便,同时支持通知消息的分类及非阻塞方式的实现
public static void main(String[] args) {
EventBus eventBus = new EventBus();
eventBus.register(new StatusSubscriber());
eventBus.register(new OtherSubscriber());

eventBus.post("ssss");
eventBus.post(1);

System.out.println("end~~~~~~");
}
  1. spring框架实现

spring自带实现最大的优势是所有的对象都可以交给spring去管理。大体功能和guava类似。

核心类:ApplicationEventPublisher(事件发布类)、ApplicationListener(事件监听器),代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 事件
public abstract class ApplicationEvent extends EventObject {
private static final long serialVersionUID = 7099057708183571937L;

private final long timestamp;

public ApplicationEvent(Object source) {
super(source);
this.timestamp = System.currentTimeMillis();
}

public final long getTimestamp() {
return this.timestamp;
}

}

// 事件发布者
public interface ApplicationEventPublisher {
void publishEvent(ApplicationEvent event);

void publishEvent(Object event);

}

// 事件监听者
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
/**
* Handle an application event.
* @param event the event to respond to
*/
void onApplicationEvent(E event);

}

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class EventNotify implements ApplicationContextAware {
private static ThreadPoolExecutor executor;

private static ApplicationContext applicationContext;

static {
executor = ThreadPoolExecutorFactory.of("ApplicationEventSender",4,10,ThreadPoolExecutorFactory.CALLER_RUNER);
}

public static void notifyListener(final ApplicationEvent event){
if(event != null){
executor.submit(() -> applicationContext.publishEvent(event));
}
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) {
EventNotify.applicationContext = applicationContext;
}
}

EventNotify.notifyListener(new UserRoleChangeEvent(this,userRoleForm.getUid()));

public class UserRoleChangeListener {
@Autowired
private UserRoleChangeHandler userRoleChangeHandler;

@EventListener(classes = UserRoleChangeEvent.class)
public void receiveEvent(final UserRoleChangeEvent event){
log.info("【UserRoleChange】event:{}", JSON.toJSONString(event));
userRoleChangeHandler.rolechange(event.getRoleId());
}
}