观察者模式(Observer Design Pattern)也被称为发布订阅模式(Publish-Subscribe Design Pattern)。在 GoF 的《设计模式》一书中,它的定义是这样的:
Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.
小到代码层面的解耦,大到架构层面的系统解耦,再或者一些产品的设计思路,都有这种模式的影子,比如,邮件订阅、RSS Feeds,本质上都是观察者模式。不同的应用场景和需求下,这个模式也有截然不同的实现方式,有同步阻塞的实现方式,也有异步非阻塞的实现方式;有进程内的实现方式,也有跨进程的实现方式。
- 发布-订阅模型,是一对多的关系,可以以同步的方式实现,也可以以异步的方式实现。
- 生产-消费模型,是多对多的关系,一般以异步的方式实现
- jdk自带实现
| public interface Observer { void update(Observable o, Object arg); } public class Observable { private boolean changed = false; private Vector<Observer> obs;
public Observable() { obs = new Vector<>(); }
public synchronized void addObserver(Observer o) { if (o == null) throw new NullPointerException(); if (!obs.contains(o)) { obs.addElement(o); } }
public synchronized void deleteObserver(Observer o) { obs.removeElement(o); }
public void notifyObservers(Object arg) { Object[] arrLocal;
synchronized (this) { if (!changed) return; arrLocal = obs.toArray(); clearChanged(); }
for (int i = arrLocal.length-1; i>=0; i--) ((Observer)arrLocal[i]).update(this, arg); } protected synchronized void setChanged() { changed = true; } protected synchronized void clearChanged() { changed = false; } public synchronized boolean hasChanged() { return changed; } public synchronized int countObservers() { return obs.size(); }
public class StatusWatcher extends Observable { public void setStatus(String status){ System.out.println("status change~~~~~~~~~~"); setChanged(); notifyObservers(status); } }
public class StatusObserverImpl implements Observer { @Override public void update(Observable o, Object arg) { System.out.println("notify~~~ arg:" + arg); } }
public static void main(String[] args) { StatusWatcher watcher = new StatusWatcher(); watcher.addObserver(new StatusObserverImpl());
watcher.setStatus("gggg"); watcher.setStatus("oooo");
- guava实现
| @Beta public class EventBus {
private static final Logger logger = Logger.getLogger(EventBus.class.getName());
private final String identifier; private final Executor executor; private final SubscriberExceptionHandler exceptionHandler;
private final SubscriberRegistry subscribers = new SubscriberRegistry(this); private final Dispatcher dispatcher;
EventBus( String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) { this.identifier = checkNotNull(identifier); this.executor = checkNotNull(executor); this.dispatcher = checkNotNull(dispatcher); this.exceptionHandler = checkNotNull(exceptionHandler); } public void register(Object object) { subscribers.register(object); }
public void unregister(Object object) { subscribers.unregister(object); }
public void post(Object event) { Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { post(new DeadEvent(this, event)); } } class Subscriber { static Subscriber create(EventBus bus, Object listener, Method method) { return isDeclaredThreadSafe(method) ? new Subscriber(bus, listener, method) : new SynchronizedSubscriber(bus, listener, method); }
@Weak private EventBus bus;
@VisibleForTesting final Object target;
private final Method method;
private final Executor executor;
private Subscriber(EventBus bus, Object target, Method method) { this.bus = bus; this.target = checkNotNull(target); this.method = method; method.setAccessible(true);
this.executor = bus.executor(); }
final void dispatchEvent(final Object event) { executor.execute( new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); }
@VisibleForTesting void invokeSubscriberMethod(Object event) throws InvocationTargetException { try { method.invoke(target, checkNotNull(event)); } catch (IllegalArgumentException e) { throw new Error("Method rejected target/argument: " + event, e); } catch (IllegalAccessException e) { throw new Error("Method became inaccessible: " + event, e); } catch (InvocationTargetException e) { if (e.getCause() instanceof Error) { throw (Error) e.getCause(); } throw e; } } } abstract class Dispatcher {
static Dispatcher perThreadDispatchQueue() { return new PerThreadQueuedDispatcher(); }
static Dispatcher legacyAsync() { return new LegacyAsyncDispatcher(); }
static Dispatcher immediate() { return ImmediateDispatcher.INSTANCE; }
abstract void dispatch(Object event, Iterator<Subscriber> subscribers);
private static final class PerThreadQueuedDispatcher extends Dispatcher {
private final ThreadLocal<Queue<Event>> queue = new ThreadLocal<Queue<Event>>() { @Override protected Queue<Event> initialValue() { return Queues.newArrayDeque(); } }; private final ThreadLocal<Boolean> dispatching = new ThreadLocal<Boolean>() { @Override protected Boolean initialValue() { return false; } };
@Override void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); checkNotNull(subscribers); Queue<Event> queueForThread = queue.get(); queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) { dispatching.set(true); try { Event nextEvent; while ((nextEvent = queueForThread.poll()) != null) { while (nextEvent.subscribers.hasNext()) { nextEvent.subscribers.next().dispatchEvent(nextEvent.event); } } } finally { dispatching.remove(); queue.remove(); } } } }
| public class OtherSubscriber { @Subscribe public void printStatus(Integer status){ System.out.println("subscriber success + status : " + status); } }
public class StatusSubscriber { @Subscribe public void printStatus(String status){ System.out.println("subscriber success + status : " + status); } }
public static void main(String[] args) { EventBus eventBus = new EventBus(); eventBus.register(new StatusSubscriber()); eventBus.register(new OtherSubscriber());
eventBus.post("ssss"); eventBus.post(1);
System.out.println("end~~~~~~"); }
- spring框架实现
| public abstract class ApplicationEvent extends EventObject { private static final long serialVersionUID = 7099057708183571937L;
private final long timestamp;
public ApplicationEvent(Object source) { super(source); this.timestamp = System.currentTimeMillis(); }
public final long getTimestamp() { return this.timestamp; }
public interface ApplicationEventPublisher { void publishEvent(ApplicationEvent event);
void publishEvent(Object event);
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
void onApplicationEvent(E event);
| public class EventNotify implements ApplicationContextAware { private static ThreadPoolExecutor executor;
private static ApplicationContext applicationContext;
static { executor = ThreadPoolExecutorFactory.of("ApplicationEventSender",4,10,ThreadPoolExecutorFactory.CALLER_RUNER); }
public static void notifyListener(final ApplicationEvent event){ if(event != null){ executor.submit(() -> applicationContext.publishEvent(event)); } }
@Override public void setApplicationContext(ApplicationContext applicationContext) { EventNotify.applicationContext = applicationContext; } }
EventNotify.notifyListener(new UserRoleChangeEvent(this,userRoleForm.getUid()));
public class UserRoleChangeListener { @Autowired private UserRoleChangeHandler userRoleChangeHandler;
@EventListener(classes = UserRoleChangeEvent.class) public void receiveEvent(final UserRoleChangeEvent event){ log.info("【UserRoleChange】event:{}", JSON.toJSONString(event)); userRoleChangeHandler.rolechange(event.getRoleId()); } }