定义
观察者模式(Observer Design Pattern)也被称为发布订阅模式(Publish-Subscribe Design Pattern)。在 GoF 的《设计模式》一书中,它的定义是这样的:
Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.
一般情况下,被依赖的对象叫作被观察者(Observable),依赖的对象叫作观察者(Observer)。不过,在实际的项目开发中,这两种对象的称呼是比较灵活的,有各种不同的叫法,比如:Subject-Observer、Publisher-Subscriber、Producer-Consumer、EventEmitter-EventListener、Dispatcher-Listener。不管怎么称呼,只要应用场景符合刚刚给出的定义,都可以看作观察者模式。
基于不同场景的不同实现方式
小到代码层面的解耦,大到架构层面的系统解耦,再或者一些产品的设计思路,都有这种模式的影子,比如,邮件订阅、RSS Feeds,本质上都是观察者模式。不同的应用场景和需求下,这个模式也有截然不同的实现方式,有同步阻塞的实现方式,也有异步非阻塞的实现方式;有进程内的实现方式,也有跨进程的实现方式。
对比生产者-消费者模式
- 发布-订阅模型,是一对多的关系,可以以同步的方式实现,也可以以异步的方式实现。
- 生产-消费模型,是多对多的关系,一般以异步的方式实现
两者都可以达到解耦的作用
应用
- jdk自带实现
jdk自带的观察者模式实现比较简单,通过实现Observer接口实现观察者,Observable类作为观察机制的控制中心,负责观察者的生命周期管理及被观察者变化的通知。
核心类有两个:Observer接口和Observable类,简略代码如下(省略了部分非核心代码及注释)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| public interface Observer { void update(Observable o, Object arg); } public class Observable { private boolean changed = false; private Vector<Observer> obs;
public Observable() { obs = new Vector<>(); }
public synchronized void addObserver(Observer o) { if (o == null) throw new NullPointerException(); if (!obs.contains(o)) { obs.addElement(o); } }
public synchronized void deleteObserver(Observer o) { obs.removeElement(o); }
public void notifyObservers(Object arg) { Object[] arrLocal;
synchronized (this) { if (!changed) return; arrLocal = obs.toArray(); clearChanged(); }
for (int i = arrLocal.length-1; i>=0; i--) ((Observer)arrLocal[i]).update(this, arg); } protected synchronized void setChanged() { changed = true; } protected synchronized void clearChanged() { changed = false; } public synchronized boolean hasChanged() { return changed; } public synchronized int countObservers() { return obs.size(); }
}
|
使用demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
public class StatusWatcher extends Observable { public void setStatus(String status){ System.out.println("status change~~~~~~~~~~"); setChanged(); notifyObservers(status); } }
public class StatusObserverImpl implements Observer { @Override public void update(Observable o, Object arg) { System.out.println("notify~~~ arg:" + arg); } }
public static void main(String[] args) { StatusWatcher watcher = new StatusWatcher(); watcher.addObserver(new StatusObserverImpl());
watcher.setStatus("gggg"); watcher.setStatus("oooo");
System.out.println("~~~~~~end");
}
|
- guava实现
guava相对于jdk的实现来说功能更加丰富了一些,支持根据不同的入参唤醒不同的观察者,支持异步非阻塞的通知机制。
核心类:EventBus(事件总线)、Subscriber(观察者)、Dispatcher(事件分发器),核心代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
| @Beta public class EventBus {
private static final Logger logger = Logger.getLogger(EventBus.class.getName());
private final String identifier; private final Executor executor; private final SubscriberExceptionHandler exceptionHandler;
private final SubscriberRegistry subscribers = new SubscriberRegistry(this); private final Dispatcher dispatcher;
EventBus( String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) { this.identifier = checkNotNull(identifier); this.executor = checkNotNull(executor); this.dispatcher = checkNotNull(dispatcher); this.exceptionHandler = checkNotNull(exceptionHandler); } public void register(Object object) { subscribers.register(object); }
public void unregister(Object object) { subscribers.unregister(object); }
public void post(Object event) { Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { post(new DeadEvent(this, event)); } } class Subscriber { static Subscriber create(EventBus bus, Object listener, Method method) { return isDeclaredThreadSafe(method) ? new Subscriber(bus, listener, method) : new SynchronizedSubscriber(bus, listener, method); }
@Weak private EventBus bus;
@VisibleForTesting final Object target;
private final Method method;
private final Executor executor;
private Subscriber(EventBus bus, Object target, Method method) { this.bus = bus; this.target = checkNotNull(target); this.method = method; method.setAccessible(true);
this.executor = bus.executor(); }
final void dispatchEvent(final Object event) { executor.execute( new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); }
@VisibleForTesting void invokeSubscriberMethod(Object event) throws InvocationTargetException { try { method.invoke(target, checkNotNull(event)); } catch (IllegalArgumentException e) { throw new Error("Method rejected target/argument: " + event, e); } catch (IllegalAccessException e) { throw new Error("Method became inaccessible: " + event, e); } catch (InvocationTargetException e) { if (e.getCause() instanceof Error) { throw (Error) e.getCause(); } throw e; } } } abstract class Dispatcher {
static Dispatcher perThreadDispatchQueue() { return new PerThreadQueuedDispatcher(); }
static Dispatcher legacyAsync() { return new LegacyAsyncDispatcher(); }
static Dispatcher immediate() { return ImmediateDispatcher.INSTANCE; }
abstract void dispatch(Object event, Iterator<Subscriber> subscribers);
private static final class PerThreadQueuedDispatcher extends Dispatcher {
private final ThreadLocal<Queue<Event>> queue = new ThreadLocal<Queue<Event>>() { @Override protected Queue<Event> initialValue() { return Queues.newArrayDeque(); } }; private final ThreadLocal<Boolean> dispatching = new ThreadLocal<Boolean>() { @Override protected Boolean initialValue() { return false; } };
@Override void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); checkNotNull(subscribers); Queue<Event> queueForThread = queue.get(); queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) { dispatching.set(true); try { Event nextEvent; while ((nextEvent = queueForThread.poll()) != null) { while (nextEvent.subscribers.hasNext()) { nextEvent.subscribers.next().dispatchEvent(nextEvent.event); } } } finally { dispatching.remove(); queue.remove(); } } } }
|
demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class OtherSubscriber { @Subscribe public void printStatus(Integer status){ System.out.println("subscriber success + status : " + status); } }
public class StatusSubscriber { @Subscribe public void printStatus(String status){ System.out.println("subscriber success + status : " + status); } }
public static void main(String[] args) { EventBus eventBus = new EventBus(); eventBus.register(new StatusSubscriber()); eventBus.register(new OtherSubscriber());
eventBus.post("ssss"); eventBus.post(1);
System.out.println("end~~~~~~"); }
|
- spring框架实现
spring自带实现最大的优势是所有的对象都可以交给spring去管理。大体功能和guava类似。
核心类:ApplicationEventPublisher(事件发布类)、ApplicationListener(事件监听器),代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public abstract class ApplicationEvent extends EventObject { private static final long serialVersionUID = 7099057708183571937L;
private final long timestamp;
public ApplicationEvent(Object source) { super(source); this.timestamp = System.currentTimeMillis(); }
public final long getTimestamp() { return this.timestamp; }
}
public interface ApplicationEventPublisher { void publishEvent(ApplicationEvent event);
void publishEvent(Object event);
}
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
void onApplicationEvent(E event);
}
|
demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class EventNotify implements ApplicationContextAware { private static ThreadPoolExecutor executor;
private static ApplicationContext applicationContext;
static { executor = ThreadPoolExecutorFactory.of("ApplicationEventSender",4,10,ThreadPoolExecutorFactory.CALLER_RUNER); }
public static void notifyListener(final ApplicationEvent event){ if(event != null){ executor.submit(() -> applicationContext.publishEvent(event)); } }
@Override public void setApplicationContext(ApplicationContext applicationContext) { EventNotify.applicationContext = applicationContext; } }
EventNotify.notifyListener(new UserRoleChangeEvent(this,userRoleForm.getUid()));
public class UserRoleChangeListener { @Autowired private UserRoleChangeHandler userRoleChangeHandler;
@EventListener(classes = UserRoleChangeEvent.class) public void receiveEvent(final UserRoleChangeEvent event){ log.info("【UserRoleChange】event:{}", JSON.toJSONString(event)); userRoleChangeHandler.rolechange(event.getRoleId()); } }
|