motan服务端

前引

新年回来上班,记录下服务端的处理过程,就当热个身

服务端的处理也有套路,不管上层怎么玩,最后还得是通过反射得到Method对象,再调用invoke()

image
根据这张序列图,可以把服务端分为两部分

  1. NettyServer前面的算一部分,搭基础构建Exporter对象
  2. nettyserver后面的算一部分,找到对应method,invoke,通过网络返回

构建Exporter对象

结合spring

其实在《motan客户端》时有提过,但没有深究;spring扩展自定义xml是个很老的技术了
spring扩展xml文档

spring通过XML解析程序将其解析为DOM树,通过NamespaceHandler指定对应的Namespace的BeanDefinitionParser将其转换成BeanDefinition。再通过Spring自身的功能对BeanDefinition实例化对象。

在期间,Spring还会加载两项资料:

  1. META-INF/spring.handlers

指定NamespaceHandler(实现org.springframework.beans.factory.xml.NamespaceHandler)接口,或使用org.springframework.beans.factory.xml.NamespaceHandlerSupport的子类。

  1. META-INF/spring.schemas

在解析XML文件时将XSD重定向到本地文件,避免在解析XML文件时需要上网下载XSD文件。通过现实org.xml.sax.EntityResolver接口来实现该功能。

ConfigHandler

解析完xml后,服务器通过ServiceConfigBean来监听spring容器加载完成。

1
2
3
4
5
6
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!getExported().get()) {
export();
}
}

调用export(),构建Exporter;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public synchronized void export() {
if (exported.get()) {
LoggerUtil.warn(String.format("%s has already been expoted, so ignore the export request!", interfaceClass.getName()));
return;
}

checkInterfaceAndMethods(interfaceClass, methods);

List<URL> registryUrls = loadRegistryUrls();
if (registryUrls == null || registryUrls.size() == 0) {
throw new IllegalStateException("Should set registry config for service:" + interfaceClass.getName());
}

Map<String, Integer> protocolPorts = getProtocolAndPort();
for (ProtocolConfig protocolConfig : protocols) {
Integer port = protocolPorts.get(protocolConfig.getId());
if (port == null) {
throw new MotanServiceException(String.format("Unknow port in service:%s, protocol:%s", interfaceClass.getName(),
protocolConfig.getId()));
}
doExport(protocolConfig, port, registryUrls);
}

afterExport();
}

这个方法主要就做了两件事

  1. loadRegistryUrls(),根据motan:registry,生成URL对象。URL也算是整个框架的核心,一个url包含了配置中的所有内容。就像一个领域对象一样。

如果使用的是zookeeper,对就的URL就是:zookeeper://127.0.0.1:2181/com.weibo.api.motan.registry.RegistryService?group=default_rpc

这是注册中心的URL

而服务URL是以参数为embed为key包含在里面,里面包含了所有的服务参数

motan://127.0.0.1:8002/com.share.rpc.service.DEMOService?module=match-rpc&loadbalance=activeWeight&nodeType=service&accessLog=true&minWorkerThread=2&protocol=motan&isDefault=true&maxWorkerThread=10&refreshTimestamp=1486448597095&id=com.weibo.api.motan.config.springsupport.ServiceConfigBean&export=protocolMatch:8002&requestTimeout=60000&group=match-rpc&

  1. doExport()
    1
    2
    ConfigHandler configHandler = ExtensionLoader.getExtensionLoader(ConfigHandler.class).getExtension(MotanConstants.DEFAULT_VALUE);
    exporters.add(configHandler.export(interfaceClass, ref, urls));

到这儿就是委托给ConfigHandler处理了。

默认的实现类:SimpleConfigHandler

这个类里面有两个重要的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public <T> T refer(Class<T> interfaceClass, List<Cluster<T>> clusters, String proxyType) {
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(proxyType);
return proxyFactory.getProxy(interfaceClass, new RefererInvocationHandler<T>(interfaceClass, clusters));
}

@Override
public <T> Exporter<T> export(Class<T> interfaceClass, T ref, List<URL> registryUrls) {

String serviceStr = StringTools.urlDecode(registryUrls.get(0).getParameter(URLParamType.embed.getName()));
URL serviceUrl = URL.valueOf(serviceStr);

// export service
// 利用protocol decorator来增加filter特性
String protocolName = serviceUrl.getParameter(URLParamType.protocol.getName(), URLParamType.protocol.getValue());
Protocol protocol = new ProtocolFilterDecorator(ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(protocolName));
Provider<T> provider = new DefaultProvider<T>(ref, serviceUrl, interfaceClass);
Exporter<T> exporter = protocol.export(provider, serviceUrl);

// register service
register(registryUrls, serviceUrl);

return exporter;
}

refer()就是RefererConfig配置完后的调用的方法,就看到了客户端的核心类RefererInvocationHandler

export()就是服务端使用的方法了。

构造provider

1
2
3
4
public interface Provider<T> extends Caller<T> {

Class<T> getInterface();
}

其实就是外面服务类的代理类,里面就一个服务类的引用。在初始化时,把所有的方法全部缓存

1
2
3
4
5
6
7
8
private void initMethodMap(Class<T> clz) {
Method[] methods = clz.getMethods();

for (Method method : methods) {
String methodDesc = ReflectUtil.getMethodDesc(method);
methodMap.put(methodDesc, method);
}
}

Protocol的export就是启动一个netty server,提供对外服务。

register 注册服务

按上面export(),先启动一个netty server,服务一切就绪后,再向注册中心进行注册了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void register(List<URL> registryUrls, URL serviceUrl) {

for (URL url : registryUrls) {
// 根据check参数的设置,register失败可能会抛异常,上层应该知晓
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(url.getProtocol());
if (registryFactory == null) {
throw new MotanFrameworkException(new MotanErrorMsg(500, MotanErrorMsgConstant.FRAMEWORK_REGISTER_ERROR_CODE,
"register error! Could not find extension for registry protocol:" + url.getProtocol()
+ ", make sure registry module for " + url.getProtocol() + " is in classpath!"));
}
Registry registry = registryFactory.getRegistry(url);
registry.register(serviceUrl);
}
}

根据注册协议选择注册中心
RegistryFacotry有三种:1. direct 2.local 3.zookeeper

对应的registry也是这三种

在zookeeperregistry里面,

1
2
3
4
5
6
7
8
9
10
11
12
13
protected void doRegister(URL url) {
try {
serverLock.lock();
// 防止旧节点未正常注销
removeNode(url, ZkNodeType.AVAILABLE_SERVER);
removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
createNode(url, ZkNodeType.UNAVAILABLE_SERVER);
} catch (Throwable e) {
throw new MotanFrameworkException(String.format("Failed to register %s to zookeeper(%s), cause: %s", url, getUrl(), e.getMessage()), e);
} finally {
serverLock.unlock();
}
}

这儿注册后,这个服务还是unavailable,需要调用
MotanSwitcherUtil.setSwitcherValue(MotanConstants.REGISTRY_HEARTBEAT_SWITCHER, true);才能变成available

NettyServer

一样的套路,有一个默认ChannelHandler,其实什么事都不干,只是包装一下自定义Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
final NettyChannelHandler handler = new NettyChannelHandler(NettyServer.this, messageHandler,
standardThreadExecutor);

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
// FrameDecoder非线程安全,每个连接一个 Pipeline
public ChannelPipeline getPipeline() {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("channel_manage", channelManage);
pipeline.addLast("decoder", new NettyDecoder(codec, NettyServer.this, maxContentLength));
pipeline.addLast("encoder", new NettyEncoder(codec, NettyServer.this));
pipeline.addLast("handler", handler);
return pipeline;
}
});

这儿有个有意思的MessageHandler:ProviderProtectedMessageRouter
从名字看,有保护功能

1) 如果接口只有一个方法,那么直接return true
2) 如果接口有多个方法,那么如果单个method超过 maxThread / 2 && totalCount > (maxThread 3 / 4),那么return false;
3) 如果接口有多个方法(4个),同时总的请求数超过 maxThread
3 / 4,同时该method的请求数超过 maxThead * 1 / 4, 那么return false
4) 其他场景return true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
protected boolean isAllowRequest(int requestCounter, int totalCounter, int maxThread, Request request) {
if (methodCounter.get() == 1) {
return true;
}

// 该方法第一次请求,直接return true
if (requestCounter == 1) {
return true;
}

// 不简单判断 requsetCount > (maxThread / 2) ,因为假如有2或者3个method对外提供,
// 但是只有一个接口很大调用量,而其他接口很空闲,那么这个时候允许单个method的极限到 maxThread * 3 / 4
if (requestCounter > (maxThread / 2) && totalCounter > (maxThread * 3 / 4)) {
return false;
}

// 如果总体线程数超过 maxThread * 3 / 4个,并且对外的method比较多,那么意味着这个时候整体压力比较大,
// 那么这个时候如果单method超过 maxThread * 1 / 4,那么reject
return !(methodCounter.get() >= 4 && totalCounter > (maxThread * 3 / 4) && requestCounter > (maxThread * 1 / 4));

}

拒绝的结果就是放一个异常:

1
2
3
4
5
6
7
8
9
10
11
private Response reject(String method, int requestCounter, int totalCounter, int maxThread) {
DefaultResponse response = new DefaultResponse();
MotanServiceException exception =
new MotanServiceException("ThreadProtectedRequestRouter reject request: request_counter=" + requestCounter
+ " total_counter=" + totalCounter + " max_thread=" + maxThread, MotanErrorMsgConstant.SERVICE_REJECT);
exception.setStackTrace(new StackTraceElement[0]);
response.setException(exception);
LoggerUtil.error("ThreadProtectedRequestRouter reject request: request_method=" + method + " request_counter=" + requestCounter
+ " =" + totalCounter + " max_thread=" + maxThread);
return response;
}

other

RpcContext

1
2
3
4
5
6
7
8
9
private static final ThreadLocal<RpcContext> localContext = new ThreadLocal<RpcContext>() {
protected RpcContext initialValue() {
return new RpcContext();
}
};

public static RpcContext getContext() {
return localContext.get();
}

这个ThreadLocal尽然还可以设置默认初始值,以前尽然没用过

总结

服务端相对客户端还是很简单的。

没有ha,loadbalance,就是原生netty就把请求处理完了。

朱兴生 wechat
最新文章尽在微信公众号『码农戏码』