Motan RPC 框架分析

一、框架简介

Motan 是新浪微博开源的一套高性能、易于使用的分布式远程服务调用(RPC)框架。

Motan 的核心模块交互关系如下:

二、核心模块介绍

2.0 SPI 机制

SPI 机制支持 JDK 的 ServiceProvider 机制并进行了扩展,接口的实现放在 META-INF/services/ 目录下以接口的完全类名命名的文件里,每个实现的完全类名占一行。

每个实现类可以加上注解 @SpiMeta(name="implName") 来指定实现名称,ExtensionLoader 可以通过接口类型和命名从多个实现中找到指定实现。

2.1 register

用来和注册中心交互,包括服务注册、订阅服务、接收服务变更通知、发送心跳等功能。

// 服务发现功能的抽象
public interface DiscoveryService {
    // 订阅到注册中心
    void subscribe(URL url, NotifyListener listener);

    // 取消订阅
    void unsubscribe(URL url, NotifyListener listener);

    // 根据 url 描述的服务从注册中心获取该服务的所有提供者的信息
    List<URL> discover(URL url);
}

// 服务注册功能的抽象
public interface RegistryService {
    void register(URL url);
    void unregister(URL url);
    void available(URL url);
    void unavailable(URL url);
    Collection<URL> getRegisteredServiceUrls();
}

// 表示一个注册中心的抽象
@Spi(scope = Scope.SINGLETON)
public interface Registry extends RegistryService, DiscoveryService {
    // 获取该注册中心的描述信息
    URL getUrl();
}

2.2 protocol

用来进行 RPC 服务描述和配置管理,可以通过 Filter 进行扩展、功能增强。

@Spi(scope = Scope.SINGLETON)
public interface Protocol {
    // 暴露服务
    <T> Exporter<T> export(Provider<T> provider, URL url);

    // 调用端收到注册中心通知的服务提供者实例信息 serviceUrl 后,
    // 根据 serviceUrl 创建对指定实例的引用
    <T> Referer<T> refer(Class<T> clz, URL url, URL serviceUrl);

    void destroy();
}

// Filter 用于进行功能扩展
@Spi
public interface Filter {
    Response filter(Caller<?> caller, Request request);
}

// 把 Filter 应用到 protocol 上,进行功能增强
public class ProtocolFilterDecorator implements Protocol {
    private Protocol protocol;

    public ProtocolFilterDecorator(Protocol protocol) {
        this.protocol = protocol;
    }

    @Override
    public <T> Exporter<T> export(Provider<T> provider, URL url) {
        // 对原始的 provider 进行包装增强
        return protocol.export(decorateWithFilter(provider, url), url);
    }

    @Override
    public <T> Referer<T> refer(Class<T> clz, URL url, URL serviceUrl) {
        // 对原始 protocol 生成的 Referer 进行包装增强,这样可以在调用前、后进行处理。
        return decorateWithFilter(protocol.refer(clz, url, serviceUrl), url);
    }

    @Override
    public void destroy() {
        protocol.destroy();
    }

  // 省略其它代码
}

2.3 serialize

此模块负责把请求、响应进行序列化和反序列化。默认采用 Hessian2 。

@Spi(scope=Scope.PROTOTYPE)
public interface Serialization {
    byte[] serialize(Object obj) throws IOException;
    <T> T deserialize(byte[] bytes, Class<T> clz) throws IOException;
    byte[] serializeMulti(Object[] data) throws IOException;
    Object[] deserializeMulti(byte[] data, Class<?>[] classes) throws IOException;

    /**
     * serializaion的唯一编号,用于传输协议中指定序列化方式。每种序列化的编号必须唯一。
     * @return 由于编码规范限制,序列化方式最大支持32种,因此返回值必须在0-31之间。
     */
    int getSerializationNumber();
}

2.4 transport

实现远程通信,默认采用 Netty NIO 的 TCP 长链接实现。

2.5 cluster

仅供 client 端使用的模块。

Referer 在初始化的时候会让注册中心监听要调用的服务地址,监听的回调处理者是 ClusterSupport,当服务提供者的列表变化时,注册中心通知 ClusterSupport

ClusterSupport 收到注册中心的通知,根据这些服务提供者的信息利用 Protocol 实现生成一组 Referer 对象,然后用这组 Referer 通知 Cluster,Cluster 再用这些 Referer 刷新持有的 LoadBalance 。

构建 Referer 的时候会实例化 Client,以完成远程调用。Client 位于传输层,完成请求、响应的编码、解码工作,然后进行序列化、反序列化,再通过 Channel 进行网络传输。

2.6 Filter

Motan 提供 Filter 机制来进行功能增强,默认支持访问日志、访问统计、接口并发控制、服务开关、服务 Mock、服务降级控制等。

@Spi
public interface Filter {

    Response filter(Caller<?> caller, Request request);
}

在 Filter 的具体实现里,可以根据 CallerProvider/Referer 的实现来决定执行客户端还是服务端的逻辑。

二、构建客户端代理

创建服务的代理的过程:
1. 收集服务的描述信息 refUrl ;
2. 根据 refUrl 和 registryUrl 创建 ClusterSupport;
3. 根据 interfaceClass 和 ClusterSupport.getCluster() 创建代理对象;
4. 以代理对象 RefererInvocationHandler 为例,它持有 Cluster 就可以进行远程调用。

public class DefaultRpcReferer<T> extends AbstractReferer<T> {
    protected Client client;
    protected EndpointFactory endpointFactory;

    public DefaultRpcReferer(Class<T> clz, URL url, URL serviceUrl) {
        super(clz, url, serviceUrl);

        endpointFactory =
                ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension(
                        url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue()));

        // 创建 Client
        client = endpointFactory.createClient(url);
    }

    @Override
    protected Response doCall(Request request) {
        try {
            // 为了能够实现跨group请求,需要使用server端的group。
            request.setAttachment(URLParamType.group.getName(), serviceUrl.getGroup());

            // 通过 Client 完成远程调用
            return client.request(request);
        } catch (TransportException exception) {
            throw new MotanServiceException("DefaultRpcReferer call Error: url=" + url.getUri(), exception);
        }
    }

    // 省略其他代码   
}

三、服务端暴露服务

服务端对外暴露服务要考虑两点:

  1. 一个网络端口有多个服务提供者,如何找到目标提供者?
    在transport层实现,ProviderMessageRouter 相当于一个请求分发器,把请求分发调用目前提供者。分发是根据服务描述 URL 里的关键信息生成 serviceKey (组成: group + "/" + interfaceName + "/" + version)来确定提供者,再根据请求里的方法名、方法参数描述来定位要调用的 Method 。
public class ProviderMessageRouter implements MessageHandler {
    // serviceKey 到具体服务提供者的映射
    protected Map<String, Provider<?>> providers = new HashMap<>();

    public Object handle(Channel channel, Object message) {
        Request request = (Request) message;

        String serviceKey = MotanFrameworkUtil.getServiceKey(request);

        Provider<?> provider = providers.get(serviceKey);

        Method method = provider.lookupMethod(request.getMethodName(), request.getParamtersDesc());
        fillParamDesc(request, method);
        processLazyDeserialize(request, method);
        return call(request, provider);
    }

    protected Response call(Request request, Provider<?> provider) {
        try {
            return provider.call(request);
        } catch (Exception e) {
            DefaultResponse response = new DefaultResponse();
            response.setException(new MotanBizException("provider call process error", e));
            return response;
        }
    }

    public synchronized void addProvider(Provider<?> provider) {
        String serviceKey = MotanFrameworkUtil.getServiceKey(provider.getUrl());
        if (providers.containsKey(serviceKey)) {
            throw new MotanFrameworkException("provider alread exist: " + serviceKey);
        }

        providers.put(serviceKey, provider);
    }
}
  1. 有了请求分发处理器后,就可以作为 Server 的消息处理器。
public class DefaultRpcExporter<T> extends AbstractExporter<T> {
    protected final ConcurrentHashMap<String, ProviderMessageRouter> ipPort2RequestRouter;
    protected final ConcurrentHashMap<String, Exporter<?>> exporterMap;
    protected Server server;
    protected EndpointFactory endpointFactory;

    public DefaultRpcExporter(Provider<T> provider, URL url, ConcurrentHashMap<String, ProviderMessageRouter> ipPort2RequestRouter,
                              ConcurrentHashMap<String, Exporter<?>> exporterMap) {
        super(provider, url);
        this.exporterMap = exporterMap;
        this.ipPort2RequestRouter = ipPort2RequestRouter;

        // 初始化消息分发处理器
        ProviderMessageRouter requestRouter = initRequestRouter(url);
        endpointFactory =
                ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension(
                        url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue()));

        // 创建 Server
        server = endpointFactory.createServer(url, requestRouter);
    }
}


public abstract class AbstractEndpointFactory implements EndpointFactory {
    /** 维持share channel 的service列表 **/
    protected Map<String, Server> ipPort2ServerShareChannel = new HashMap<String, Server>();
    protected ConcurrentMap<Server, Set<String>> server2UrlsShareChannel = new ConcurrentHashMap<Server, Set<String>>();

    private EndpointManager heartbeatClientEndpointManager = null;

    public AbstractEndpointFactory() {
        // 心跳管理
        heartbeatClientEndpointManager = new HeartbeatClientEndpointManager();
        heartbeatClientEndpointManager.init();
    }

    public Server createServer(URL url, MessageHandler messageHandler) {
        messageHandler = getHeartbeatFactory(url).wrapMessageHandler(messageHandler);

        synchronized (ipPort2ServerShareChannel) {
            String ipPort = url.getServerPortStr();
            String protocolKey = MotanFrameworkUtil.getProtocolKey(url);

            boolean shareChannel =
                    url.getBooleanParameter(URLParamType.shareChannel.getName(), URLParamType.shareChannel.getBooleanValue());

            if (!shareChannel) { // 独享一个端口
                LoggerUtil.info(this.getClass().getSimpleName() + " create no_share_channel server: url={}", url);

                // 如果端口已经被使用了,使用该server bind 会有异常
                return innerCreateServer(url, messageHandler);
            }

            Server server = ipPort2ServerShareChannel.get(ipPort);

            if (server != null) {
                // 省略:无法共享 channel 的抛出异常。

                saveEndpoint2Urls(server2UrlsShareChannel, server, protocolKey);

                // 共享已经存在的 Server
                return server;
            }

            url = url.createCopy();
            url.setPath(""); // 共享server端口,由于有多个interfaces存在,所以把path设置为空

            // ipPort 上还没创建过 Server,创建一个新的
            server = innerCreateServer(url, messageHandler);

            ipPort2ServerShareChannel.put(ipPort, server);
            saveEndpoint2Urls(server2UrlsShareChannel, server, protocolKey);

            return server;
        }
    }
}

四、调用过程实现

  1. Cluster 收到调用请求后,通过自身的 HA 策略进行调用;

  2. HA 策略实现根据传入 LB 获取到一个 Referer 实例进行调用;

  3. 根据 Referer 的调用结果来执行 HA 策略。

// ClusterSpi.java
public Response call(Request request) {
    if (available.get()) {
        try {
            return haStrategy.call(request, loadBalance);
        } catch (Exception e) {
            return callFalse(request, e);
        }
    }
    return callFalse(request, new MotanServiceException(MotanErrorMsgConstant.SERVICE_UNFOUND));
}

@SpiMeta(name = "failfast")
public class FailfastHaStrategy<T> extends AbstractHaStrategy<T> {
    @Override
    public Response call(Request request, LoadBalance<T> loadBalance) {
        Referer<T> refer = loadBalance.select(request);
        return refer.call(request);
    }
}

 4. Referer 通过持有的 Client 进行调用;

 5. Client 调用时从 Channel 连接池获取到一个 Channel 后再进行写入请求;

 6. Channel 首先给请求生成一个 ResponseFuture,注册到 Client 的 callbackMap 上,这样可以阻塞住调用线程,实现同步调用,也方便收到响应时找到对应的调用线程;

 7. Channel 对于写入的请求根据 Client 在 Netty pipeline 上设置的 codec 组件进行编码、解码操作;

 8. codec 会用 Serialization 组件进行序列化、反序列化;

 9. 最终把序列化后的请求写入网络连接。

 10. 服务端的网络监听接收到请求后,用 codec 组件进行反序列化、解码得到一个请求对象;

 11. Server 的消息处理器 ProviderMessageRouter.handle(Channel channel, Object message) 方法里,根据请求信息得到要调用的 serviceKey, 从而找到对应的 provider ;

 12. 再根据要调用的方法名、参数信息找到对应的实现方法,进行调用,得到响应;

 13. 再把响应写回到网络连接里;

 14. 客户端最终得到响应对象后,从 Client 维护的 callbackMap 找出请求 的 ResponseFuture 并设置结果,使调用线程可以返回。


欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据