代码小析 - 异步回调

天下皆知美之为美,斯恶已;此专栏本想取名代码之美,但有傍名之嫌,也给别误解,所以就叫代码小析吧,看到一段好代码,思路清奇,奇巧淫技,拿出来鉴赏一番

之前是计划one week one alogrithm,结果算法是个短板,不仅要理解,还得再写出代码,特别烧脑,所以中间穿插一下,换换脑子

之前有类似一篇《仅且仅创建一次对象》

最近看到一个段子:

老板有毛病吧,写完排序就叫我走人,我还嫌你这9K工资低了呢

感觉能想到这思路的也算清奇,哈哈!

回调

if you call me, i will call back

回调分类:同步回调,异步回调

场景

建立TCP连接是很耗时的,所以在创建Socket Channel时,可以通过异步回调方式解决

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/**
* 异步取得channel
* @param index
* @param callback
*/
public void asynGetChannel(int index,final Callback callback) {
// 1. 随机获取一条channel
final int pos = ThreadLocalRandom.current().nextInt(MAX_CONNECTIONS);
Channel target = channels[pos];

// 2. 如果获取到了连接,直接返回
if (target != null && target.isActive()) {
logger.info("direct success "+index);
callback.onSuccess(target);
return;
}

synchronized (locks[pos]) {
target = channels[pos];
// 2. 如果获取到了连接,直接返回
if (target != null && target.isActive()) {
callback.onSuccess(target);
return;
}

// 3.如果连接正在创建中,则加入queue
if (target instanceof EmptyChannel) {
boolean result = jobs.offer(callback);
if (result) {
return;
} else {
throw new RuntimeException("Can't connet to target server and the waiting queue is full");
}
}

// 4. 连接尚未创建
channels[pos] = new EmptyChannel();

Connector.connect(host, port, new Callback() {
@Override
public void onSuccess(Channel channel) {
logger.info(index + " ------------connect success---------"+pos + " channel:" +channels[pos].getClass().getName());
List<Callback> tmpJobs;//建立一个tempJobs,快速释放锁
synchronized (locks[pos]) {
// 设置channels,拷贝jobs队列,释放锁
channels[pos] = channel;
tmpJobs = drainJobs();
}
for(Callback pendingCallback : tmpJobs) {
try {
if(pendingCallback != callback) {
pendingCallback.onSuccess(channel);
}
} catch (Exception e) {
logger.error("call connectionCallback fail", e);
}
}
}

@Override
public void onError(Throwable e) {
List<Callback> tmpJobs;//建立一个tempJobs,快速释放锁
synchronized (locks[pos]) {
// 设置channels,拷贝jobs队列,释放锁
channels[pos] = null;
tmpJobs = drainJobs();
}
for(Callback pendingCallback : tmpJobs) {
try {
if(pendingCallback != callback) {
pendingCallback.onError(e);
}
} catch (Exception x) {
logger.error("call connectionCallback fail", x);
}
}
}
});
}
}

完整的代码:https://github.com/zhuxingsheng/javastudy

亮点

思路很简单,亮点就在于job队列,连接在没有建立成功时,会先建立一个EmptyChannel,有些类似lazy load中的影子对象放到队列中,不造成阻塞,当channel建立完成后,回调

VS Future模式

异步回调的套路与Future模式特别类似

1
2
3
4
Future future = doTask1();
doTask2();
doTask3();
Result result = future.get();

Future 模式中,一个任务的启动和获取结果分成了两部分,启动执行是异步的,调用后立马返回,调用者可以继续做其他的任务,而等到其他任务做完,再获取Future的结果,此时调用 get 时是同步的,也就是说如果 doTask1 如果还没有做完,等它做完。

看出最大区别,异步回调不需要返回值,准确说调用者不用太关心返回值,甚至不需要关心真正执行情况,而future模式就不一样了,调用者是一定要拿到返回值的

参考

同步调用,异步回调和 Future 模式

公众号:码农戏码
欢迎关注微信公众号『码农戏码』