徒手撸一个简易消息总线EventBus
平时在写Android或者Java进程内应用时,发送消息一般会选择Guava的EventBus,这样可以做到代码松耦合,业务解耦。今天半支烟简单分析下消息总线原理,然后写个简易的消息总线。说白了,消息总线其实就是个观察者模式的典型应用。
EventBus的一般使用
流程图和大致原理
消息发布者(或者叫被观察者
),发送了某个事件消息,EventBus会根据Subcriber注解以及Event事件,找出合适的订阅者(或者叫观察者
),然后通过Java反射执行订阅者的响应方法。
使用步骤
一般我们在项目用使用分以下几步,具体使用小伙伴们可自行谷歌:
- 定义一个事件类
public class AnyEventType {
public AnyEventType(){}
}
- 注册观察者
EventBus.getDefault().register(this);
- 订阅观察者的方法
@Subscribe
public void onEvent(AnyEventType event) {/* Do something */};
- 被观察者发送消息
EventBus.getDefault().post(event);
分析EventBus&设计核心类
上面介绍了EventBus的基本用法,下面开始分析EventBus的核心类。
从上面的使用方式来看,EventBus共有2个核心方法,register() 和 post() 。弄懂了他们的使用方法,也就基本明白了:register()是注册订阅者,post()是发布者发送消息。现在应该明白了EventBus的核心了吧。下面用2张图来描述下这2个核心方法:
代码实现
开始实现5个核心类:EventBus、AsyncEventBus、Subcribe、ObserverAction、ObserverRegistry。
1.Subscribe
Subscribe是一个注解,用来标识观察者的哪些方法可以接受消息。
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe {}
2.ObserverAction
ObserverAction类用来执行观察者的方法,@Subscribe注解的方法,会在ObserverRegistry类中进行统一注册,target表示观察者类,method表示被执行的方法。
public class ObserverAction {
private Object target;
private Method method;
public ObserverAction(Object target, Method method) {
this.target = target;
this.method = method;
this.method.setAccessible(true);
}
public void execute(Object event) {
try {
method.invoke(target, event);
} catch (IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
}
}
3.ObserverRegistry
ObserverRegistry类用来注册观察者,使用了大量Java反射,核心逻辑包括:
1、将@Subscribe注解的事件和方法,统一注册到map中
2、提供根据事件找出对应的类方法
避免并发冲突,所以使用了ConcurrentHashMap 和 CopyOnWriteArraySet。注册单个观察者的时候,也是先取出观察者的事件和方法,然后再统一放到全局并发容器中。这样逻辑清晰,也避免了并发冲突。
public class ObserverRegistry {
private ConcurrentHashMap<Class<?>, CopyOnWriteArraySet<ObserverAction>> registry = new ConcurrentHashMap<>();
/**
* 注册
*/
public void register(Object observer) {
//遍历带有注解的方法,将事件和对应的多个处理方法,存储到map中
Map<Class<?>, Collection<ObserverAction>> observerActions = findAllObserverActions(observer);
//将获取到的单个观察者的可执行方法,放到如全局的map中,使用并发类
for (Map.Entry<Class<?>, Collection<ObserverAction>> entry : observerActions.entrySet()) {
Class<?> eventType = entry.getKey();
Collection<ObserverAction> eventActions = entry.getValue();
CopyOnWriteArraySet<ObserverAction> registeredEventActions = registry.get(eventType);
if (registeredEventActions == null) {
registry.putIfAbsent(eventType, new CopyOnWriteArraySet<>());
registeredEventActions = registry.get(eventType);
}
registeredEventActions.addAll(eventActions);
}
}
/**
* 遍历带有注解的方法,将事件和对应的多个处理方法,存储到map中
*
* @param observer
* @return
*/
private Map<Class<?>, Collection<ObserverAction>> findAllObserverActions(Object observer) {
Class<?> clazz = observer.getClass();
List<Method> methodList = getAnnotateMethods(clazz);
Map<Class<?>, Collection<ObserverAction>> observerActions = new HashMap<>();
for (Method method : methodList) {
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?> eventType = parameterTypes[0];
if (!observerActions.containsKey(eventType)) {
observerActions.put(eventType, new ArrayList<>());
}
observerActions.get(eventType).add(new ObserverAction(observer, method));
}
return observerActions;
}
/**
* 获取观察者中含有注解的方法
*
* @param clazz
* @return
*/
private List<Method> getAnnotateMethods(Class<?> clazz) {
List<Method> annotateMethods = new ArrayList<>();
for (Method method : clazz.getDeclaredMethods()) {
if (method.isAnnotationPresent(Subscribe.class)) {
// Class<?>[] parameterTypes = method.getParameterTypes();
annotateMethods.add(method);
}
}
return annotateMethods;
}
/**
* 根据事件获取合适的观察者方法
*
* @param event
* @return
*/
public List<ObserverAction> getMatchedObserverActions(Object event) {
List<ObserverAction> matchedObservers = new ArrayList<>();
Class<?> postedEventType = event.getClass();
for (Map.Entry<Class<?>, CopyOnWriteArraySet<ObserverAction>> entry : registry.entrySet()) {
Class<?> eventType = entry.getKey();
Collection<ObserverAction> eventActions = entry.getValue();
//判断有入参的事件,是否是容器里的时间的子类,可以说是一个类是否可以被强制转换为另外一个实例对象
//父类 和 子类,判断都会为true
if (eventType.isAssignableFrom(postedEventType)) {
matchedObservers.addAll(eventActions);
}
}
return matchedObservers;
}
}
4.EventBus
消息总线入口方法,提供了register注册观察者,也提供了post让被观察者发送消息。
public class EventBus {
private ObserverRegistry registry = new ObserverRegistry();
private Executor executor;
public EventBus() {
}
public EventBus(Executor executor) {
this.executor = executor;
}
/**
* 注册观察者
*/
public void register(Object observer) {
registry.register(observer);
}
/**
* 发布者-发送消息
*/
public void post(Object event) {
List<ObserverAction> observerActions = registry.getMatchedObserverActions(event);
for (ObserverAction observerAction : observerActions) {
if (executor == null) {
observerAction.execute(event);
} else {
executor.execute(() -> {
observerAction.execute(event);
});
}
}
}
}
5.AsyncEventBus
异步消息总线入口方法
public class AsyncEventBus extends EventBus {
public AsyncEventBus(Executor executor) {
super(executor);
}
}
整理代码
总结
总体来说,EventBus是基于观察者模式实现的消息总线,实现代码和业务的解耦。
框架的作用是:隐藏实现细节、降低开发难度、实现代码复用,解耦业务代码,让开发人员聚焦业务开发。
从图中可以看出,最关键的数据结构是Observer注册表,记录了消息类型和可接受函数的对应关系。当调用 register() 函数注册观察者的时候,EventBus 通过解析 @Subscribe 注解,生成 Observer 注册表。当调用 post() 函数发送消息的时候,EventBus 通过注册表找到相应的可接收消息的函数,然后通过 Java 的反射语法来动态地创建对象、执行函数。对于同步阻塞模式,EventBus 在一个线程内依次执行相应的函数。对于异步非阻塞模式,EventBus 通过一个线程池来执行相应的函数。
弄懂了原理,就开始代码实现了。实现包括 5 个类:EventBus、AsyncEventBus、Subscribe、ObserverAction、ObserverRegistry。