RPC的本质 方法调用对于程序员来讲是再正常不过的事了,object.method(),RPC的使用也一样,但底层对这一过程又切分开,有client和server两端,也就是调用者与实现者
因为他们不再在同一进程中,需要通过网络跨JVM实现这一调用过程
在java中的实现手法:动态代理+socket通信;这就是个套路,上层怎么封装实现,但底层就是这样,概莫能外
请求过程 motan的调用实现 先画个简单的序列图,理清一下调用过程
motan与spring的结合,后面再写了,spring的扩展也很简单。
基于对RPC本质的认识,可以先找到InvocationHandler的实现类RefererInvocationHandler
这个接口就一个方法
1 2 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable;
在这个方法里面就是去构造socket传输的request对象,request主要就是方法的签名信息与参数,传输到server端,去执行对应的实现方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if(isLocalMethod(method)){ if("toString".equals(method.getName())){ return clustersToString(); } throw new MotanServiceException("can not invoke local method:" + method.getName()); } DefaultRequest request = new DefaultRequest(); request.setRequestId(RequestIdGenerator.getRequestId()); request.setArguments(args); request.setMethodName(method.getName()); request.setParamtersDesc(ReflectUtil.getMethodParamDesc(method)); request.setInterfaceName(clz.getName()); request.setAttachment(URLParamType.requestIdFromClient.getName(), String.valueOf(RequestIdGenerator.getRequestIdFromClient())); // 当 referer配置多个protocol的时候,比如A,B,C, // 那么正常情况下只会使用A,如果A被开关降级,那么就会使用B,B也被降级,那么会使用C for (Cluster<T> cluster : clusters) { String protocolSwitcher = MotanConstants.PROTOCOL_SWITCHER_PREFIX + cluster.getUrl().getProtocol(); Switcher switcher = switcherService.getSwitcher(protocolSwitcher); if (switcher != null && !switcher.isOn()) { continue; } request.setAttachment(URLParamType.version.getName(), cluster.getUrl().getVersion()); request.setAttachment(URLParamType.clientGroup.getName(), cluster.getUrl().getGroup()); // 带上client的application和module request.setAttachment(URLParamType.application.getName(), ApplicationInfo.getApplication(cluster.getUrl()).getApplication()); request.setAttachment(URLParamType.module.getName(), ApplicationInfo.getApplication(cluster.getUrl()).getModule()); Response response = null; boolean throwException = Boolean.parseBoolean(cluster.getUrl().getParameter(URLParamType.throwException.getName(), URLParamType.throwException.getValue())); try { //真正执行 response = cluster.call(request); return response.getValue(); }
invoke交给了Cluster.call,而Cluster又给了HAStragy.call,HA策略通过loadbalance选择负载均衡策略得到Referer
Cluster是什么 在InvocationHandler里面,调用了Cluster的call方法,从代码上看,它的本质就是Referer的集合,并且提供了HA服务以及负载均衡。而Referer是提供服务的一个抽象
HA与LoadBalance
HA HA策略,就提供了两种,
fail-fast fail-fast很简单,调用失败就抛异常;
fail-over fail-over相对fail-fast多了重试次数,如果失败,就重试一个referer
LoadBalance 这倒提供了不少
Round-Robin 这个很简单,一个一个往下轮询就行了, 但需要记住上一次的位置
random 随机
Least Load 这个motan实现有点意思
由于Referer List可能很多,比如上百台,如果每次都要从这上百个Referer或者最低并发的几个,性能有些损耗,因此 random.nextInt(list.size())获取一个起始的index,然后获取最多不超过MAX_REFERER_COUNT的 状态是isAvailable的referer进行判断activeCount.
localFirst
本地服务优先获取策略:对referers根据ip顺序查找本地服务,多存在多个本地服务,获取Active最小的本地服务进行服务。当不存在本地服务,但是存在远程RPC服务,则根据ActivWeight获取远程RPC服务;当两者都存在,所有本地服务都应优先于远程服务,本地RPC服务与远程RPC服务内部则根据ActiveWeight进行
NettyClient 上层不管怎么选择服务,最后都需要传输层去传输,nettyclient就是传输作用。
在DefaultRpcReferer中创建了一个nettyClient。向server发送远程调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private Response request(Request request, boolean async) throws TransportException { Channel channel = null; Response response = null; try { // return channel or throw exception(timeout or connection_fail) channel = borrowObject(); if (channel == null) { LoggerUtil.error("NettyClient borrowObject null: url=" + url.getUri() + " " + MotanFrameworkUtil.toString(request)); return null; } // async request response = channel.request(request); // return channel to pool returnObject(channel);
使用了common-pool连接池
在这儿是委托给了nettychannel.request(),nettyclient与nettychannel是什么关系呢? client有server地址,channel就是这个地址连接的通道。 在nettychannel中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public Response request(Request request) throws TransportException { int timeout = nettyClient.getUrl().getMethodParameter(request.getMethodName(), request.getParamtersDesc(), URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue()); if (timeout <= 0) { throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.", MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR); } NettyResponseFuture response = new NettyResponseFuture(request, timeout, this.nettyClient); this.nettyClient.registerCallback(request.getRequestId(), response); ChannelFuture writeFuture = this.channel.write(request); boolean result = writeFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS); if (result && writeFuture.isSuccess()) { response.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (future.isSuccess() || (future.isDone() && ExceptionUtil.isBizException(future.getException()))) { // 成功的调用 nettyClient.resetErrorCount(); } else { // 失败的调用 nettyClient.incrErrorCount(); } } }); return response; }
到此,整个请求过程已经完成。
返回处理 调用完成之后,总得得到结果才行
motan返回过程 在上面nettychannel.request方法,会返回一个response,NettyResponseFuture这个类名就说明了一切,使用了Future模式。
在返回response时,构造真实response
1 2 3 4 5 6 7 8 private Response asyncResponse(Response response, boolean async) { if (async || !(response instanceof NettyResponseFuture)) { return response; } return new DefaultResponse(response); }
真实response里面,使用futureresponse去取值
1 2 3 4 5 6 7 public DefaultResponse(Response response) { this.value = response.getValue(); this.exception = response.getException(); this.requestId = response.getRequestId(); this.processTime = response.getProcessTime(); this.timeout = response.getTimeout(); }
在futureresponse里面:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public Object getValue() { synchronized (lock) { if (!isDoing()) { return getValueOrThrowable(); } if (timeout <= 0) { try { lock.wait(); } catch (Exception e) { cancel(new MotanServiceException("NettyResponseFuture getValue InterruptedException : " + MotanFrameworkUtil.toString(request) + " cost=" + (System.currentTimeMillis() - createTime), e)); } // don't need to notifylisteners, because onSuccess or // onFailure or cancel method already call notifylisteners return getValueOrThrowable(); } else { long waitTime = timeout - (System.currentTimeMillis() - createTime); if (waitTime > 0) { for (;;) { try { lock.wait(waitTime); } catch (InterruptedException e) { } if (!isDoing()) { break; } else { waitTime = timeout - (System.currentTimeMillis() - createTime); if (waitTime <= 0) { break; } } } } if (isDoing()) { timeoutSoCancel(); } } return getValueOrThrowable(); } }
没有使用java.util.concurrent包中Condition,CountDownLatch之类的工具类,而是使用原始的wait,notify组合
在NettyClient中,得到返回对象后,对responsefuter进行赋值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 pipeline.addLast("handler", new NettyChannelHandler(NettyClient.this, new MessageHandler() { @Override public Object handle(Channel channel, Object message) { //得到返回对象 Response response = (Response) message; //得到对应request的future NettyResponseFuture responseFuture = NettyClient.this.removeCallback(response.getRequestId()); if (responseFuture == null) { LoggerUtil.warn( "NettyClient has response from server, but resonseFuture not exist, requestId={}", response.getRequestId()); return null; } if (response.getException() != null) { responseFuture.onFailure(response); } else { responseFuture.onSuccess(response); } return null; } }));
responseFuture的onsuccess方法,进行赋值并notify
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public void onSuccess(Response response) { this.result = response.getValue(); this.processTime = response.getProcessTime(); done(); } private boolean done() { synchronized (lock) { if (!isDoing()) { return false; } state = FutureState.DONE; lock.notifyAll(); } notifyListeners(); return true; }
总结 到此,客户端部分已经完成,主要就是两方面
调用请求
返回处理
还有一些问题:
客户端怎么服务发现的?
服务降低怎么处理的?