微服务超时与重试

前言

其实不只在微服务中,在平常网络请求,或者与第三方系统进行交互都需要设置超时时间

为什么需要超时与重试? 总体上讲,肯定是为了增加系统可靠性,具体表现在两个方面

  1. 系统自我保护:
    快速失败,在业务最大允许等待时间内未收到返回数据,主动放弃等待,释放占用资源,避免请求不断累积带来的客户端雪崩效应
  2. 成功率:服务处理超时原因有很多,但常见的超时都是短暂的,主要是GC,或者有网络抖动等。这些短时间影响服务端状态的情况而造成请求成功率下降,需要补救措施。简单的补救有超时重试操作:当前请求超时后,将会重试到非当前服务器,降低重试超时的机率

这一篇将由浅入深探索timeout机制,以及在微服务下的实践

超时

经常被提起的两种超时:connection timeout、socket timeout

通过最底层的Socket,ServerSocket演示一下这两种超时的表现,nio框架都会有对应的配置选项

connectionTimeout

建立连接超时时间

客户端,随便写个IP,设置一个timeout

1
2
Socket socket = new Socket();
socket.connect(new InetSocketAddress("10.0.0.1",8080),20000);

在timeout时间到时,就会抛出connect timed out异常

1
2
3
4
5
6
7
8
9
Exception in thread "main" java.net.SocketTimeoutException: connect timed out
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)

socketTimeout

Enable/disable SO_TIMEOUT with the specified timeout, in milliseconds. With this option set to a non-zero timeout, a read() call on the InputStream associated with this Socket will block for only this amount of time. If the timeout expires, a java.net.SocketTimeoutException is raised, though the Socket is still valid. The option must be enabled prior to entering the blocking operation to have effect. The timeout must be > 0. A timeout of zero is interpreted as an infinite timeout.

服务端,只要让客户端能连接上就行,不发送数据

1
2
3
4
5
ServerSocket serverSocket = new ServerSocket(8080);
while ( true) {
Socket socket = serverSocket.accept();
new Thread(new P(socket)).start();
}

客户端,进行读数据

1
2
3
Socket socket = new Socket();
socket.connect(new InetSocketAddress("localhost",8080),20000);
socket.setSoTimeout(3000);

3s后,就抛出Read timed out

1
2
3
4
5
6
java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.net.SocketInputStream.read(SocketInputStream.java:223)

nio

对NIO,看网上一些示例基本没有关注到这一点,所以值得思考,难道是nio不需要关注timeout?

客户端对服务器的连接:

1
2
3
4
Selector selector = Selector.open();
InetSocketAddress isa = new InetSocketAddress(host, port);
// 调用open静态方法创建连接到指定主机的SocketChannel
SocketChannel sc = SocketChannel.open(isa);

在调用SocketChannel.open()方法时,如果连接不上服务器,那么此方法就会长时间阻塞,为了解决这个问题,我们可以在调用open()方法前,启动一个定时器,这个定时器会在指定的时间内检查是否已连接成功,这个指定的时间也就是我们希望设置的连接超时时间,当检查已连接上服务器时,提示用户已连接成功;若没有连接上,可在代码中抛出SocketTimeoutException,并提示用户连接超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void connect(){
try{
selector = Selector.open();
InetSocketAddress isa = new InetSocketAddress(host, port);
//10秒连接超时
new Timer().schedule(tt, 10000);
// 调用open静态方法创建连接到指定主机的SocketChannel
sc = SocketChannel.open(isa);
// 设置该sc以非阻塞方式工作
sc.configureBlocking(false);
// 将Socketchannel对象注册到指定Selector
sc.register(selector, SelectionKey.OP_READ);
Message msg = new Message();
msg.what = 0;
msg.obj = sc;
handler.sendMessage(msg); // 连接成功
new Thread(new NIOReceiveThread(selector, handler)).start();
}catch (IOException e) {
e.printStackTrace();
handler.sendEmptyMessage(-1); // IO异常
}
}
TimerTask tt = new TimerTask(){
@Override
public void run(){
if (sc == null || !sc.isConnected()){
try{
throw new SocketTimeoutException("连接超时");
}catch (SocketTimeoutException e){
e.printStackTrace();
handler.sendEmptyMessage(-6); // 连接超时
}
}
}
};

在stackoverflow上有人回答了Read timeout for an NIO SocketChannel?

  1. You are using a Selector, in which case you have a select timeout which you can play with, and if it goes off (select(timeout) returns zero) you close all the registered channels, or
  2. You are using blocking mode, in which case you might think you should be able to call Socket.setSoTimeout() on the underlying socket (SocketChannel.socket()), and trap the SocketTimeoutException that is thrown when the timeout expires during read(), but you can’t, because it isn’t supported for sockets originating as channels, or
  3. You are using non-blocking mode without a Selector, in which case you need to change to case (1).

netty

netty业界公认的高成熟nio产品,也是大多数微服务底层使用的通信框架,内部细节值得挖一挖处理方式,篇幅有限,另开篇深挖

先看在微服务产品中的使用

connectionTimeout

这种场景很简单,在使用netty时,对应的配置选项

1
2
Bootstrap b = new Bootstrap();
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)

socketTimeout

这种场景各个框架在处理里,手法不尽相同,各有特色

motan

Motan是一套高性能、易于使用的分布式远程服务调用(RPC)框架,新浪微博开源。

之前解读过motan源码,《motan客户端解析》,《motan服务端解析》

NettyChannel.request

  1. 获取此method的配置timeout
  2. 在write时,awaitUninterruptibly
  3. 超时就抛出timeoutException
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public Response request(Request request) throws TransportException {
int timeout = nettyClient.getUrl().getMethodParameter(request.getMethodName(), request.getParamtersDesc(),
URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue());
if (timeout <= 0) {
throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.",
MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
}
NettyResponseFuture response = new NettyResponseFuture(request, timeout, this.nettyClient);
this.nettyClient.registerCallback(request.getRequestId(), response);

ChannelFuture writeFuture = this.channel.write(request);

boolean result = writeFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS);

if (result && writeFuture.isSuccess()) {
response.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess() || (future.isDone() && ExceptionUtil.isBizException(future.getException()))) {
// 成功的调用
nettyClient.resetErrorCount();
} else {
// 失败的调用
nettyClient.incrErrorCount();
}
}
});
return response;
}

writeFuture.cancel();
response = this.nettyClient.removeCallback(request.getRequestId());

if (response != null) {
response.cancel();
}

// 失败的调用
nettyClient.incrErrorCount();

if (writeFuture.getCause() != null) {
throw new MotanServiceException("NettyChannel send request to server Error: url="
+ nettyClient.getUrl().getUri() + " local=" + localAddress + " "
+ MotanFrameworkUtil.toString(request), writeFuture.getCause());
} else {
throw new MotanServiceException("NettyChannel send request to server Timeout: url="
+ nettyClient.getUrl().getUri() + " local=" + localAddress + " "
+ MotanFrameworkUtil.toString(request));
}
}

注意到其中的NettyResponseFuture,看名字就明白是个Future模式,在《代码小析 - 异步回调》中有分析过

接收到服务端返回

1
2
3
4
5
6
7
8
9
10
11
12
13
NettyResponseFuture responseFuture = NettyClient.this.removeCallback(response.getRequestId());
if (responseFuture == null) {
LoggerUtil.warn(
"NettyClient has response from server, but resonseFuture not exist, requestId={}",
response.getRequestId());
return null;
}

if (response.getException() != null) {
responseFuture.onFailure(response);
} else {
responseFuture.onSuccess(response);
}

获取真实结果NettyResponseFuture.getValue()

通过long waitTime = timeout - (System.currentTimeMillis() - createTime);计算一下真正的wait时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public Object getValue() {
synchronized (lock) {
if (!isDoing()) {
return getValueOrThrowable();
}

if (timeout <= 0) {
try {
lock.wait();
} catch (Exception e) {
cancel(new MotanServiceException("NettyResponseFuture getValue InterruptedException : "
+ MotanFrameworkUtil.toString(request) + " cost="
+ (System.currentTimeMillis() - createTime), e));
}

// don't need to notifylisteners, because onSuccess or
// onFailure or cancel method already call notifylisteners
return getValueOrThrowable();
} else {
long waitTime = timeout - (System.currentTimeMillis() - createTime);

if (waitTime > 0) {
for (;;) {
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
}

if (!isDoing()) {
break;
} else {
waitTime = timeout - (System.currentTimeMillis() - createTime);
if (waitTime <= 0) {
break;
}
}
}
}

if (isDoing()) {
timeoutSoCancel();
}
}
return getValueOrThrowable();
}
}

totalTimeout

有了timeout基本已经满足需求,但这儿再提一个totalTimeout,为什么需要一个总超时时间

比如客户端希望服务端在60ms内返回,由于成功率要求必须加一次重试,但是设置超时时间30ms重试一次会很浪费(绝大部分重试很快,但预留了30ms则压缩了初次调用的时间)。设置40ms重试一次就可能出现整体耗时大于60ms。所以希望可以配置整体超时时间为60ms,单次40ms加重试一次,这样既充分利用看重试机会也不会导致整体超过60ms

一次服务调用的正常请求的最长时间为:timeout * failovertimes + success_invocation_time

比如某个服务的timeout为100ms,failovertimes为1,正常调用的平均耗时为10ms,那么它的上游在重试情况下的耗时就是:乐观估计100 1+10=110ms;悲观估计100 1+100=200ms

为了保证总体超时时间,只能把单次超时时间压缩,使得某些情况下可能不需求重试的场景也进行了重试

对比一下,设置totalTimeout与不设置的情况:

  1. 某服务通常能在20ms返回,但是因为某些意外(比如gc),连续两次都要40ms
  • 只能设置单次timeout的情况下,timeout=30ms,failovertimes=1
    因此对于client来说,它看到的调用耗时就是:30ms(超时) + 30ms(超时) = 60ms
  • 分开设置首次超时和总体超时的情况下,timeout=40ms,total_timeout=60ms,failovertimes=1
    因此对于client来说,它看到的调用耗时就是:40ms(超时)+ (60ms-40ms)(超时) = 60ms
  1. 某服务通常能在20ms返回,但是因为某些意外(比如gc),这次需要35ms才能返回。
  • 只能设置单次timeout的情况下,timeout=30ms,failovertimes=1。
    因此对于client来说,它看到的调用耗时就是:30ms(超时) + 20ms = 50ms
  • 分开设置首次超时和总体超时的情况下,timeout=40ms,total_timeout=60ms,failovertimes=1。
    因此对于client来说,它看到的调用耗时就是:35ms(正常返回) = 35ms

重试

因某个服务实例短暂状态不佳而造成的超时,使用重试处理可以让请求转向其他服务实例的做法可以很好改善非集中式问题的成功率。

但如果超时重试只做简单的重试策略:有超时便重试,这样可能会导致服务端的崩溃。

例如:当前基础组件(如db)压力过大而造成超时,如果一律重试的话,会导致服务端集群实际接受请求量翻倍,这会使得基础组件压力无减反增,可能会导致其最终崩溃

实现

  • 思路简单,配置重试次数,出现非业务异常就重试,达到次数上限或者中途成功结束
  • 重试限流,重试造成雪崩的可能性,所以重试需要控制流量

motan

之前解读过motan源码,《motan客户端解析》,motan的failover就是这么处理的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 先使用method的配置
int tryCount =
refUrl.getMethodParameter(request.getMethodName(), request.getParamtersDesc(), URLParamType.retries.getName(),
URLParamType.retries.getIntValue());
// 如果有问题,则设置为不重试
if (tryCount < 0) {
tryCount = 0;
}

for (int i = 0; i <= tryCount; i++) {
Referer<T> refer = referers.get(i % referers.size());
try {
request.setRetries(i);
return refer.call(request);
} catch (RuntimeException e) {
// 对于业务异常,直接抛出
if (ExceptionUtil.isBizException(e)) {
throw e;
} else if (i >= tryCount) {
throw e;
}
LoggerUtil.warn(String.format("FailoverHaStrategy Call false for request:%s error=%s", request, e.getMessage()));
}
}

超时重试

但像我司框架就没有这样处理,只关注超时重试,因为超时重试主要是解决因偶尔短暂状态不佳而对成功率造成的影响,所以把重点放在处理短暂处于超时状态超时请求,对于长时间处于较大量的超时状态时,将选择不进行重试fail fast

限流

主要是防止重试次数过多,引起系统雪崩,有必要进行一下限流

《微服务-熔断机制》中提到过限流算法,细节可参考《计数器算法》

参考资料

Read timeout for an NIO SocketChannel?

朱兴生 wechat
最新文章尽在微信公众号『码农戏码』