一、框架简介
Motan 是新浪微博开源的一套高性能、易于使用的分布式远程服务调用(RPC)框架。
Motan 的核心模块交互关系如下:
二、核心模块介绍
2.0 SPI 机制
SPI 机制支持 JDK 的 ServiceProvider 机制并进行了扩展,接口的实现放在 META-INF/services/
目录下以接口的完全类名命名的文件里,每个实现的完全类名占一行。
每个实现类可以加上注解 @SpiMeta(name="implName")
来指定实现名称,ExtensionLoader
可以通过接口类型和命名从多个实现中找到指定实现。
2.1 register
用来和注册中心交互,包括服务注册、订阅服务、接收服务变更通知、发送心跳等功能。
// 服务发现功能的抽象
public interface DiscoveryService {
// 订阅到注册中心
void subscribe(URL url, NotifyListener listener);
// 取消订阅
void unsubscribe(URL url, NotifyListener listener);
// 根据 url 描述的服务从注册中心获取该服务的所有提供者的信息
List<URL> discover(URL url);
}
// 服务注册功能的抽象
public interface RegistryService {
void register(URL url);
void unregister(URL url);
void available(URL url);
void unavailable(URL url);
Collection<URL> getRegisteredServiceUrls();
}
// 表示一个注册中心的抽象
@Spi(scope = Scope.SINGLETON)
public interface Registry extends RegistryService, DiscoveryService {
// 获取该注册中心的描述信息
URL getUrl();
}
2.2 protocol
用来进行 RPC 服务描述和配置管理,可以通过 Filter 进行扩展、功能增强。
@Spi(scope = Scope.SINGLETON)
public interface Protocol {
// 暴露服务
<T> Exporter<T> export(Provider<T> provider, URL url);
// 调用端收到注册中心通知的服务提供者实例信息 serviceUrl 后,
// 根据 serviceUrl 创建对指定实例的引用
<T> Referer<T> refer(Class<T> clz, URL url, URL serviceUrl);
void destroy();
}
// Filter 用于进行功能扩展
@Spi
public interface Filter {
Response filter(Caller<?> caller, Request request);
}
// 把 Filter 应用到 protocol 上,进行功能增强
public class ProtocolFilterDecorator implements Protocol {
private Protocol protocol;
public ProtocolFilterDecorator(Protocol protocol) {
this.protocol = protocol;
}
@Override
public <T> Exporter<T> export(Provider<T> provider, URL url) {
// 对原始的 provider 进行包装增强
return protocol.export(decorateWithFilter(provider, url), url);
}
@Override
public <T> Referer<T> refer(Class<T> clz, URL url, URL serviceUrl) {
// 对原始 protocol 生成的 Referer 进行包装增强,这样可以在调用前、后进行处理。
return decorateWithFilter(protocol.refer(clz, url, serviceUrl), url);
}
@Override
public void destroy() {
protocol.destroy();
}
// 省略其它代码
}
2.3 serialize
此模块负责把请求、响应进行序列化和反序列化。默认采用 Hessian2 。
@Spi(scope=Scope.PROTOTYPE)
public interface Serialization {
byte[] serialize(Object obj) throws IOException;
<T> T deserialize(byte[] bytes, Class<T> clz) throws IOException;
byte[] serializeMulti(Object[] data) throws IOException;
Object[] deserializeMulti(byte[] data, Class<?>[] classes) throws IOException;
/**
* serializaion的唯一编号,用于传输协议中指定序列化方式。每种序列化的编号必须唯一。
* @return 由于编码规范限制,序列化方式最大支持32种,因此返回值必须在0-31之间。
*/
int getSerializationNumber();
}
2.4 transport
实现远程通信,默认采用 Netty NIO 的 TCP 长链接实现。
2.5 cluster
仅供 client 端使用的模块。
Referer
在初始化的时候会让注册中心监听要调用的服务地址,监听的回调处理者是 ClusterSupport
,当服务提供者的列表变化时,注册中心通知 ClusterSupport
。
ClusterSupport
收到注册中心的通知,根据这些服务提供者的信息利用 Protocol
实现生成一组 Referer
对象,然后用这组 Referer 通知 Cluster,Cluster 再用这些 Referer 刷新持有的 LoadBalance 。
构建 Referer
的时候会实例化 Client
,以完成远程调用。Client
位于传输层,完成请求、响应的编码、解码工作,然后进行序列化、反序列化,再通过 Channel
进行网络传输。
2.6 Filter
Motan 提供 Filter 机制来进行功能增强,默认支持访问日志、访问统计、接口并发控制、服务开关、服务 Mock、服务降级控制等。
@Spi
public interface Filter {
Response filter(Caller<?> caller, Request request);
}
在 Filter 的具体实现里,可以根据 Caller
是 Provider/Referer
的实现来决定执行客户端还是服务端的逻辑。
二、构建客户端代理
创建服务的代理的过程:
1. 收集服务的描述信息 refUrl ;
2. 根据 refUrl 和 registryUrl 创建 ClusterSupport;
3. 根据 interfaceClass 和 ClusterSupport.getCluster() 创建代理对象;
4. 以代理对象 RefererInvocationHandler 为例,它持有 Cluster 就可以进行远程调用。
public class DefaultRpcReferer<T> extends AbstractReferer<T> {
protected Client client;
protected EndpointFactory endpointFactory;
public DefaultRpcReferer(Class<T> clz, URL url, URL serviceUrl) {
super(clz, url, serviceUrl);
endpointFactory =
ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension(
url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue()));
// 创建 Client
client = endpointFactory.createClient(url);
}
@Override
protected Response doCall(Request request) {
try {
// 为了能够实现跨group请求,需要使用server端的group。
request.setAttachment(URLParamType.group.getName(), serviceUrl.getGroup());
// 通过 Client 完成远程调用
return client.request(request);
} catch (TransportException exception) {
throw new MotanServiceException("DefaultRpcReferer call Error: url=" + url.getUri(), exception);
}
}
// 省略其他代码
}
三、服务端暴露服务
服务端对外暴露服务要考虑两点:
- 一个网络端口有多个服务提供者,如何找到目标提供者?
在transport层实现,ProviderMessageRouter
相当于一个请求分发器,把请求分发调用目前提供者。分发是根据服务描述 URL 里的关键信息生成 serviceKey (组成:group + "/" + interfaceName + "/" + version
)来确定提供者,再根据请求里的方法名、方法参数描述来定位要调用的 Method 。
public class ProviderMessageRouter implements MessageHandler {
// serviceKey 到具体服务提供者的映射
protected Map<String, Provider<?>> providers = new HashMap<>();
public Object handle(Channel channel, Object message) {
Request request = (Request) message;
String serviceKey = MotanFrameworkUtil.getServiceKey(request);
Provider<?> provider = providers.get(serviceKey);
Method method = provider.lookupMethod(request.getMethodName(), request.getParamtersDesc());
fillParamDesc(request, method);
processLazyDeserialize(request, method);
return call(request, provider);
}
protected Response call(Request request, Provider<?> provider) {
try {
return provider.call(request);
} catch (Exception e) {
DefaultResponse response = new DefaultResponse();
response.setException(new MotanBizException("provider call process error", e));
return response;
}
}
public synchronized void addProvider(Provider<?> provider) {
String serviceKey = MotanFrameworkUtil.getServiceKey(provider.getUrl());
if (providers.containsKey(serviceKey)) {
throw new MotanFrameworkException("provider alread exist: " + serviceKey);
}
providers.put(serviceKey, provider);
}
}
- 有了请求分发处理器后,就可以作为 Server 的消息处理器。
public class DefaultRpcExporter<T> extends AbstractExporter<T> {
protected final ConcurrentHashMap<String, ProviderMessageRouter> ipPort2RequestRouter;
protected final ConcurrentHashMap<String, Exporter<?>> exporterMap;
protected Server server;
protected EndpointFactory endpointFactory;
public DefaultRpcExporter(Provider<T> provider, URL url, ConcurrentHashMap<String, ProviderMessageRouter> ipPort2RequestRouter,
ConcurrentHashMap<String, Exporter<?>> exporterMap) {
super(provider, url);
this.exporterMap = exporterMap;
this.ipPort2RequestRouter = ipPort2RequestRouter;
// 初始化消息分发处理器
ProviderMessageRouter requestRouter = initRequestRouter(url);
endpointFactory =
ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension(
url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue()));
// 创建 Server
server = endpointFactory.createServer(url, requestRouter);
}
}
public abstract class AbstractEndpointFactory implements EndpointFactory {
/** 维持share channel 的service列表 **/
protected Map<String, Server> ipPort2ServerShareChannel = new HashMap<String, Server>();
protected ConcurrentMap<Server, Set<String>> server2UrlsShareChannel = new ConcurrentHashMap<Server, Set<String>>();
private EndpointManager heartbeatClientEndpointManager = null;
public AbstractEndpointFactory() {
// 心跳管理
heartbeatClientEndpointManager = new HeartbeatClientEndpointManager();
heartbeatClientEndpointManager.init();
}
public Server createServer(URL url, MessageHandler messageHandler) {
messageHandler = getHeartbeatFactory(url).wrapMessageHandler(messageHandler);
synchronized (ipPort2ServerShareChannel) {
String ipPort = url.getServerPortStr();
String protocolKey = MotanFrameworkUtil.getProtocolKey(url);
boolean shareChannel =
url.getBooleanParameter(URLParamType.shareChannel.getName(), URLParamType.shareChannel.getBooleanValue());
if (!shareChannel) { // 独享一个端口
LoggerUtil.info(this.getClass().getSimpleName() + " create no_share_channel server: url={}", url);
// 如果端口已经被使用了,使用该server bind 会有异常
return innerCreateServer(url, messageHandler);
}
Server server = ipPort2ServerShareChannel.get(ipPort);
if (server != null) {
// 省略:无法共享 channel 的抛出异常。
saveEndpoint2Urls(server2UrlsShareChannel, server, protocolKey);
// 共享已经存在的 Server
return server;
}
url = url.createCopy();
url.setPath(""); // 共享server端口,由于有多个interfaces存在,所以把path设置为空
// ipPort 上还没创建过 Server,创建一个新的
server = innerCreateServer(url, messageHandler);
ipPort2ServerShareChannel.put(ipPort, server);
saveEndpoint2Urls(server2UrlsShareChannel, server, protocolKey);
return server;
}
}
}
四、调用过程实现
-
Cluster 收到调用请求后,通过自身的 HA 策略进行调用;
-
HA 策略实现根据传入 LB 获取到一个 Referer 实例进行调用;
-
根据 Referer 的调用结果来执行 HA 策略。
// ClusterSpi.java
public Response call(Request request) {
if (available.get()) {
try {
return haStrategy.call(request, loadBalance);
} catch (Exception e) {
return callFalse(request, e);
}
}
return callFalse(request, new MotanServiceException(MotanErrorMsgConstant.SERVICE_UNFOUND));
}
@SpiMeta(name = "failfast")
public class FailfastHaStrategy<T> extends AbstractHaStrategy<T> {
@Override
public Response call(Request request, LoadBalance<T> loadBalance) {
Referer<T> refer = loadBalance.select(request);
return refer.call(request);
}
}
4. Referer 通过持有的 Client 进行调用;
5. Client 调用时从 Channel 连接池获取到一个 Channel 后再进行写入请求;
6. Channel 首先给请求生成一个 ResponseFuture,注册到 Client 的 callbackMap 上,这样可以阻塞住调用线程,实现同步调用,也方便收到响应时找到对应的调用线程;
7. Channel 对于写入的请求根据 Client 在 Netty pipeline 上设置的 codec 组件进行编码、解码操作;
8. codec 会用 Serialization 组件进行序列化、反序列化;
9. 最终把序列化后的请求写入网络连接。
10. 服务端的网络监听接收到请求后,用 codec 组件进行反序列化、解码得到一个请求对象;
11. Server 的消息处理器 ProviderMessageRouter.handle(Channel channel, Object message)
方法里,根据请求信息得到要调用的 serviceKey, 从而找到对应的 provider ;
12. 再根据要调用的方法名、参数信息找到对应的实现方法,进行调用,得到响应;
13. 再把响应写回到网络连接里;
14. 客户端最终得到响应对象后,从 Client 维护的 callbackMap 找出请求 的 ResponseFuture 并设置结果,使调用线程可以返回。
欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。