码农戏码

新生代农民工的自我修养


  • 首页

  • 归档

  • 标签

  • 关于

  • 在线工具

  • 搜索

微服务-熔断机制

发表于 2018-04-16
字数统计: 1.5k 字数 | 阅读时长 ≈ 5 分钟

背景

由于微服务间通过RPC来进行数据交换,所以我们可以做一个假设:在IO型服务中,假设服务A依赖服务B和服务C,而B服务和C服务有可能继续依赖其他的服务,继续下去会使得调用链路过长,技术上称1->N扇出

1->N扇出

问题

如果在A的链路上某个或几个被调用的子服务不可用或延迟较高,则会导致调用A服务的请求被堵住,堵住的请求会消耗占用掉系统的线程、io等资源,当该类请求越来越多,占用的计算机资源越来越多的时候,会导致系统瓶颈出现,造成其他的请求同样不可用,最终导致业务系统崩溃

  1. 服务器失败影响服务质量
  2. 超负荷导致整个服务失败
  3. 服务失败造成的雪崩效应

微服务服务依赖调用

超负荷导致整个服务失败

服务失败造成的雪崩效应

熔断

熔断模式:这种模式主要是参考电路熔断,如果一条线路电压过高,保险丝会熔断,防止火灾。放到我们的系统中,如果某个目标服务调用慢或者有大量超时,此时,熔断该服务的调用,对于后续调用请求,不在继续调用目标服务,直接返回,快速释放资源。如果目标服务情况好转则恢复调用。

定义里面有几个量化的地方

  1. 目标服务调用慢或者超时:开启熔断的阀值量化

可以通过两个维度:时间与请求数

时间
多长时间内的超时请求达到多少,触发熔断

请求数
从服务启动,超时请求数达到多少,触发

这两个维度都需要记录超时请求数和统计总请求数

  1. 情况好转,恢复调用

如何量化情况好转:多长时间之后超时请求数低于多少关闭熔断

熔断状态

熔断状态

三种状态的切换

开 – 半开 – 关

开:使用快速失败返回,调用链结束

半开:当熔断开启一段时间后,尝试阶段

关:调用正常

实现机制

可以使用一段伪代码表示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//正常request
if( request is open) {
//fastfail
} else if( request is halfopen) {
if ( request success count > recoverySampleVolume) {
//state --> close
}
}


//失败request
if( request is failcount > requestVolumeThreshold && errorPercentage > threshold) {
//close --> open
}

请求熔断开启时,直接快速失败

是halfopen状态,如果成功处理次数是否大于恢复配置,就关闭熔断

如果失败次数超过阀值,开启熔断

而对于open–>halfopen的转换,可以通过定时器主动触发

具体实现

现在有很多开源的

failsafe:https://github.com/jhalterman/failsafe

Hystrix

个案实现

在没有熔断时,请求链路:

client –> request –> balance – > handler

一个请求过来,通过负载均衡找到具体的server,再执行

加入熔断后:

client –> request –> circuitBreakerfilter –> balance – > handler

CircuitBreakerFilter过滤掉被熔断的server,在负载均衡时,不再被选中

  1. getAllServers() 获取所有服务器列表
  2. 根据requestService,requestMethod获取熔断的servers
    • 从allserverList中剔除这些server

熔断服务列表怎么维护呢?

正常状态 –> 熔断状态
1
2
3
4
5
6
7
1. 收到失败请求(e.g.超时,系统异常)
2. 判断此service是否配置了熔断策略 map<serviceName,circuitBreakerpolicy>
- 根据serviceName,method,serverInfo获取CircuitBreakerCounter
- counter对失败次数+1
- 此server是否在half open状态 HalfOpenServersMap<serverName+method,serverList>
- 在:如果失败次数超过RecoverySampleVolume,openserversmap<servername+method,serverlist>进行put操作、并从HalfOpenServersMap中remove
- 不在:请求数大于等于10笔(requestVolumeThreshold),且错误率达到60%(errorPercentage),openserversmap<servername+method,serverlist>进行put操作
熔断状态 –> 正常状态
1
2
3
4
5
1. 收到请求
2. 判断此service是否配置了熔断策略 map<serviceName,circuitBreakerpolicy>
- 根据serviceName,method,serverInfo获取CircuitBreakerCounter
- counter调用次数+1
- 若half-open 状态下的服务instance被调用次数超过取样的sample数,从HalfOpenServersMap中remove
疑问
  1. 错误率怎么计算?
  2. counter的实现
  3. 上面是close与open的转换,怎么转换到halfopen?

错误率= 错误次数/请求次数

halfopen状态

在上面的提到,被熔断的服务,如果情况好转就会关闭熔断!“情况好转”:什么时候去判断情况好转,怎么判断情况好转两方面

  1. 在加入到openserversmap时,同时开启延迟时间窗口后的定时任务
    • 从openserversmap中移除,加入到halfOpenServersMap

counter实现

  1. 简单点:AtomicLong,如当是halfopen时,使用这种简单的计数器叠加
  2. 滑动时间窗口实现

VS 降级

提到熔断,不得不起一下降级。两者的区别

有时语言真是乏力,不容易表达清楚,罗列一下

熔断是框架提供,不管业务什么样,防止系统雪崩,都需要提供一下基本功能;而降级与业务有关,自动或手动。比如支付,有很多种支付方式,储蓄卡,信用卡,支付宝,微信。若发现某一支付通道不稳定,或压力过大,手动先关闭,这就是一种降级

由此可看出:

  1. 触发原因不太一样,服务熔断一般是某个服务(下游服务)故障引起,而服务降级一般是从整体负荷考虑;
  2. 管理目标的层次不太一样,熔断其实是一个框架级的处理,每个微服务都需要(无层级之分),而降级一般需要对业务有层级之分(比如降级一般是从最外围服务开始)
  3. 实现方式不一样

参考

微服务熔断与隔离

CircuitBreaker

zookeeper-paxos

发表于 2018-01-29 | 分类于 读书
字数统计: 7.4k 字数 | 阅读时长 ≈ 26 分钟

在《常识五配置中心》文章中,少了一节关于zookeeper内容,现在补全

此篇也作为《从Paxos到zookeeper分布式一致性原理与实践》的读书笔记

image

这本书很早就出版了,现在才知道,惭愧。好书总是发现的晚,Better late than never!

IT界日异月新,如果你还没有使用过ZK,那也可以跳过了,虽然现在大多数互联网架构都使用,但它也是个古老物件了。随着CoreOS和Kubernetes等项目在开源社区日益火热,etcd已是跃然而上,我司新一代配置中心架构也开始使用etcd代替zk

但功不唐捐,还是要努力抓住它的尾巴,回味一下错失的年华

问题提出

分布式系统对于数据的复制需求一般都来自于以下两个原因

  1. 为了增加系统的可用性,以防止单点故障引起的系统不可用。
  2. 提高系统的整体性能,通过负载均衡技术,能够让分布在不同地方的数据副本都能够为用户提供服务。

数据复制在可用性和性能方面给分布式系统带来的巨大好处是不言而喻的,然而数据复制所带来的一致性挑战,也是每一个系统研发人员不得不面对的。

一致性

  • 强一致性
  • 弱一致性
    1. 会话一致性
    2. 用户一致性
  • 最终一致性

分布式架构

通过消息传递进行通信和协调的系统

  • 分布性
  • 对等性
  • 并发性
  • 缺乏全局时钟
  • 故障总是会发生

问题

通信异常

网络是不可靠的

网络分区

俗称“脑裂”

三态

成功,失败,超时

节点故障

每个节点随时都有可能发生故障

ACID && CAP && BASE

image

ACID

这个集中式架构中,数据库就能保证

ACID是Atomic(原子性)、Consistency(一致性)、Isolation(隔离性)和Durability(持久性)的英文缩写。

原子性:指整个数据库事务是不可分割的工作单位。只有使据库中所有的操作执行成功,才算整个事务成功;事务中任何一个SQL语句执行失败,那么已经执行成功的SQL语句也必须撤销,数据库状态应该退回到执行事务前的状态。

一致性:指数据库事务不能破坏关系数据的完整性以及业务逻辑上的一致性。例如对银行转帐事务,不管事务成功还是失败,应该保证事务结束后ACCOUNTS表中Tom和Jack的存款总额为2000元。

隔离性:指的是在并发环境中,当不同的事务同时操纵相同的数据时,每个事务都有各自的完整数据空间。由并发事务所做的修改必须与任何其他并发事务所做的修改隔离。事务查看数据更新时,数据所处的状态要么是另一事务修改它之前的状态,要么是另一事务修改它之后的状态,事务不会查看到中间状态的数据。

持久性:指的是只要事务成功结束,它对数据库所做的更新就必须永久保存下来。即使发生系统崩溃,重新启动数据库系统后,数据库还能恢复到事务成功结束时的状态。

CAP

CAP原则又称CAP定理,指的是在一个分布式系统中,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可得兼

  • 一致性(C):在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本)
  • 可用性(A):在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性)
  • 分区容错性(P):以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。

设想一下,当一个分布式系统发生了部分隔离:

image

节点被分隔到了两个区域:写入区域(1)的数据无法复制到区域(2),而访问区域(2)的请求也不能被转发到区域(1)。

这时候,如果让左边的写入成功——优先保证可用性,则访问区域(2)的请求读不到最新一致的数据,违反了一致性。

反之,如果让写入失败(阻塞),或者彻底阻止外部请求访问区域(2)——则保证了数据一致性,但是损失了可用性。

因此,要同时保证一致性和可用性,区域(1)和区域(2)必须能够互相通讯。

BASE, 最终一致性

这个理论由 Basically Available, Soft state, Eventual consistency 组成。核心的概念是 Eventual consistency ——最终一致性。它局部的放弃了 CAP 理论中的“完全”一致性,提供了更好的可用性和分区容忍度。

Basically Available

基本可用, 或者说部分可用。由于分布式系统的节点故障是常见的,业务必须接受这种不可用,并且做出选择:是访问另一个节点忍受数据的临时不一致,还是等待节点恢复并忍受业务上的部分不可用。

Soft state

把所有节点的数据 (数据 = 状态) 都看作是缓存(Cache)。适当的调整业务,使业务可以忍受数据的临时不一致,并保证这种不一致是无害的,可以被最终用户理解。

Eventual consistency

放弃在任何时刻、从任何节点都能读到完全一致的数据。允许数据的临时不一致,并通过异步复制、重试和合并消除数据的临时不一致。

注意 在分布式系统中,写入和读取可能发生在不同的节点上。最终一致带来的问题是,业务在写入后立即读取,很可能读不到刚刚写入的数据

在实际工程实践中,最终一致性存在以下五类主要变种。

  1. 因果一致性(Causal consistency)
    因果一致性是指,如果进程A在更新完某个数据项后通知了进程B,那么进程B之后对该数据项的访问都应该能够获取到进程A更新后的最新值,并且如果进程B要对该数据项进行更新操作的话,务必基于进程A更新后的最新值,即不能发生丢失更新情况。与此同时,与进程A无因果关系的进程C的数据访问则没有这样的限制。
  2. 读己之所写(Read your writes)
    读己之所写是指,进程A更新一个数据项之后,它自己总是能够访问到更新过的最新值,而不会看到旧值。也就是说,对于单个数据获取者来说,其读取到的数据,一定不会比自己上次写入的值旧。因此,读己之所写也可以看作是一种特殊的因果一致性。
  3. 会话一致性(Session consistency)
    会话一致性将对系统数据的访问过程框定在了一个会话当中:系统能保证在同一个有效的会话中实现“读己之所写”的一致性,也就是说,执行更能操作之后,客户端能够在同一个会话中始终读取到该数据项的最新值。
  4. 单调读一致性(Monotonic read consistency)
    单调读一致性是指如果一个进程从系统中读取出一个数据项的某个值后,那么系统对于该进程后续的任何数据访问都不应该返回更旧的值。
  5. 单调写一致性(Monotonic write consistency)
    单调写一致性是指,一个系统需要能够保证来自同一个进程的写操作被顺序地执行。

一致性协议

2PC&&3PC

2PC/3PC全称:Two/Three Phase Commit,中文名叫叫两阶段/三阶段提交;为了使基于分布式系统架构下的所有节点在进行事务处理的过程中能够ACID特性而设计的一种算法,需要引入一个作为协调者的组件来统一掌控所有节点(称作参与者)的操作结果并最终指示这些节点是否要把操作结果进行真正的提交

2PC

2pc
第一阶段:提交事务阶段(投票阶段)

  1. 事务询问:协调者会问所有的参与者结点,是否可以执行提交操作
  2. 执行事务:各个参与者执行事务操作 如:资源上锁,将Undo和Redo信息记入事务日志中
  3. 参与者向协调者反馈事务询问的响应:如果参与者成功执行了事务操作,反馈给协调者Yes响应,否则No响应

第二阶段:执行事务提交(执行阶段)

假如协调者从所有的参与者获得的反馈都是Yes响应,那么就会执行事务提交

  1. 发送提交请求:协调者向参与者发送Commit请求
  2. 事务提交:参与者接受到Commit请求后,会正式执行事务提交操作,并在完成提交之后释放事务资源
  3. 反馈事务提交结果:参与者在完成事务提交之后,向协调者发送Ack消息
  4. 完成事务:协调者接受到所有参与者反馈的Ack消息后,完成事务

假如任何一个参与者向协调者反馈了No响应,或者在等待超时之后,协调者尚无接收到所有参与者的反馈信息,那么就会中断事务

  1. 发送回滚请求:协调者向参与者发送Rollback请求
  2. 事务回滚:参与者利用Undo信息来执行事务回滚,并释放事务资源
  3. 反馈事务回滚结果:参与者在完成事务回滚之后,向协调者发送Ack消息
  4. 中断事务:协调者接收到所有参与者反馈的Ack消息之后,中断事务

网上看来的西方教堂结婚一个桥段很好的描述了2PC协议:
1.牧师分别问新郎和新娘:你是否愿意……不管生老病死……(投票阶段)
2.当新郎和新娘都回答愿意后(锁定一生的资源),牧师就会说:我宣布你们……(执行阶段)

优缺点

  • 二阶段提交协议的优点:原理简单,实现方便。
  • 二阶段提交协议的缺点:同步阻塞、单点问题、脑裂、太过保守
同步阻塞

二阶段提交协议存在的最明显也是最大的一个问题就是同步阻塞,这会极大地限制分布式系统的性能。在二阶段提交的执行过程中,所有参与该事务操作的逻辑都处于阻塞状态,也就是说,各个参与者在等待其他参与者响应的过程中,将无法进行其他任何操作。

单点问题

协调者的角色在整个二阶段提交协议中起到了非常重要的作用。一旦协调者出现问题,那么整个二阶段提交流程将无法运转,更为严重的是,如果协调者是在阶段二中出现问题的话,那么其他参与者将会一直处于锁定事务资源的状态中,而无法继续完成事务操作。

数据不一致

在二阶段提交协议的阶段二,即执行事务提交的时候,当协调者向所有的参与者发送Commit请求之后,发生了局部网络异常或者是协调者在尚未发送完Commit请求之前自身发生了崩溃,导致最终只有部分参与者收到了Commit请求。于是,这部分收到了Commit请求的参与者就会进行事务的提交,而其他没有收到Commit请求的参与者则无法进行事务提交,于是整个分布式系统便出现了数据不一致性现象。

太过保守

如果在协调者指示参与者进行事务提交询问的过程中,参与者出现故障而导致协调者始终无法获取到所有参与者的响应信息的话,这时协调者只能依靠其自身的超时机制来判断是否需要中断事务,这样的策略显得比较保守。换句话说,二阶段提交协议没有设计较为完善的容错机制,任意一个节点的失败都会导致整个事务的失败。

在异步环境(asynchronous)并且没有节点宕机(fail-stop)的模型下,2PC可以满足全认同、值合法、可结束,是解决一致性问题的一种协议。但如果再加上节点宕机(fail-recover)的考虑,2PC是否还能解决一致性问题呢?

coordinator如果在发起提议后宕机,那么participant将进入阻塞(block)状态、一直等待coordinator回应以完成该次决议。这时需要另一角色把系统从不可结束的状态中带出来,我们把新增的这一角色叫协调者备份(coordinator watchdog)。coordinator宕机一定时间后,watchdog接替原coordinator工作,通过问询(query) 各participant的状态,决定阶段2是提交还是中止。这也要求 coordinator/participant 记录(logging)历史状态,以备coordinator宕机后watchdog对participant查询、coordinator宕机恢复后重新找回状态。

从coordinator接收到一次事务请求、发起提议到事务完成,经过2PC协议后增加了2次RTT(propose+commit),带来的时延(latency)增加相对较少。

3PC

3PC是2PC的改进版本,将2PC的第一阶段:提交事务阶段一分为二,形成了CanCommit、PreCommit和doCommit三个阶段组成的事务处理协议

在2PC中一个participant的状态只有它自己和coordinator知晓,假如coordinator提议后自身宕机,在watchdog启用前一个participant又宕机,其他participant就会进入既不能回滚、又不能强制commit的阻塞状态,直到participant宕机恢复。这引出两个疑问:

  1. 能不能去掉阻塞,使系统可以在commit/abort前回滚(rollback)到决议发起前的初始状态
  2. 当次决议中,participant间能不能相互知道对方的状态,又或者participant间根本不依赖对方的状态

具体看一张流程图

3pc

阶段一:CanCommit

  1. 事务询问。
    协调者向所有的参与者发送一个包含事务内容的canCommit请求,询问是否可以执行事务提交操作,并开始等待各参与者的响应。
  2. 各参与者向协调者反馈事务询问的响应。
    参与者在接收到来自协调者的canCommit请求后,正常情况下,如果其自身认为可以顺利执行事务,那么会反馈Yes响应,并进入预备状态,否则反馈No响应。

阶段二:PreCommit

在阶段二中,协调者会根据各参与者的反馈情况来决定是否可以进行事务的PreCommit操作,正常情况下,包含两种可能。

  • 执行事务预提交

假如协调者从所有的参与者获得的反馈都是Yes响应,那么就会执行事务预提交。

  1. 发送预提交请求。
    协调者向所有参与者节点发出preCommit的请求,并进入Prepared阶段。
  2. 事务预提交。
    参与者接收到preCommit请求后,会执行事务操作,并将Undo和Redo信息记录到事务日志中。
  3. 各参与者向协调者反馈事务执行的响应。
    如果参与者成功执行了事务操作,那么就会反馈给协调者Ack响应,同时等待最终的指令:提交(commit)或中止(abort)。
  • 中断事务
    假如任何一个参与者向协调者反馈了No响应,或者在等待超时之后,协调者尚无法接收到所有参与者的反馈响应,那么就会中断事务。
  1. 发送中断请求。
    协调者向所有参与者节点发出abort请求。
  2. 中断事务。
    无论是收到来自协调者的abort请求,或者是在等待协调者请求过程中出现超时,参与者都会中断事务。

阶段三:doCommit

该阶段将进行真正的事务提交,会存在以下两种可能的情况。

  • 执行提交
  1. 发送提交请求。
    进入这一阶段,假设协调者处于正常工作状态,并且它接收到了来自所有参与者的Ack响应,那么它将从“预提交”状态转换到“提交”状态,并向所有的参与者发送doCommit请求。
  2. 事务提交。
    参与者接收到doCommit请求后,会正式执行事务提交操作,并在完成提交之后释放在整个事务执行期间占用的事务资源。
  3. 反馈事务提交结果。
    参与者在完成事务提交之后,向协调者发送Ack消息。
  4. 完成事务。
    协调者接收到所有参与者反馈的Ack消息后,完成事务。
  • 中断事务
    进入这一阶段,假设协调者处于正常工作状态,并且有任意一个参与者向协调者反馈了No响应,或者在等待超时之后,协调者尚无法接收到所有参与者的反馈响应,那么就会中断事务。
  1. 发送中断请求。
    协调者向所有的参与者节点发送abort请求。
  2. 事务回滚。
    参与者接收到abort请求后,会利用其在阶段二中记录的Undo信息来执行事务回滚操作,并在完成回滚之后释放在整个事务执行期间占用的资源。
  3. 反馈事务回滚结果。
    参与者在完成事务回滚之后,向协调者发送Ack消息。
  4. 中断事务。
    协调者接收到所有参与者反馈的Ack消息后,中断事务。

需要注意的是,一旦进入阶段三,可能会存在以下两种故障。

  1. 协调者出现问题
  2. 协调者和参与者之间的网络出现故障。

无论出现哪种情况,最终都会导致参与者无法及时接收到来自协调者的doCommit或是abort请求,针对这样的异常情况,参与者都会在等待超时之后,继续进行事务提交。

优缺点

三阶段提交协议的优点:

相较于二阶段提交协议,三阶段提交协议最大的优点就是降低了参与者的阻塞范围,并且能够在出现单点故障后继续达成一致。

三阶段提交协议的缺点:

三阶段提交协议在去除阻塞的同时也引入了新的问题,那就是在参与者接收到preCommit消息后,如果网络出现分区,此时协调者所在的节点和参与者无法进行正常的网络通信,在这种情况下,该参与者依然会进行事务的提交,这必然出现数据的不一致性。

coordinator接收完participant的反馈(vote)之后,进入阶段2,给各个participant发送准备提交(prepare to commit)指令。participant接到准备提交指令后可以锁资源,但要求相关操作必须可回滚。coordinator接收完确认(ACK)后进入阶段3、进行commit/abort,3PC的阶段3与2PC的阶段2无异。协调者备份(coordinator watchdog)、状态记录(logging)同样应用在3PC。

participant如果在不同阶段宕机,我们来看看3PC如何应对:

  • 阶段1: coordinator或watchdog未收到宕机participant的vote,直接中止事务;宕机的participant恢复后,读取logging发现未发出赞成vote,自行中止该次事务
  • 阶段2: coordinator未收到宕机participant的precommit ACK,但因为之前已经收到了宕机participant的赞成反馈(不然也不会进入到阶段2),coordinator进行commit;watchdog可以通过问询其他participant获得这些信息,过程同理;宕机的participant恢复后发现收到precommit或已经发出赞成vote,则自行commit该次事务
  • 阶段3: 即便coordinator或watchdog未收到宕机participant的commit ACK,也结束该次事务;宕机的participant恢复后发现收到commit或者precommit,也将自行commit该次事务

因为有了准备提交(prepare to commit)阶段,3PC的事务处理延时也增加了1个RTT,变为3个RTT(propose+precommit+commit),但是它防止participant宕机后整个系统进入阻塞态,增强了系统的可用性,对一些现实业务场景是非常值得的。

paxos

2PC:同步阻塞、单点问题、脑裂、太过保守

3PC:主要解决的单点故障问题,并减少阻塞,但依然存在数据不一致以及太过保守问题

2PC协议用于保证属于多个数据分片上的操作的原子性。这些数据分片可能分布在不同的服务器上,2PC协议保证多台服务器上的操作要么全部成功,要么全部失败。

Paxos协议用于保证同一个数据分片的多个副本之间的数据一致性。

Paxos算法要解决的问题就是如何在可能发生几起宕机或网络异常的分布式系统中,快速且正确地在集群内部对某个数据的值达成一致,并且保证不论发生以上任何异常,都不会破坏整个系统的一致性

复制策略

很多资料介绍paxos时,很学术,上来就是提案者,接受者~ 我也是云里雾里,只能不明觉历。

罗列一些问题:

一致性是什么?以前怎么处理一致性问题?

没有paxos时,以前的解决方案有哪些问题?

paxos怎么演变而来?

paxos怎么解决问题的?

理论背景的缺失,让人难以理解!

看到了可靠分布式系统基础 Paxos 的直观解释,感觉有点明白了。引用一下!

对于一致性,现在一些方案大都是走复制模式,如主从及进化的主从

主从异步复制

如Mysql的binlog复制

  1. 主接到写请求
  2. 主写入本磁盘
  3. 主应答‘OK’
  4. 主复制数据到从库

如果磁盘在复制前损坏: 数据丢失

image

主从同步复制

  1. 主接到写请求
  2. 主复制日志到从库
  3. 从库这时可能阻塞
  4. 客户端一直等待应答OK,直到所有从库返回

一个失联节点造成整个系统不可用
image

主从半同步复制

  1. 主接到写请求
  2. 主复制日志到从库
  3. 从库可能阻塞
  4. 如果1<=x<=n个从库返回OK,刚返回客户端OK

高可靠、高可用、可能任何从库都不完整

多数派写

  1. 客户端写入W >=N/2+1个节点,不需要主
  2. 多数派读:W+R>N;R>=N/2+1
  3. 容忍最多(N-1)/2个节点损坏
  4. 最后1次覆盖先前写入
  5. 所有写入操作需要有1个全局顺序:时间戳

一致性:最终一致性
事务性:非原子更新、脏读、更新丢失问题

多数派读写的不足

一个假想存储服务

  1. 一个有3个存储节点的存储服务集群
  2. 使用多数派读写策略
  3. “i”的每次更新对应有多个版本i1,i2,i3…
  4. 这个存储系统支持3个命令 get读最新的i,set 设置下个版本i的值为n,inc 对i加n

命令实现:

“set” → 直接对应多数派写.

“inc” → (最简单的事务型操作):

  1. 通过多数派读,读取最新的 “i”: i1
  2. Let i2 = i1 + n
  3. set i2

image

并发问题

image

我们期待最终X可以读到i3=5, 这需要Y能知道X已经写入了i2。如何实现这个机制?

在X和Y的2次“inc”操作后,为了得到正确的i3:整个系统里对i的某个版本(i2),只能有1次成功写入.

推广为:在存储系统中,一个值(1个变量的1个版本)在被认为确定(客户端接到OK)之后,就不允许被修改().

如何定义“被确定的”?

如何避免修改“被确定的”值

如何确定一个值

方案:每次写入一个值前,先运行一次多数派读,来确认是否这个值(可能)已经被写过了

image

但是,X和Y可能同时以为还没有值被写入过,然后同时开始写

image

方案改进:让存储节点记住谁最后1次做过“写前读取”,并拒绝之前其他的“写前读取”的写入操作

image

paxos

paxos就是以上的解决方案

将所有节点都写入同一个值,且被写入后不再更改。

两个操作

  1. Proposal Value:提议的值;
  2. Proposal Number:提议编号,可理解为提议版本号,要求不能冲突;

三个角色

  1. Proposer:提议发起者。Proposer 可以有多个,Proposer 提出议案(value)。所谓 value,可以是任何操作,比如“设置某个变量的值为value”。不同的 Proposer 可以提出不同的 value,例如某个Proposer 提议“将变量 X 设置为 1”,另一个 Proposer 提议“将变量 X 设置为 2”,但对同一轮 Paxos过程,最多只有一个 value 被批准。
  2. Acceptor:提议接受者;Acceptor 有 N 个,Proposer 提出的 value 必须获得超过半数(N/2+1)的 Acceptor批准后才能通过。Acceptor 之间完全对等独立。
  3. Learner:提议学习者。上面提到只要超过半数accpetor通过即可获得通过,那么learner角色的目的就是把通过的确定性取值同步给其他未确定的Acceptor。

协议过程

proposer将发起提案(value)给所有accpetor,超过半数accpetor获得批准后,proposer将提案写入accpetor内,最终所有accpetor获得一致性的确定性取值,且后续不允许再修改。

协议分为两大阶段,每个阶段又分为A/B两小步骤:

  1. 准备阶段(占坑阶段)
    • 第一阶段A:Proposer选择一个提议编号n,向所有的Acceptor广播Prepare(n)请求。
    • 第一阶段B:Acceptor接收到Prepare(n)请求,若提议编号n比之前接收的Prepare请求都要大,则承诺将不会接收提议编号比n小(<=)的提议,并且带上之前Accept的提议中编号小于n(<)的最大的提议,否则不予理会。
  2. 接受阶段(提交阶段)
    • 第二阶段A:整个协议最为关键的点:Proposer得到了Acceptor响应
    • 如果未超过半数accpetor响应,直接转为提议失败;
    • 如果超过多数Acceptor的承诺,又分为不同情况:
    1. 如果所有Acceptor都未接收过值(都为null),那么向所有的Acceptor发起自己的值和提议编号n,记住,一定是所有Acceptor都没接受过值;
    2. 如果有部分Acceptor接收过值,那么从所有接受过的值中选择对应的提议编号最大的作为提议的值,提议编号仍然为n。但此时Proposer就不能提议自己的值,只能信任Acceptor通过的值,维护一但获得确定性取值就不能更改原则;
    • 第二阶段B:Acceptor接收到提议后,如果该提议版本号不等于自身保存记录的版本号(第一阶段记录的),不接受该请求,相等则写入本地。

整个paxos协议过程看似复杂难懂,但只要把握和理解这两点就基本理解了paxos的精髓:

  1. 理解第一阶段accpetor的处理流程:如果本地已经写入了,不再接受和同意后面的所有请求,并返回本地写入的值;如果本地未写入,则本地记录该请求的版本号,并不再接受其他版本号的请求,简单来说只信任最后一次提交的版本号的请求,使其他版本号写入失效;
  2. 理解第二阶段proposer的处理流程:未超过半数accpetor响应,提议失败;超过半数的accpetor值都为空才提交自身要写入的值,否则选择非空值里版本号最大的值提交,最大的区别在于是提交的值是自身的还是使用以前提交的。

简单讲,在prepare阶段,以编号大的为准;在accept阶段以值为准

参考资料

漫谈事务与分布式事务(3)- 分布式困境

分布式系统理论基础 - 一致性、2PC和3PC

可靠分布式系统基础 Paxos 的直观解释

如何浅显易懂地解说 Paxos 的算法

Paxos算法的理解

以两军问题为背景来演绎Basic Paxos

Basic Paxos算法

游戏灰度发布

发表于 2018-01-26
字数统计: 1.4k 字数 | 阅读时长 ≈ 4 分钟

背景

快速可以说是互联网的最大特点了,唯快不破,快速响应,快速发布,快速部署,快速上线

但上线,毕竟还是有风险的,怎么能又快速响应,又能降低风险范围呢

前人,现人,后人们都在寻找着银弹

部署方式就进化了有很多次,蓝绿部署、滚动部署、灰度发布、金丝雀发布。。。

这些都是为了应对互联网的快速响应需求

游戏的发布现在还是比较粗暴的,对开发,运维也比较简单。

制定一个版本计划,开发,与运营沟通,确定版本内容,到了时间,所有游戏区全部关闭入口,停止服务器,发布,部署,重启,开放入口,一气呵成,快哉!

等等,理想很丰满,现实很骨感

在版本发布最后一天,开发人员在凌晨1、 2点时,还在开发,修复bug,好不容易打包,回家睡觉

第二天运维在8点开始停机发布新版本;

duang,怎么游戏服起不来了,开发请起床,查问题

迷迷糊糊的开发在梦境中惊醒,终于搞定,打包,发版本,启动服务(有时可能要一上午查问题,通知运营方,延长维护时间)

duang,玩家反馈,新功能有问题…

此时,回滚?还是。。。;好汉不回头,哪来的回滚

紧急停机,再寻找问题,修复,上线…

…

整个游戏的链条上,似乎大家都已经习惯,开发习惯,玩家也习惯

习惯麻痹了一切,没有提出更好的策略,大家都这么玩啊,无所谓啦~

方案

细思极恐,我们应该,也需要做得更好

灰度发布/金丝雀发布

灰度发布是在原有版本可用的情况下,同时部署一个新版本应用作为“金丝雀”(金丝雀对瓦斯极敏感,矿井工人携带金丝雀,以便及时发发现危险),测试新版本的性能和表现,以保障整体系统稳定的情况下,尽早发现、调整问题。

灰度发布

灰度发布/金丝雀发布由以下几个步骤组成:

  1. 准备好部署各个阶段的工件,包括:构建工件,测试脚本,配置文件和部署清单文件。
  2. 从负载均衡列表中移除掉“金丝雀”服务器。
  3. 升级“金丝雀”应用(排掉原有流量并进行部署)。
  4. 对应用进行自动化测试。
  5. 将“金丝雀”服务器重新添加到负载均衡列表中(连通性和健康检查)。
  6. 如果“金丝雀”在线使用测试成功,升级剩余的其他服务器。(否则就回滚)

游戏架构

image

这个架构图比现实丰满不少,真实情况组件可能是单点的,数据层也就是单个mysql,一切都是那么脆弱。

流程图

流程图

玩家首先登陆游戏运营平台,鉴权完毕,选择区服,通过网关服务器获取到真实game-server信息,通过TCP,玩家与game-server建立起长连接。

通过这个流程,就知道玩家与game-server直接牵手,强依赖的,如果gameserver重启,tcp连接是一定会断的,虽然前端可能尝试重新连接,但对玩家是有感的,不可能对玩家透明。

改进

怎么才能对玩家无感,切换版本呢?

image

在之前的架构图中,稍作修改,在玩家与Gameserver之间增加一层ha-proxy,这样就有了灰度发布的基础

玩家不再直接与game-server直连,而是与ha-proxy

透明性

对玩家来说,发版本就是透明的,发版本时,不再需要停机,入口也不需要关闭,7*24玩耍

流量灵活切换

灰度百分比,可以灵活控制,这里面又涉及到路由规则,复杂了,可以先百分百切换

快速迭代

玩家无感,出现bug,可以快速修复,快速上线

快速回滚

一旦新版本有问题,可以马上切回老版本,版本之间无逢切换

难点

加了ha-proxy,多了更多的灵活性

ha-proxy的难点,高可用,高可靠,高性能

高可用

最重要的一点,不能单点;

如果ha-proxy挂了,怎么办?就算game-server正常运行,也不能再提供服务,自己坑了自己

所以ha-proxy不能单点,哪是集群,还是主从?

每台物理机上都部署,还是集中几台部署?

高可靠

在新旧版本同时在线时,流量是否平滑过渡? 玩家操作是否保持完整性?

一个玩家操作横跨新旧版本时,数据一致性如何保障?

高性能

游戏服都是尽量压榨单台服务的能力,现在多了一层通讯,IO会不会影响性能?

结论

对于以上方案,不论是哪一种实现方式,仁者见仁,条条大路通罗马。

也可能你觉得这种想法本身就是个多余。

能卖1块钱的豆腐,为什么要卖5毛?

常识四堆外内存

发表于 2017-12-31
字数统计: 7.5k 字数 | 阅读时长 ≈ 31 分钟

常识系列,作为一名互联网门外汉的科普系列

堆外内存除了在像netty开源框架中,在平常项目中使用的比较少,在现前的项目中,QPS要求高的系统中,堆外内存作为其中一级缓存是相当有成效的。所以来学习一下,文中主要涉及到这三分部内容

  1. 堆外内存是什么?与堆内内存的区别
  2. 怎么分配,与GC的影响
  3. 开源框架使用

这篇文章写到最后,发现还只是回答了开源框架OHC的Why not use ByteBuffer.allocateDirect()?

概念

堆内内存

现在流行的还是使用分代管理方式

image

之前写过相关文章GC及JVM参数

在jvm参数中只要使用-Xms,-Xmx等参数就可以设置堆的大小和最大值

堆外内存

和堆内内存相对应,堆外内存就是把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)

堆外内存有以下特点:

  • 对于大内存有良好的伸缩性
  • 对垃圾回收停顿的改善可以明显感觉到
  • 在进程间可以共享,减少虚拟机间的复制

堆外内存分配与回收

其实堆外内存一直在使用,却没有真正关注过。最常见的nio,Netty,里面大量使用了堆外内存

这儿会涉及到很多知识点,一步步来,抽丝剥茧

Buffer

这儿回顾下io知识,java提供了两种io处理方式,一种是io,另一种是nio

Java NIO和IO之间最大的区别是IO是面向流(Stream)的,NIO是面向块(buffer)的,所以,这意味着什么?

面向流意味着从流中一次可以读取一个或多个字节,拿到读取的这些做什么你说了算,这里没有任何缓存(这里指的是使用流没有任何缓存,接收或者发送的数据是缓存到操作系统中的,流就像一根水管从操作系统的缓存中读取数据)而且只能顺序从流中读取数据,如果需要跳过一些字节或者再读取已经读过的字节,你必须将从流中读取的数据先缓存起来。
面向块的处理方式有些不同,数据是先被 读/写到buffer中的,根据需要你可以控制读取什么位置的数据。这在处理的过程中给用户多了一些灵活性,然而,你需要额外做的工作是检查你需要的数据是否已经全部到了buffer中,你还需要保证当有更多的数据进入buffer中时,buffer中未处理的数据不会被覆盖

对于stream流来讲,一个一个字节处理效率太差了,所以还提供了带buffer的bufferedStream

对就到api,就是

1
2
3
4
5
read()
read(byte b[])

write()
write(byte b[])

nio是面向buffer的,所以有专门抽象了Buffer

buffer

zero copy

虽然通过调节buffer的大小,使用bufferedstream可以提升性能,但还不够

还可以通过Zero-Copy大大提高了应用程序的性能,并且减少了kernel和user模式上下文的切换

这儿需要再深入底层机制,来看系统内核与应用程序的交互过程

linux科普

这儿再回顾一下linux相关知识点

linux的内核态和用户态

  • 内核态:控制计算机的硬件资源,并提供上层应用程序运行的环境。比如socket I/0操作或者文件的读写操作等
  • 用户态:上层应用程序的活动空间,应用程序的执行必须依托于内核提供的资源。
  • 系统调用:为了使上层应用能够访问到这些资源,内核为上层应用提供访问的接口。

关系

因此我们可以得知当我们通过JNI调用的native方法实际上就是从用户态切换到了内核态的一种方式。并且通过该系统调用使用操作系统所提供的功能。

Q:为什么需要用户进程(位于用户态中)要通过系统调用(Java中即使JNI)来调用内核态中的资源,或者说调用操作系统的服务了?
A:intel cpu提供Ring0-Ring3四种级别的运行模式,Ring0级别最高,Ring3最低。Linux使用了Ring3级别运行用户态,Ring0作为内核态。Ring3状态不能访问Ring0的地址空间,包括代码和数据。因此用户态是没有权限去操作内核态的资源的,它只能通过系统调用外完成用户态到内核态的切换,然后在完成相关操作后再有内核态切换回用户态。

鉴于linux系统的特性,IO之流程就如下图

IO之流程与buffer概览

copy过程

大部分web服务器都要处理大量的静态内容,而其中大部分都是从磁盘文件中读取数据然后写到socket中。这种操作对cpu的消耗是比较小的,但也是十分低效的:内核首先从磁盘文件读取数据,然后从内核空间将数据传到用户空间,应用程序又将数据从用户空间返回到内核空间然后传输给socket(如果好奇数据为何如此来回传输,请继续看下文)。实际上,应用程序就相当于是个低效的中间者,从磁盘拿数据放到socket。

read/write模式

代码抽象:

1
2
read(file, tmp_buf, len);
write(socket, tmp_buf, len);

首先调用read将静态内容,这里假设为文件A,读取到tmp_buf, 然后调用write将tmp_buf写入到socket中

read/write copy steps

1、当调用 read 系统调用时,通过 DMA(Direct Memory Access)将数据 copy 到内核模式

2、然后由 CPU 控制将内核模式数据 copy 到用户模式下的 buffer 中

3、read 调用完成后,write 调用首先将用户模式下 buffer 中的数据 copy 到内核模式下的 socket buffer 中

4、最后通过 DMA copy 将内核模式下的 socket buffer 中的数据 copy 到网卡设备中传送。

从上面的过程可以看出,数据白白从内核模式到用户模式走了一圈,浪费了两次 copy(第一次,从kernel模式拷贝到user模式;第二次从user模式再拷贝回kernel模式,即上面4次过程的第2和3步骤。),而这两次 copy 都是 CPU copy,即占用CPU资源

sendfile模式

sendfile copy steps

通过 sendfile 传送文件只需要一次系统调用,当调用 sendfile 时:

1、首先通过 DMA copy 将数据从磁盘读取到 kernel buffer 中

2、然后通过 CPU copy 将数据从 kernel buffer copy 到 sokcet buffer 中

3、最终通过 DMA copy 将 socket buffer 中数据 copy 到网卡 buffer 中发送

sendfile 与 read/write 方式相比,少了 一次模式切换一次 CPU copy。但是从上述过程中也可以发现从 kernel buffer 中将数据 copy 到socket buffer 是没必要的。

sendfile模式改进

Linux2.4 内核对 sendfile 做了改进,下图所示

sendfilev2 copy steps

改进后的处理过程如下:

1、DMA copy 将磁盘数据 copy 到 kernel buffer 中

2、向 socket buffer 中追加当前要发送的数据在 kernel buffer 中的位置和偏移量

3、DMA gather copy 根据 socket buffer 中的位置和偏移量直接将 kernel buffer 中的数据 copy 到网卡上。

经过上述过程,数据只经过了 2 次 copy 就从磁盘传送出去了。(事实上这个 Zero copy 是针对内核来讲的,数据在内核模式下是 Zero-copy 的)。

当前许多高性能 http server 都引入了 sendfile 机制,如 nginx,lighttpd 等。

java zero copy

Zero-Copy技术省去了将操作系统的read buffer拷贝到程序的buffer,以及从程序buffer拷贝到socket buffer的步骤,直接将read buffer拷贝到socket buffer. Java NIO中的FileChannal.transferTo()方法就是这样的实现

1
public void transferTo(long position,long count,WritableByteChannel target);

transferTo()方法将数据从一个channel传输到另一个可写的channel上,其内部实现依赖于操作系统对zero copy技术的支持。在unix操作系统和各种linux的发型版本中,这种功能最终是通过sendfile()系统调用实现。下边就是这个方法的定义:

1
2
#include <sys/socket.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

可以通过调用transferTo()方法来替代上边的File.read()、Socket.send()

通过transferTo实现数据传输的路径:

transferTo

展示了内核态、用户态的切换情况:

transferTo-contextSwitch

使用transferTo()方式所经历的步骤:

1、transferTo调用会引起DMA将文件内容复制到读缓冲区(内核空间的缓冲区),然后数据从这个缓冲区复制到另一个与socket输出相关的内核缓冲区中。

2、第三次数据复制就是DMA把socket关联的缓冲区中的数据复制到协议引擎上发送到网络上。

这次改善,我们是通过将内核、用户态切换的次数从四次减少到两次,将数据的复制次数从四次减少到三次(只有一次用到cpu资源)。但这并没有达到我们零复制的目标。如果底层网络适配器支持收集操作的话,我们可以进一步减少内核对数据的复制次数。

在内核为2.4或者以上版本的linux系统上,socket缓冲区描述符将被用来满足这个需求。这个方式不仅减少了内核用户态间的切换,而且也省去了那次需要cpu参与的复制过程。
从用户角度来看依旧是调用transferTo()方法,但是其本质发生了变化:

1、调用transferTo方法后数据被DMA从文件复制到了内核的一个缓冲区中。

2、数据不再被复制到socket关联的缓冲区中了,仅仅是将一个描述符(包含了数据的位置和长度等信息)追加到socket关联的缓冲区中。DMA直接将内核中的缓冲区中的数据传输给协议引擎,消除了仅剩的一次需要cpu周期的数据复制。

transferTo

ByteBuffer创建

以上的知识点都是点缀,真正的主角上场了,看下java中是如何抽象上述理论的

ByteBuffer有两种分配buffer的方式:

分配HeapByteBuffer

1
ByteBuffer buffer = ByteBuffer.allocate(int capacity);

分配DirectByteBuffer

1
ByteBuffer buffer = ByteBuffer.allocateDirect(int capacity);

两者的区别,JDK里面说得很清楚

A byte buffer is either direct or non-direct. Given a direct byte buffer, the Java virtual machine will make a best effort to perform native I/O operations directly upon it. That is, it will attempt to avoid copying the buffer’s content to (or from) an intermediate buffer before (or after) each invocation of one of the underlying operating system’s native I/O operations.
A direct byte buffer may be created by invoking the allocateDirect factory method of this class. The buffers returned by this method typically have somewhat higher allocation and deallocation costs than non-direct buffers. The contents of direct buffers may reside outside of the normal garbage-collected heap, and so their impact upon the memory footprint of an application might not be obvious. It is therefore recommended that direct buffers be allocated primarily for large, long-lived buffers that are subject to the underlying system’s native I/O operations. In general it is best to allocate direct buffers only when they yield a measureable gain in program performance.
A direct byte buffer may also be created by mapping a region of a file directly into memory. An implementation of the Java platform may optionally support the creation of direct byte buffers from native code via JNI. If an instance of one of these kinds of buffers refers to an inaccessible region of memory then an attempt to access that region will not change the buffer’s content and will cause an unspecified exception to be thrown either at the time of the access or at some later time.

从文中大致可以看到DirectByteBuffer的特点如下:

  • 对于native IO operation,JVM会有最佳的性能效果(它不需要一个中间缓冲区,而是可以直接使用,避免了将buffer中的数据再复制到中间缓冲区)。
  • 由于DirectByteBuffer分配与native memory中,不在heap区,不会受到heap区的gc影响。(一般在old gen的full gc才会收集。)
  • 分配和释放需要更多的成本。
    bytebuffer

从上可以总结DirectByteBuffer大致的应用场景如下(socket通信和大文件处理还是比较适用的):

  • 频繁的native IO操作。
  • 系统的要求处理响应速度快和稳定,即高吞吐和低延迟。
  • ByteBuffer的生命周期长且容量需求较大,会占用较多的内存空间。
    bytebuffer

看下代码,更直观一些

HeapByteBuffer

分配在堆上的,直接由Java虚拟机负责垃圾收集,你可以把它想象成一个字节数组的包装类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class HeapByteBuffer
extends ByteBuffer
{
HeapByteBuffer(int cap, int lim) { // package-private

super(-1, 0, lim, cap, new byte[cap], 0);
/*
hb = new byte[cap];
offset = 0;
*/
}
}

public abstract class ByteBuffer
extends Buffer
implements Comparable<ByteBuffer>
{

// These fields are declared here rather than in Heap-X-Buffer in order to
// reduce the number of virtual method invocations needed to access these
// values, which is especially costly when coding small buffers.
//
final byte[] hb; // Non-null only for heap buffers
final int offset;
boolean isReadOnly; // Valid only for heap buffers

// Creates a new buffer with the given mark, position, limit, capacity,
// backing array, and array offset
//
ByteBuffer(int mark, int pos, int lim, int cap, // package-private
byte[] hb, int offset)
{
super(mark, pos, lim, cap);
this.hb = hb;
this.offset = offset;
}

DirectByteBuffer

这个类就没有HeapByteBuffer简单了

DirectByteBuffer结构

DirectByteBuffer结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
DirectByteBuffer(int cap) {                   // package-private

super(-1, 0, cap, cap);
boolean pa = VM.isDirectMemoryPageAligned();
int ps = Bits.pageSize();
long size = Math.max(1L, (long)cap + (pa ? ps : 0));
Bits.reserveMemory(size, cap);

long base = 0;
try {
base = unsafe.allocateMemory(size);
} catch (OutOfMemoryError x) {
Bits.unreserveMemory(size, cap);
throw x;
}
unsafe.setMemory(base, size, (byte) 0);
if (pa && (base % ps != 0)) {
// Round up to page boundary
address = base + ps - (base & (ps - 1));
} else {
address = base;
}
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
att = null;

Bits.reserveMemory(size, cap) 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static void reserveMemory(long size, int cap) {
synchronized (Bits.class) {
if (!memoryLimitSet && VM.isBooted()) {
maxMemory = VM.maxDirectMemory();
memoryLimitSet = true;
}
// -XX:MaxDirectMemorySize limits the total capacity rather than the
// actual memory usage, which will differ when buffers are page
// aligned.
if (cap <= maxMemory - totalCapacity) {
reservedMemory += size;
totalCapacity += cap;
count++;
return;
}
}

System.gc();
try {
Thread.sleep(100);
} catch (InterruptedException x) {
// Restore interrupt status
Thread.currentThread().interrupt();
}
synchronized (Bits.class) {
if (totalCapacity + cap > maxMemory)
throw new OutOfMemoryError("Direct buffer memory");
reservedMemory += size;
totalCapacity += cap;
count++;
}
}

在DirectByteBuffer中,首先向Bits类申请额度,Bits类有一个全局的totalCapacity变量,记录着全部DirectByteBuffer的总大小,每次申请,都先看看是否超限,堆外内存的限额默认与堆内内存(由-Xmx 设定)相仿,可用 -XX:MaxDirectMemorySize 重新设定。

如果不指定,该参数的默认值为Xmx的值减去1个Survior区的值。

如设置启动参数-Xmx20M -Xmn10M -XX:SurvivorRatio=8,那么申请20M-1M=19M的DirectMemory

如果已经超限,会主动执行Sytem.gc(),期待能主动回收一点堆外内存。

System.gc()会触发一个full gc,当然前提是你没有显示的设置-XX:+DisableExplicitGC来禁用显式GC。并且你需要知道,调用System.gc()并不能够保证full gc马上就能被执行。

所以在使用netty这类框架时,一定要注意JVM优化,如果DisableExplicitGC那就可能会OOM了

然后休眠一百毫秒,看看totalCapacity降下来没有,如果内存还是不足,就抛出OOM异常。如果额度被批准,就调用大名鼎鼎的sun.misc.Unsafe去分配内存,返回内存基地址

1
2
3
// Used only by direct buffers
// NOTE: hoisted here for speed in JNI GetDirectBufferAddress
long address;

这样我们后面通过JNI对这个堆外内存操作时都是通过这个address来实现的了。

Unsafe的C++实现在此,标准的malloc。然后再调一次Unsafe把这段内存给清零。跑个题,Unsafe的名字是提醒大家这个类只给Sun自家用的

JDK7开始,DirectByteBuffer分配内存时默认已不做分页对齐,不会再每次分配并清零实际需要+分页大小(4k)的内存,这对性能应有较大提升,所以Oracle专门写在了Enhancements in Java I/O里。

最后,创建一个Cleaner,并把代表清理动作的Deallocator类绑定 – 降低Bits里的totalCapacity,并调用Unsafe调free去释放内存。Cleaner的触发机制后面再说。

DirectByteBuffer中

1
2
3
4
5
6
7
8
byte _get(int i) {                          // package-private
return unsafe.getByte(address + i);
}

void _put(int i, byte b) { // package-private

unsafe.putByte(address + i, b);
}

在前面我们说过,在linux中内核态的权限是最高的,那么在内核态的场景下,操作系统是可以访问任何一个内存区域的,所以操作系统是可以访问到Java堆的这个内存区域的。

Q:那为什么操作系统不直接访问Java堆内的内存区域了?

A:这是因为JNI方法访问的内存区域是一个已经确定了的内存区域地质,那么该内存地址指向的是Java堆内内存的话,那么如果在操作系统正在访问这个内存地址的时候,Java在这个时候进行了GC操作,而GC操作会涉及到数据的移动操作[GC经常会进行先标志在压缩的操作。即,将可回收的空间做标志,然后清空标志位置的内存,然后会进行一个压缩,压缩就会涉及到对象的移动,移动的目的是为了腾出一块更加完整、连续的内存空间,以容纳更大的新对象],数据的移动会使JNI调用的数据错乱。所以JNI调用的内存是不能进行GC操作的,JNI不能直接访问Java堆内的内存区域

Q:如上面所说,JNI不能直接访问Java堆内的内存区域,那该如何解决了?

A:①堆内内存与堆外内存之间数据拷贝的方式(并且在将堆内内存拷贝到堆外内存的过程JVM会保证不会进行GC操作):

比如我们要完成一个从文件中读数据到堆内内存的操作,即FileChannelImpl.read(HeapByteBuffer)。这里实际上File I/O会将数据读到堆外内存中,然后堆外内存再讲数据拷贝到堆内内存,这样我们就读到了文件中的内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public int read(ByteBuffer var1) throws IOException {
this.ensureOpen();
if(!this.readable) {
throw new NonReadableChannelException();
} else {
Object var2 = this.positionLock;
synchronized(this.positionLock) {
int var3 = 0;
int var4 = -1;

try {
this.begin();
var4 = this.threads.add();
if(!this.isOpen()) {
byte var12 = 0;
return var12;
} else {
do {
//关键点在这行
var3 = IOUtil.read(this.fd, var1, -1L, this.nd);
} while(var3 == -3 && this.isOpen());

int var5 = IOStatus.normalize(var3);
return var5;
}
} finally {
this.threads.remove(var4);
this.end(var3 > 0);

assert IOStatus.check(var3);

}
}
}
}

IOUtil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
if (var1.isReadOnly()) {
throw new IllegalArgumentException("Read-only buffer");
} else if (var1 instanceof DirectBuffer) {
return readIntoNativeBuffer(var0, var1, var2, var4);
} else {
// 分配临时的堆外内存
ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());

int var7;
try {
// File I/O 操作会将数据读入到堆外内存中
int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
var5.flip();
if (var6 > 0) {
// 将堆外内存的数据拷贝到堆内内存中
var1.put(var5);
}

var7 = var6;
} finally {
// 里面会调用DirectBuffer.cleaner().clean()来释放临时的堆外内存
Util.offerFirstTemporaryDirectBuffer(var5);
}

return var7;
}
}

而写操作则反之,我们会将堆内内存的数据线写到对堆外内存中,然后操作系统会将堆外内存的数据写入到文件中。

假设我们要从网络中读入一段数据,再把这段数据发送出去的话,采用Non-direct ByteBuffer的流程是这样的:

网络 –> 临时的Direct ByteBuffer –> 应用 Non-direct ByteBuffer –> 临时的Direct ByteBuffer –> 网络

② 直接使用堆外内存,如DirectByteBuffer:

这种方式是直接在堆外分配一个内存(即,native memory)来存储数据,
程序通过JNI直接将数据读/写到堆外内存中。因为数据直接写入到了堆外内存中,所以这种方式就不会再在JVM管控的堆内再分配内存来存储数据了,也就不存在堆内内存和堆外内存数据拷贝的操作了。这样在进行I/O操作时,只需要将这个堆外内存地址传给JNI的I/O的函数就好了。

采用Direct ByteBuffer的流程是这样的:

网络 –> 应用 Direct ByteBuffer –> 网络

可以看到,除开构造和析构临时Direct ByteBuffer的时间外,起码还能节约两次内存拷贝的时间。那么是否在任何情况下都采用Direct Buffer呢?

不是。对于大部分应用而言,两次内存拷贝的时间几乎可以忽略不计,而构造和析构DirectBuffer的时间却相对较长。在JVM的实现当中,某些方法会缓存一部分临时Direct ByteBuffer,意味着如果采用Direct ByteBuffer仅仅能节约掉两次内存拷贝的时间,
而无法节约构造和析构的时间。就用Sun的实现来说,write(ByteBuffer)和read(ByteBuffer)方法都会缓存临时Direct ByteBuffer,而write(ByteBuffer[])和read(ByteBuffer[])每次都生成新的临时Direct ByteBuffer。

根据这些区别,如下的建议:

  • 如果你做中小规模的应用(在这里,应用大小是按照使用ByteBuffer的次数和规模来做划分的),而且并不在乎这该死的细节问题,请选择Non-direct ByteBuffer
  • 如果采用Direct ByteBuffer后性能并没有出现你所期待的变化,请选择Non-direct ByteBuffer
  • 如果没有Direct ByteBuffer Pool,尽量不要使用Direct ByteBuffer
  • 除非你确定该ByteBuffer会长时间存在,并且和外界有频繁交互,可采用Direct ByteBuffer
  • 如果采用Non-direct ByteBuffer,那么采用非聚集(gather)的write/read(ByteBuffer)效果反而可能超出聚集的write/read(ByteBuffer[]),因为聚集的write/read的临时Direct ByteBuffer是非缓存的

基本上,采用Non-direct ByteBuffer总是对的!因为内存拷贝需要的开销对大部分应用而言都可以忽略不计。

ByteBuffer回收

HeapByteBuffer就不要说了,GC就帮忙处理了。这儿主要说下DirectByteBuffer

基于GC回收DirectByteBuffer

存在于堆内的DirectByteBuffer对象很小,只存着基地址和大小等几个属性,和一个Cleaner,但它代表着后面所分配的一大段内存,是所谓的冰山对象。

在内存中基本是这样子
off-heap-memory
其中first是Cleaner类的静态变量,Cleaner对象在初始化时会被添加到Clener链表中,和first形成引用关系,ReferenceQueue是用来保存需要回收的Cleaner对象。

如果该DirectByteBuffer对象在一次GC中被回收了
off-heap-memory
此时,只有Cleaner对象唯一保存了堆外内存的数据(开始地址、大小和容量),在下一次Full GC时,把该Cleaner对象放入到ReferenceQueue中,并触发clean方法。

快速回顾一下堆内的GC机制,当新生代满了,就会发生young gc;如果此时对象还没失效,就不会被回收;撑过几次young gc后,对象被迁移到老生代;当老生代也满了,就会发生full gc。

这里可以看到一种尴尬的情况,因为DirectByteBuffer本身的个头很小,只要熬过了young gc,即使已经失效了也能在老生代里舒服的呆着,不容易把老生代撑爆触发full gc,如果没有别的大块头进入老生代触发full gc,就一直在那耗着,占着一大片堆外内存不释放。

这时,就只能靠前面提到的申请额度超限时触发的system.gc()来救场了。但这道最后的保险其实也不很好,首先它会中断整个进程,然后它让当前线程睡了整整一百毫秒,而且如果gc没在一百毫秒内完成,它仍然会无情的抛出OOM异常。还有,万一,万一大家迷信某个调优指南设置了-DisableExplicitGC禁止了system.gc(),那就不好玩了。

所以,堆外内存还是自己主动点回收更好,比如Netty就是这么做的

主动回收DirectByteBuffer

对于Sun的JDK这其实很简单,只要从DirectByteBuffer里取出那个sun.misc.Cleaner,然后调用它的clean()就行。

1
2
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024 * 1024 * 500);
((DirectBuffer)byteBuffer).cleaner().clean();

前面说的,clean()执行时实际调用的是被绑定的Deallocator类,这个类可被重复执行,释放过了就不再释放。所以GC时再被动执行一次clean()也没所谓。

在Netty里,因为不确定跑在Sun的JDK里(比如安卓),所以多废了些功夫来确定Cleaner的存在

Cleaner类

1
2
3
4
5
6
public class Cleaner extends PhantomReference<Object> {
private static final ReferenceQueue<Object> dummyQueue = new ReferenceQueue();
private static Cleaner first = null;
private Cleaner next = null;
private Cleaner prev = null;
private final Runnable thunk;

PhantomReference 这个虚引用类很少见,它是java中最弱的引用类型

PhantomReference 类只能用于跟踪对被引用对象即将进行的收集。

同样,它还能用于执行 pre-mortem 清除操作。 PhantomReference 必须与 ReferenceQueue 类一起使用。需要 ReferenceQueue 是因为它能够充当通知机制。当垃圾收集器确定了某个对象是虚可及对象时, PhantomReference 对象就被放在它的 ReferenceQueue 上。将 PhantomReference 对象放在 ReferenceQueue 上也就是一个通知,表明 PhantomReference 对象引用的对象已经结束,可供收集了。这使您能够刚好在对象占用的内存被回收之前采取行动。

当GC时发现它除了PhantomReference外已不可达(持有它的DirectByteBuffer失效了),就会把它放进 Reference类pending list静态变量里。然后另有一条ReferenceHandler线程,名字叫 “Reference Handler”的,关注着这个pending list,如果看到有对象类型是Cleaner,就会执行它的clean(),其他类型就放入应用构造Reference时传入的ReferenceQueue中,这样应用的代码可以从Queue里拖出这些理论上已死的对象,做爱做的事情——这是一种比finalizer更轻量更好的机制。

cleaner
比如创建DirectByteBuffer,会新建Cleaner对象,该对象添加到Cleaner链表中。
对象被GC,如果是Cleaner对象,则会执行该对象的clean方法,
Clean方法会将对应的cleaner对象从链表中移除,同时会回收DirectByteBuffer申请的资源

看下ReferenceHandler源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/* High-priority thread to enqueue pending References
*/
private static class ReferenceHandler extends Thread {

ReferenceHandler(ThreadGroup g, String name) {
super(g, name);
}

public void run() {
for (;;) {
Reference<Object> r;
synchronized (lock) {
if (pending != null) {
r = pending;
pending = r.discovered;
r.discovered = null;
} else {
// The waiting on the lock may cause an OOME because it may try to allocate
// exception objects, so also catch OOME here to avoid silent exit of the
// reference handler thread.
//
// Explicitly define the order of the two exceptions we catch here
// when waiting for the lock.
//
// We do not want to try to potentially load the InterruptedException class
// (which would be done if this was its first use, and InterruptedException
// were checked first) in this situation.
//
// This may lead to the VM not ever trying to load the InterruptedException
// class again.
try {
try {
lock.wait();
} catch (OutOfMemoryError x) { }
} catch (InterruptedException x) { }
continue;
}
}

// Fast path for cleaners
if (r instanceof Cleaner) {
((Cleaner)r).clean();
continue;
}

ReferenceQueue<Object> q = r.queue;
if (q != ReferenceQueue.NULL) q.enqueue(r);
}
}
}

回顾下Finalize回收

sun不推荐实现finalize,实际上JDK内部很多类都实现了finalize。

finalize

如果对象实现了finalize,在对象初始化后,会封装成Finalizer对象添加到 Finalizer链表中。

对象被GC时,如果是Finalizer对象,会将对象赋值到pending对象。Reference Handler线程会将pending对象push到queue中。

Finalizer线程poll到对象,先删除掉Finalizer链表中对应的对象,然后再执行对象的finalize方法(一般为资源的销毁)

方案的缺点:

  1. 对象至少跨越2个GC,垃圾对象无法及时被GC掉,并且存在多次拷贝。影响YGC和FGC
  2. Finalizer线程优先级较低,会导致finalize方法延迟执行

开源堆外缓存框架

  • Ehcache 3.0:3.0基于其商业公司一个非开源的堆外组件的实现。
  • Chronical Map:OpenHFT包括很多类库,使用这些类库很少产生垃圾,并且应用程序使用这些类库后也很少发生Minor GC。类库主要包括:Chronicle Map,Chronicle Queue等等。
  • OHC:来源于Cassandra 3.0, Apache v2。
  • Ignite: 一个规模宏大的内存计算框架,属于Apache项目。

OHC

DirectByteBuffer是使用unsafe(JNI)申请堆外空间(unsafe.allocateMemory(size))。还有一种申请堆外空间的手段:JNA。

JNA的描述(https://github.com/java-native-access/jna)

JNA provides Java programs easy access to native shared libraries without writing anything but Java code - no JNI or native code is required

堆外缓存OHC便是使用JNA来申请堆外空间。

线下测试:JNA内存申请的性能是unsafe(JNI)的2倍。

Why not use ByteBuffer.allocateDirect()?

TL;DR allocating off-heap memory directly and bypassing ByteBuffer.allocateDirect is very gentle to the GC and we have explicit control over memory allocation and, more importantly, free. The stock implementation in Java frees off-heap memory during a garbage collection - also: if no more off-heap memory is available, it likely triggers a Full-GC, which is problematic if multiple threads run into that situation concurrently since it means lots of Full-GCs sequentially. Further, the stock implementation uses a global, synchronized linked list to track off-heap memory allocations.

This is why OHC allocates off-heap memory directly and recommends to preload jemalloc on Linux systems to improve memory managment performance.

这是OHC的wiki说明

其实OHC实现了JNI(malloc),JNA(jemalloc)两种方式,默认使用了JNA(jemalloc),性能的提升最关键的是malloc与jemalloc的区别了

ohc-allocator

在org.caffinitas.ohc.chunked.Uns类中,创建IAllocator类片段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static final String __ALLOCATOR = System.getProperty(OHCacheBuilder.SYSTEM_PROPERTY_PREFIX + "allocator");

IAllocator alloc;
String allocType = __ALLOCATOR != null ? __ALLOCATOR : "jna";
switch (allocType)
{
case "unsafe":
alloc = new UnsafeAllocator();
LOGGER.info("OHC using sun.misc.Unsafe memory allocation");
break;
case "jna":
default:
alloc = new JNANativeAllocator();
LOGGER.info("OHC using JNA OS native malloc/free");
}

allocator = alloc;
}

UnsafeAllocator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (sun.misc.Unsafe) field.get(null);


public long allocate(long size)
{
try
{
return unsafe.allocateMemory(size);
}
catch (OutOfMemoryError oom)
{
return 0L;
}
}

JNANativeAllocator

1
2
3
4
5
6
7
8
9
10
11
public long allocate(long size)
{
try
{
return Native.malloc(size);
}
catch (OutOfMemoryError oom)
{
return 0L;
}
}

其它

OHC这只是一个开端,只是分配内存部分,它还有淘汰策略等等,之后说缓存时,再谈了

参考资料

Netty之Java堆外内存扫盲贴

千丝万缕的FGC与Buffer pool

JVM源码分析之堆外内存完全解读

JVM源码分析之FinalReference完全解读

功夫在诗外

发表于 2017-12-17
字数统计: 1.6k 字数 | 阅读时长 ≈ 5 分钟

功夫在诗外

算是对今年,以及去年这两年的读书的一点思考,总结

为什么会有去年,因为今年读书过程中,让人不自然地想起了去年的类似内容。说的书类型不重复,但出现了很多相同的内容

可能与这些内容有缘分

迷思

每次跳槽的时候,都会去思考我的核心竞争力到底是什么?这个问题代表了两个对立的思想一方面代表了不自知,迷茫,对未来的焦虑;另一方面代表了自不知,三人有我师,前进的动力。

职场规划一片迷茫,股票所学也是一无长进,甚是苦恼。
想起了陆游的这句话,《汝果欲学诗,功夫在诗外》;所以去年开始读了很多与专业不相关的书籍。

名人传记,历史书籍,小说,这类书的价值在于引导成长;

心智,心理类也是所好,可惜所得甚少。一些书籍还得多读,多思

如《乌合之众》,知了很多年,却没读得很少,也没读懂。

路漫漫其修远兮

所得

在读《超级交易者》上半段时,突然有了一细细恍惚。好些内容都似曾相识,让人不得不想起一句老话,失败者的失败各有各的不同,而成功者的成功却具有惊人的相似之处。

责任

这个词就像是个空词,人人都会把这个词挂在嘴边,但何为责任,责任有何妙处

这在《人生五章》诗中也有体现

《人生五章》

1、

我走上街,

人行道上有一个深洞,

我掉了进去。

我迷失了……我绝望了。

这不是我的错,

费了好大的劲才爬出来。

2、

我走上同一条街。

人行道上有一个深洞,

我假装没看到,

还是掉了进去。

我不能相信我居然会掉在同样的地方。

但这不是我的错。

还是花了很长的时间才爬出来。

3、

我走上同一条街。

人行道上有一个深洞,

我看到它在那儿,

但还是掉了进去……

这是一种习惯。

我的眼睛张开着,

我知道我在那儿。

这是我的错。

我立刻爬了出来。

4、

我走上同一条街,

人行道上有一个深洞,

我绕道而过。

5、

我走上另一条街。

这首诗的内容在读《股票作手加快录》时,就有体会,大意就是一个人从意识到自身问题,到要改变,再到彻底改变的过程是相当艰辛的,习惯上升到性格再到内质的人性,是相当难以改变的。

这一切的改变,自省的起始就在于责任两字,无责任怎么会想到自身问题。

自己永无过错,一切都是别人的问题;代码没问题,是环境问题,自己买卖没问题,是庄家问题。

写作

为什么写作?其实有各种理由

  • 文字可以重塑人
  • 写作是与自己的对话,引人思考
  • 输出倒逼输入
  • 形成闭环,输入输出完整的IO系统
  • 写作是睡后收入

我算是写作的践行者,有什么意义呢?写作了很多年,有什么益处呢?

之前我是记录时间日志,是在看《时间就是朋友》这本书得来的。

后来李笑来又说要记注意力日志。还没有太多的实践,从时间开销日志来讲,只是发现了自我的浪费时间,这也算是所得之一吧。

从去年开始写博客,为什么要写呢?关键是“形成有效的输入输出系统,以输出倒逼输入”,“写作也记录下学习过程,防止狗咬尾巴”。

现在的博客目标是每篇文章写尽一个知识点,有广度,也得有深度。但任何知识都不是单点,‘台上一分钟,台下十年功’,‘冰山理论’。

晨写也没有以前的质量和数量了,上次交易玫瑰讲了量化:一是两张A4纸,二是得有三十分钟。

还有关键一点,我也疏忽的一点,没有及时回顾日记。需要温故知新。

冥想

这个之前实践了几天,后来也没有坚持。这以后还是得有空多练习,看看效果了

读书

很多人的理想生活就是 读万卷书,行万里路

我现在的读书方法,以及读书效果还是很差,不让人满意。关键是缺少总结,一本书,至少要自己组织语言总结其主旨。

不然读再多的书,还是一无所得,浪费时间,自我心理满足。不能学以致用。

上次读李笑来的财富自由之路,一个读书亮点:慢读。

现在很多的书籍都教人快速阅读以应对当代快速膨胀的知识,以快致胜。而李笑来提出,要慢读。

快速膨胀的是信息,而不是知识。知识的出现需要很久的时间积累。所以要慢读,要有深度,现在流行快餐文化,让世人缺少了深度思考的过程,以致人浮于思,任何事都没有深度,人云亦云。

导图

对于以上所得共同点,画了张思维导图更能一目了然

image

书单

这些书单是今年看过超出一半内容的书籍,有几本是年初读的,现在都已经忘记内容了。主要就是没有写一段自己理解的总结。这些书,明年还会再读,重读。吸取知识的能力实在是差。

《西藏生死书》

《刻意练习》

《简单思考》

《自卑与超越》

《超级交易员》

《我做散户这十年:三万赚到千万》

《走进我的交易室》

《炒股的智慧》

《红尘天幕》

今年没有达到目标,计划是一个月一本的,不找客观原因,明年还得继续努力。育儿方面的内容也得加强。

今年的博客也是打算一月一篇的,也没有正常发布。

计划是用来执行的。

微服务-监控

发表于 2017-10-15
字数统计: 1.9k 字数 | 阅读时长 ≈ 6 分钟

前言

这篇其实本来也打算放在《常识》系列中的,介绍一下分布式日志追踪系统,这在互联网界理论,技术,产品已经很成熟,国内外各大厂都有自己成熟的产品。是个不错的互联网门外汉科普知识点

微服务,已经火了多年,也已经落地实施。对服务的监控需求顺理成章。监控系统的本质其实也就是分布式日志追踪系统。就归类到《微服务》系列中吧

本篇大体内容

  1. 《微服务设计》第八章监控
  2. 监控理念Dapper
  3. 流行监控框架架构
  4. aspectj

《微服务》之监控

本来是说,要写个读书笔记的,但没有那么多完整的时间,正好学习监控,就把书拿出来,一并读了。理论结合实践,效果更好。

监控模型

三种监控模型

  1. 单一服务,单一服务器
  2. 单一服务,多个服务器
  3. 多个服务,多个服务器

单一服务单一服务器

  1. 主机状态

CPU、内存等,可以使用监控软件Nagios,Zabbix或者像New Relic这样的托管服务来帮助监控主机

  1. 服务状态

直接查看服务应用日志,或者web容器日志

单一服务多个服务器

  1. 主机状态

这种情况稍微复杂了一点,如前所述,如果我们想监控CPU,当CPU占用率过高时,如果这个问题发生在所有的服务器上,有可能是微服务本身的问题,但如果只发生在一台,则有可能是主机本身的问题。

我们需要关注每台服务器的日志数据,我们既想把数据聚合起来,又想深入分析每台主机,Nagios允许以这样的方式组织我们的主机。

  1. 服务状态

如果只有几个主机,可以用像ssh-multiplexers这样的工具,在多个主机上运行相同的命令。用一个大显示屏,运行grep “Error” app.log来定位错误。对于响应时间,可以在负载均衡器中跟踪,负载均衡器本身也需要跟踪。

多个服务多个服务器

这个情况就更复杂了,我们如何在多个主机上,成千上万行的日志中定位错误的原因?如果确定是一个服务器异常,还是一个系统性的问题?如何在多个主机跟踪一个错误的调用链,找出引起错误的原因?

答案是:从日志到应用程序指标,集中收集和聚合更可能多的数据

日志,更多的日志

需要将日志能够集中到一起方便使用

可以使用ELK

ELK由Elasticsearch、Logstash和Kibana三部分组件组成;

Elasticsearch是个开源分布式搜索引擎,它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。

Logstash是一个完全开源的工具,它可以对你的日志进行收集、分析,并将其存储供以后使用

kibana 是一个开源和免费的工具,它可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助您汇总、分析和搜索重要数据日志

image

监控指标

系统指标:比如cpu 内存等,这些可以collectd进行收集

服务指标:比如接口调用次数,线程池空闲线程数等

语义指标:类似业务指标,比如订单量,活动用户数等

关联标识

image

其实这就是服务追踪,调用链监控

因为微服务化后,各种系统之间的调用关系很复杂,因此排查一个问题会比较难受,你不需要一个系统一个系统去找问题。所以服务追踪就变得非常关键。他能够追踪一次会话的所有调用,哪里有了问题,一目了然

这个更详细的后面介绍google的dapper

其它

标准化:将监控api标准化。
考虑受众:谁看?运营还是开发?
更加实时:监控应该具有实时性,出问题第一时间反应。
避免级联危险:可以使用hystrix。

小结

对每个服务:跟踪请求响应时间、错误率和应用程序级指标;跟踪所有下游服务的健康状态,如调用时间、错误率;标准化如何收集和存储指标;以标准格式讲日志记录到一个标准位置;监控底层操作系统。

对系统:聚合CPU等主机层级的指标和程序级指标;确保指标存储工具可以在系统和服务级别做聚合,也能查看单台主机信息;指标存储工具允许维护数据足够长时间,以了解趋势;使用单个可查询工具对日志进行聚合和存储;强烈考虑标准化关联标识的使用;了解什么样的情况需要行动,并构造警报和仪表盘;调查对各种指标聚合和统一化的可能性。

Google Dapper

image

分布式服务的跟踪系统需要记录在一次特定的请求后系统中完成的所有工作的信息。举个例子,图展现的是一个和5台服务器相关的一个服务,包括:前端(A),两个中间层(B和C),以及两个后端(D和E)。当一个用户(这个用例的发起人)发起一个请求时,首先到达前端,然后发送两个RPC到服务器B和C。B会马上做出反应,但是C需要和后端的D和E交互之后再返还给A,由A来响应最初的请求。对于这样一个请求,简单实用的分布式跟踪的实现,就是为服务器上每一次你发送和接收动作来收集跟踪标识符(message identifiers)和时间戳(timestamped events)。

google dapper译文:http://bigbully.github.io/Dapper-translation/

Dapper有三个设计目标:

  1. 低消耗:跟踪系统对在线服务的影响应该做到足够小。

  2. 应用级的透明:对于应用的程序员来说,是不需要知道有跟踪系统这回事的。如果一个跟踪系统想生效,就必须需要依赖应用的开发者主动配合,那么这个跟踪系统显然是侵入性太强的。

  3. 延展性:Google至少在未来几年的服务和集群的规模,监控系统都应该能完全把控住。

监控框架

大的互联网公司都有自己的分布式跟踪系统,
比如Twitter的zipkin,淘宝的鹰眼,新浪的Watchman,京东的Hydra等

这些系统大多是基于dapper论文而来。

image

aspectj

监控系统,又名日志追踪系统,那主要还是打印日志嘛。

无侵入性的日志打印,AOP绝对是上选了

写了几个aspectj小示例
https://github.com/zhuxingsheng/aspectjdemo

当然,aspectj只是埋点,后面还有日志存储,实时计算,日志分析,监控展示

参考

http://www.cnblogs.com/gudi/p/6683653.html

http://blog.csdn.net/guwei9111986/article/details/51798394

微服务监控案例之一

深入浅出spring事务

发表于 2017-07-11 | 分类于 源码解读
字数统计: 9.4k 字数 | 阅读时长 ≈ 37 分钟

前言

这篇其实也要归纳到《常识》系列中,但这重点又是spring的介绍,故归档在spring系列中。

工作很多年,除了学生时代学过,事务还真没有用过。过去开发游戏时,完全不用事务;现在互联网开发,也没有使用事务的场景,不要见怪。

概念

对于事务(Transaction)的概念,网上有各种版本,大同小异,

事务就是是由一系列对系统中数据进行读写的操作组成的一个程序执行单元,狭义上的事务特指数据库事务。

事务是一系列的动作,它们综合在一起才是一个完整的工作单元,这些动作必须全部完成,如果有一个失败的话,那么事务就会回滚到最开始的状态,仿佛什么都没发生过一样。

在企业级应用程序开发中,事务管理必不可少的技术,用来确保数据的完整性和一致性。

比如你去ATM机取1000块钱,大体有两个步骤:首先输入密码金额,银行卡扣掉1000元钱;然后ATM出1000元钱。这两个步骤必须是要么都执行要么都不执行。如果银行卡扣除了1000块但是ATM出钱失败的话,你将会损失1000元;如果银行卡扣钱失败但是ATM却出了1000块,那么银行将损失1000元。所以,如果一个步骤成功另一个步骤失败对双方都不是好事,如果不管哪一个步骤失败了以后,整个取钱过程都能回滚,也就是完全取消所有操作的话,这对双方都是极好的。

事务的特性

大名鼎鼎的ACID

  1. 原子性(Atomicity),事务必须是一个原子的操作序列单元,一次事务只允许存在两种状态,全部成功或全部失败,任何一个操作失败都将导致整个事务失败
  2. 一致性(Consistency),事务的执行不能破坏系统数据的完整性和一致性,如果未完成的事务对系统数据的修改有一部分已经写入物理数据库,这时系统数据就处于不一致状态
  3. 隔离性(Isolation),在并发环境中,不同的事务操作相同的数据时,虚相互隔离不能相互干扰
  4. 持久性(Durability),事务一旦提交,对系统数据的变更就应该是永久的,必须被永久保存下来,即使服务器宕机了,只要数据库能够重新启动,就一定能够恢复到事务成功结束时的状态

事务并发处理问题

如果没有锁定且多个用户同时访问一个数据库,则当他们的事务同时使用相同的数据时可能会发生问题。由于并发操作带来的数据不一致性包括:丢失数据修改、读”脏”数据(脏读)、不可重复读、产生幽灵数据:

假设数据库中有如下一张表:

image

第一类丢失更新(lost update)

回滚丢失

在完全未隔离事务的情况下,两个事物更新同一条数据资源,某一事物异常终止,回滚造成第一个完成的更新也同时丢失。
image

在T1时刻开启了事务1,T2时刻开启了事务2,

在T3时刻事务1从数据库中取出了id=”402881e535194b8f0135194b91310001”的数据,

T4时刻事务2取出了同一条数据,

T5时刻事务1将age字段值更新为30,

T6时刻事务2更新age为35并提交了数据,

但是T7事务1回滚了事务age最后的值依然为20,事务2的更新丢失了,

这种情况就叫做”第一类丢失更新(lost update)”。

脏读(dirty read)

事务没提交,提前读取

如果第二个事务查询到第一个事务还未提交的更新数据,形成脏读
image

在T1时刻开启了事务1,T2时刻开启了事务2,

在T3时刻事务1从数据库中取出了id=”402881e535194b8f0135194b91310001”的数据,

在T5时刻事务1将age的值更新为30,但是事务还未提交,

T6时刻事务2读取同一条记录,获得age的值为30,但是事务1还未提交,

若在T7时刻事务1回滚了事务2的数据就是错误的数据(脏数据),

这种情况叫做” 脏读(dirty read)”。

虚读(phantom read)

一个事务执行两次查询,第二次结果集包含第一次中没有或者某些行已被删除,造成两次结果不一致,只是另一个事务在这两次查询中间插入或者删除了数据造成的

image

在T1时刻开启了事务1,T2时刻开启了事务2,

T3时刻事务1从数据库中查询所有记录,记录总共有一条,

T4时刻事务2向数据库中插入一条记录,T6时刻事务2提交事务。

T7事务1再次查询数据数据时,记录变成两条了。

这种情况是”虚读(phantom read)”。

不可重复读(unrepeated read)

一个事务两次读取同一行数据,结果得到不同状态结果,如中间正好另一个事务更新了该数据,两次结果相异,不可信任
image

在T1时刻开启了事务1,T2时刻开启了事务2,

在T3时刻事务1从数据库中取出了id=”402881e535194b8f0135194b91310001”的数据,此时age=20,

T4时刻事务2查询同一条数据,

T5事务2更新数据age=30,T6时刻事务2提交事务,

T7事务1查询同一条数据,发现数据与第一次不一致。

这种情况就是”不可重复读(unrepeated read)”

第二类丢失更新(second lost updates)

覆盖丢失

不可重复读的特殊情况,如果两个事务都读取同一行,然后两个都进行写操作,并提交,第一个事务所做的改变就会丢失。

image

在T1时刻开启了事务1,T2时刻开启了事务2,

T3时刻事务1更新数据age=25,

T5时刻事务2更新数据age=30,

T6时刻提交事务,

T7时刻事务2提交事务,把事务1的更新覆盖了。

这种情况就是”第二类丢失更新(second lost updates)”。

并发问题总结

不可重复读的重点是修改 :

同样的条件 , 你读取过的数据 , 再次读取出来发现值不一样了

幻读的重点在于新增或者删除

同样的条件 , 第 1 次和第 2 次读出来的记录数不一样

第一类更新丢失(回滚丢失)

第二类更新丢失(覆盖丢失)

隔离级别

解决并发问题的途径是什么?答案是:采取有效的隔离机制。怎样实现事务的隔离呢?隔离机制的实现必须使用锁

一般在编程的时候只需要设置隔离等级

数据库系统提供四种事务隔离级别:

  1. 未提交读(READ UNCOMMITTED )

最低隔离级别,一个事务能读取到别的事务未提交的更新数据,很不安全,可能出现丢失更新、脏读、不可重复读、幻读;

  1. 提交读(READ COMMITTED)

一个事务能读取到别的事务提交的更新数据,不能看到未提交的更新数据,不会出现丢失更新、脏读,但可能出现不可重复读、幻读;

  1. 可重复读(REPEATABLE READ)

保证同一事务中先后执行的多次查询将返回同一结果,不受其他事务影响,不可能出现丢失更新、脏读、不可重复读,但可能出现幻读;

  1. 序列化(SERIALIZABLE)

最高隔离级别,不允许事务并发执行,而必须串行化执行,最安全,不可能出现更新、脏读、不可重复读、幻读,但是效率最低。

image

隔离级别越高,数据库事务并发执行性能越差,能处理的操作越少。
所以一般地,推荐使用REPEATABLE READ级别保证数据的读一致性。
对于幻读的问题,可以通过加锁来防止

image

MySQL支持这四种事务等级,默认事务隔离级别是REPEATABLE READ。

Oracle数据库支持READ COMMITTED 和 SERIALIZABLE这两种事务隔离级别,
所以Oracle数据库不支持脏读

Oracle数据库默认的事务隔离级别是READ COMMITTED

不可重复读和幻读的区别是,不可重复读对应的表的操作是更改(UPDATE),而幻读对应的表的操作是插入(INSERT),两种的应对策略不一样。对于不可重复读,只需要采用行级锁防止该记录被更新即可,而对于幻读必须加个表级锁,防止在表中插入数据

锁

乐观锁(Optimistic Lock)和悲观锁(Pessimistic Lock)

最重要的分类就是乐观锁(Optimistic Lock)和悲观锁(Pessimistic Lock),这实际上是两种锁策略

乐观锁,顾名思义就是非常乐观,非常相信真善美,每次去读数据都认为其它事务没有在写数据,所以就不上锁,快乐的读取数据,而只在提交数据的时候判断其它事务是否搞过这个数据了,如果搞过就rollback。乐观锁相当于一种检测冲突的手段,可通过为记录添加版本或添加时间戳来实现。

悲观锁,对其它事务抱有保守的态度,每次去读数据都认为其它事务想要作祟,所以每次读数据的时候都会上锁,直到取出数据。悲观锁大多数情况下依靠数据库的锁机制实现,以保证操作最大程度的独占性,但随之而来的是各种开销。悲观锁相当于一种避免冲突的手段。

选择标准:如果并发量不大,或数据冲突的后果不严重,则可以使用乐观锁;而如果并发量大或数据冲突后果比较严重(对用户不友好),那么就使用悲观锁。

分共享锁(S锁,Shared Lock)和排他锁(X锁,Exclusive Lock)

从读写角度,分共享锁(S锁,Shared Lock)和排他锁(X锁,Exclusive Lock),也叫读锁(Read Lock)和写锁(Write Lock)。
理解:

持有S锁的事务只读不可写。

如果事务A对数据D加上S锁后,其它事务只能对D加上S锁而不能加X锁。

持有X锁的事务可读可写。

如果事务A对数据D加上X锁后,其它事务不能再对D加锁,直到A对D的锁解除。

表级锁(Table Lock)和行级锁(Row Lock)

从锁的粒度角度,主要分为表级锁(Table Lock)和行级锁(Row Lock)。

表级锁将整个表加锁,性能开销最小

用户可以同时进行读操作。当一个用户对表进行写操作时,用户可以获得一个写锁,写锁禁止其他的用户读写操作。写锁比读锁的优先级更高,即使有读操作已排在队列中,一个被申请的写锁仍可以排在所队列的前列。

行级锁仅对指定的记录进行加锁

这样其它进程可以对同一个表中的其它记录进行读写操作。行级锁粒度最小,开销大,能够支持高并发,可能会出现死锁。

MySQL的MyISAM引擎使用表级锁,而InnoDB支持表级锁和行级锁,默认是行级锁。
还有BDB引擎使用页级锁,即一次锁定一组记录,并发性介于行级锁和表级锁之间。

三级锁协议

三级加锁协议是为了保证正确的事务并发操作,事务在读、写数据库对象是需要遵循的加锁规则。

一级封锁协议:事务T在修改数据R之前必须对它加X锁,直到事务结束方可释放。而若事务T只是读数据,不进行修改,则不需加锁,因此一级加锁协议下可能会出现脏读和不可重复读。

二级加锁协议:在一级加锁协议的基础上,加上这样一条规则——事务T在读取数据R之前必须对它加S锁,直到读取完毕以后释放。二级加锁协议下可能会出现不可重复读。

三级加锁协议:在一级加锁协议的基础上,加上这样一条规则——事务T在读取数据R之前必须对它加S锁,直到事务结束方可释放。三级加锁协议避免了脏读和不可重复读的问题

Spring事务

Spring事务管理的实现有许多细节,如果对整个接口框架有个大体了解会非常有利于我们理解事务

image

image

Spring事务管理器

Spring事务管理涉及的接口的联系如下:
image

Spring并不直接管理事务,而是提供了多种事务管理器,他们将事务管理的职责委托给Hibernate或者JTA等持久化机制所提供的相关平台框架的事务来实现。

Spring事务管理器的接口是org.springframework.transaction.PlatformTransactionManager,
通过这个接口,Spring为各个平台如JDBC、Hibernate等都提供了对应的事务管理器,

但是具体的实现就是各个平台自己的事情了

1
2
3
4
5
6
7
8
9
10
11
12
/**
* This is the central interface in Spring's transaction infrastructure.
* Applications can use this directly, but it is not primarily meant as API:
* Typically, applications will work with either TransactionTemplate or
* declarative transaction demarcation through AOP.
*/
public interface PlatformTransactionManager {
TransactionStatus getTransaction(TransactionDefinition definition)
throws TransactionException;
void commit(TransactionStatus status) throws TransactionException;
void rollback(TransactionStatus status) throws TransactionException;
}

标准的jdbc处理事务代码

1
2
3
4
5
6
7
8
9
10
11
12
13
Connection conn = DataSourceUtils.getConnection();
//开启事务
conn.setAutoCommit(false);
try {
Object retVal = callback.doInConnection(conn);
conn.commit(); //提交事务
return retVal;
}catch (Exception e) {
conn.rollback();//回滚事务
throw e;
}finally {
conn.close();
}

spring对应的TranstactionTemplate处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class TransactionTemplate extends DefaultTransactionDefinition
implements TransactionOperations, InitializingBean {

@Override
public <T> T execute(TransactionCallback<T> action) throws TransactionException {
if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
}
else {
TransactionStatus status = this.transactionManager.getTransaction(this);
T result;
try {
result = action.doInTransaction(status);
}
catch (RuntimeException ex) {
// Transactional code threw application exception -> rollback
rollbackOnException(status, ex);
throw ex;
}
catch (Error err) {
// Transactional code threw error -> rollback
rollbackOnException(status, err);
throw err;
}
catch (Exception ex) {
// Transactional code threw unexpected exception -> rollback
rollbackOnException(status, ex);
throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
}
this.transactionManager.commit(status);
return result;
}
}

}

具体的事务管理机制对Spring来说是透明的,它并不关心那些,那些是对应各个平台需要关心的,所以Spring事务管理的一个优点就是为不同的事务API提供一致的编程模型,如JTA、JDBC、Hibernate、JPA。下面分别介绍各个平台框架实现事务管理的机制。

image

JDBC事务

如果应用程序中直接使用JDBC来进行持久化,DataSourceTransactionManager会为你处理事务边界。为了使用DataSourceTransactionManager,你需要使用如下的XML将其装配到应用程序的上下文定义中:

1
2
3
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>

实际上,DataSourceTransactionManager是通过调用java.sql.Connection来管理事务,而后者是通过DataSource获取到的。通过调用连接的commit()方法来提交事务,同样,事务失败则通过调用rollback()方法进行回滚。

Hibernate事务

如果应用程序的持久化是通过Hibernate实习的,那么你需要使用HibernateTransactionManager。对于Hibernate3,需要在Spring上下文定义中添加如下的声明:

1
2
3
<bean id="transactionManager" class="org.springframework.orm.hibernate3.HibernateTransactionManager">
<property name="sessionFactory" ref="sessionFactory" />
</bean>

sessionFactory属性需要装配一个Hibernate的session工厂,HibernateTransactionManager的实现细节是它将事务管理的职责委托给org.hibernate.Transaction对象,而后者是从Hibernate Session中获取到的。当事务成功完成时,HibernateTransactionManager将会调用Transaction对象的commit()方法,反之,将会调用rollback()方法。

事务属性

事务管理器接口PlatformTransactionManager通过getTransaction(TransactionDefinition definition)方法来得到事务,这个方法里面的参数是TransactionDefinition类,这个类就定义了一些基本的事务属性。

1
2
3
4
5
6
public interface TransactionDefinition {
int getPropagationBehavior(); // 返回事务的传播行为
int getIsolationLevel(); // 返回事务的隔离级别,事务管理器根据它来控制另外一个事务可以看到本事务内的哪些数据
int getTimeout(); // 返回事务必须在多少秒内完成
boolean isReadOnly(); // 事务是否只读,事务管理器能够根据这个返回值进行优化,确保事务是只读的
}

那么什么是事务属性呢?事务属性可以理解成事务的一些基本配置,描述了事务策略如何应用到方法上。事务属性包含了5个方面

image

传播行为

事务的第一个方面是传播行为(propagation behavior)。
当事务方法被另一个事务方法调用时,必须指定事务应该如何传播。

为什么需要定义传播?

在我们用SSH开发项目的时候,我们一般都是将事务设置在Service层 那么当我们调用Service层的一个方法的时候它能够保证我们的这个方法中执行的所有的对数据库的更新操作保持在一个事务中,在事务层里面调用的这些方法要么全部成功,要么全部失败。那么事务的传播特性也是从这里说起的。
如果你在你的Service层的这个方法中,除了调用了Dao层的方法之外,还调用了本类的其他的Service方法,那么在调用其他的Service方法的时候,这个事务是怎么规定的呢,我必须保证我在我方法里掉用的这个方法与我本身的方法处在同一个事务中,否则如果保证事物的一致性。事务的传播特性就是解决这个问题的,“事务是会传播的”在Spring中有针对传播特性的多种配置我们大多数情况下只用其中的一种:PROPGATION_REQUIRED:这个配置项的意思是说当我调用service层的方法的时候开启一个事务(具体调用那一层的方法开始创建事务,要看你的aop的配置),那么在调用这个service层里面的其他的方法的时候,如果当前方法产生了事务就用当前方法产生的事务,否则就创建一个新的事务。这个工作使由Spring来帮助我们完成的。
以前没有Spring帮助我们完成事务的时候我们必须自己手动的控制事务,例如当我们项目中仅仅使用hibernate,而没有集成进spring的时候,我们在一个service层中调用其他的业务逻辑方法,为了保证事物必须也要把当前的hibernate session传递到下一个方法中,或者采用ThreadLocal的方法,将session传递给下一个方法,其实都是一个目的。现在这个工作由spring来帮助我们完成,就可以让我们更加的专注于我们的业务逻辑。而不用去关心事务的问题。

Spring定义了七种传播行为:

PROPAGATION_REQUIRED

如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。这是最常见的选择。

PROPAGATION_SUPPORTS

支持当前事务,如果当前没有事务,就以非事务方式执行。

PROPAGATION_MANDATORY

使用当前的事务,如果当前没有事务,就抛出异常。

PROPAGATION_REQUIRES_NEW

新建事务,如果当前存在事务,把当前事务挂起。

PROPAGATION_NOT_SUPPORTED

以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。

PROPAGATION_NEVER

以非事务方式执行,如果当前存在事务,则抛出异常。

PROPAGATION_NESTED

如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与PROPAGATION_REQUIRED类似的操作。

传播行为详细

通过实例尝试一下各个传播属性

1
2
3
4
5
6
7
8
9
10
11
12
ServiceA {

void methodA() {
ServiceB.methodB();
}
}

ServiceB {

void methodB() {
}
}
PROPAGATION_REQUIRED

如果存在一个事务,则支持当前事务。如果没有事务则开启一个新的事务

1
2
3
4
5
6
7
8
9
10
//事务属性 PROPAGATION_REQUIRED
methodA{
……
methodB();
……
}
//事务属性 PROPAGATION_REQUIRED
methodB{
……
}

单独调用methodB方法:

1
2
3
main{ 
metodB();
}

相当于

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Main{ 
Connection con=null;
try{
con = getConnection();
con.setAutoCommit(false);

//方法调用
methodB();

//提交事务
con.commit();
} Catch(RuntimeException ex) {
//回滚事务
con.rollback();
} finally {
//释放资源
closeCon();
}
}

Spring保证在methodB方法中所有的调用都获得到一个相同的连接。在调用methodB时,没有一个存在的事务,所以获得一个新的连接,开启了一个新的事务。

只是在ServiceB.methodB内的任何地方出现异常,ServiceB.methodB将会被回滚,不会引起ServiceA.methodA的回滚

单独调用MethodA时,在MethodA内又会调用MethodB.

执行效果相当于:

1
2
3
4
5
6
7
8
9
10
11
12
main{ 
Connection con = null;
try{
con = getConnection();
methodA();
con.commit();
} catch(RuntimeException ex) {
con.rollback();
} finally {
closeCon();
}
}

调用MethodA时,环境中没有事务,所以开启一个新的事务.

当在MethodA中调用MethodB时,环境中已经有了一个事务,所以methodB就加入当前事务

ServiceA.methodA或者ServiceB.methodB无论哪个发生异常methodA和methodB作为一个整体都将一起回滚

PROPAGATION_SUPPORTS

如果存在一个事务,支持当前事务。如果没有事务,则非事务的执行。但是对于事务同步的事务管理器,PROPAGATION_SUPPORTS与不使用事务有少许不同

1
2
3
4
5
6
7
8
9
//事务属性 PROPAGATION_REQUIRED
methodA(){
methodB();
}

//事务属性 PROPAGATION_SUPPORTS
methodB(){
……
}

单纯的调用methodB时,methodB方法是非事务的执行的。当调用methdA时,methodB则加入了methodA的事务中,事务地执行

PROPAGATION_MANDATORY

如果已经存在一个事务,支持当前事务。如果没有一个活动的事务,则抛出异常

1
2
3
4
5
6
7
8
9
//事务属性 PROPAGATION_REQUIRED
methodA(){
methodB();
}

//事务属性 PROPAGATION_MANDATORY
methodB(){
……
}

当单独调用methodB时,因为当前没有一个活动的事务,则会抛出异常throw new IllegalTransactionStateException(“Transaction propagation ‘mandatory’ but no existing transaction found”);

当调用methodA时,methodB则加入到methodA的事务中,事务地执行

PROPAGATION_REQUIRES_NEW

总是开启一个新的事务。如果一个事务已经存在,则将这个存在的事务挂起。

1
2
3
4
5
6
7
8
9
10
11
//事务属性 PROPAGATION_REQUIRED
methodA(){
doSomeThingA();
methodB();
doSomeThingB();
}

//事务属性 PROPAGATION_REQUIRES_NEW
methodB(){
……
}

调用A方法:

1
2
3
main(){
methodA();
}

相当于

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
main(){
TransactionManager tm = null;
try{
//获得一个JTA事务管理器
tm = getTransactionManager();
tm.begin();//开启一个新的事务
Transaction ts1 = tm.getTransaction();
doSomeThing();
tm.suspend();//挂起当前事务
try{
tm.begin();//重新开启第二个事务
Transaction ts2 = tm.getTransaction();
methodB();
ts2.commit();//提交第二个事务
} Catch(RunTimeException ex) {
ts2.rollback();//回滚第二个事务
} finally {
//释放资源
}
//methodB执行完后,恢复第一个事务
tm.resume(ts1);
doSomeThingB();
ts1.commit();//提交第一个事务
} catch(RunTimeException ex) {
ts1.rollback();//回滚第一个事务
} finally {
//释放资源
}
}

在这里,我把ts1称为外层事务,ts2称为内层事务。从上面的代码可以看出,ts2与ts1是两个独立的事务,互不相干。

Ts2是否成功并不依赖于ts1

如果methodA方法在调用methodB方法后的doSomeThingB方法失败了,而methodB方法所做的结果依然被提交。

而除了 methodB之外的其它代码导致的结果却被回滚了

PROPAGATION_NOT_SUPPORTED

以非事务方式执行操作,如果当前存在事务,就把当前事务挂起,

1
2
3
4
5
6
7
8
9
10
11
//事务属性 PROPAGATION_REQUIRED
methodA(){
doSomeThingA();
methodB();
doSomeThingB();
}

//事务属性 PROPAGATION_NOT_SUPPORTED
methodB(){
……
}

当前不支持事务。比如ServiceA.methodA的事务级别是PROPAGATION_REQUIRED ,
而ServiceB.methodB的事务级别是PROPAGATION_NOT_SUPPORTED ,
那么当执行到ServiceB.methodB时,ServiceA.methodA的事务挂起,而他以非事务的状态运行完,再继续ServiceA.methodA的事务

PROPAGATION_NEVER

不能在事务中运行。假设ServiceA.methodA的事务级别是PROPAGATION_REQUIRED,
而ServiceB.methodB的事务级别是PROPAGATION_NEVER ,
那么ServiceB.methodB就要抛出异常了。

PROPAGATION_NESTED

开始一个 “嵌套的” 事务, 它是已经存在事务的一个真正的子事务. 潜套事务开始执行时, 它将取得一个 savepoint. 如果这个嵌套事务失败, 我们将回滚到此 savepoint. 潜套事务是外部事务的一部分, 只有外部事务结束后它才会被提交.

比如我们设计ServiceA.methodA的事务级别为PROPAGATION_REQUIRED,ServiceB.methodB的事务级别为PROPAGATION_NESTED,那么当执行到ServiceB.methodB的时候,ServiceA.methodA所在的事务就会挂起,ServiceB.methodB会起一个新的子事务并设置savepoint,等待ServiceB.methodB的事务完成以后,他才继续执行

因为ServiceB.methodB是外部事务的子事务,那么

  1. 如果ServiceB.methodB已经提交,那么ServiceA.methodA失败回滚,ServiceB.methodB也将回滚。
  2. 如果ServiceB.methodB失败回滚,如果他抛出的异常被ServiceA.methodA的try..catch捕获并处理,ServiceA.methodA事务仍然可能提交;如果他抛出的异常未被ServiceA.methodA捕获处理,ServiceA.methodA事务将回滚。

理解Nested的关键是savepoint。

与PROPAGATION_REQUIRES_NEW的区别:

  1. RequiresNew每次都创建新的独立的物理事务,而Nested只有一个物理事务;
  2. Nested嵌套事务回滚或提交不会导致外部事务回滚或提交,但外部事务回滚将导致嵌套事务回滚,而 RequiresNew由于都是全新的事务,所以之间是无关联的;
  3. Nested使用JDBC 3的保存点实现,即如果使用低版本驱动将导致不支持嵌套事务。
    使用嵌套事务,必须确保具体事务管理器实现的nestedTransactionAllowed属性为true,否则不支持嵌套事务,如DataSourceTransactionManager默认支持,而HibernateTransactionManager默认不支持,需要我们来开启。

在 spring 中使用 PROPAGATION_NESTED的前提:

  1. 我们要设置 transactionManager 的 nestedTransactionAllowed 属性为 true, 注意, 此属性默认为 false!!!
  2. java.sql.Savepoint 必须存在, 即 jdk 版本要 1.4+
  3. Connection.getMetaData().supportsSavepoints() 必须为 true, 即 jdbc drive 必须支持 JDBC 3.0

image

隔离规则

用来解决并发事务时出现的问题,其使用TransactionDefinition中的静态变量来指定

  1. ISOLATION_DEFAULT 使用后端数据库默认的隔离级别
  2. ISOLATION_READ_UNCOMMITTED 最低的隔离级别,允许读取尚未提交的数据变更,可能会导致脏读、幻读或不可重复读
  3. ISOLATION_READ_COMMITTED 允许读取并发事务已经提交的数据,可以阻止脏读,但是幻读或不可重复读仍有可能发生
  4. ISOLATION_REPEATABLE_READ 对同一字段的多次读取结果都是一致的,除非数据是被本身事务自己所修改,可以阻止脏读和不可重复读,但幻读仍有可能发生
  5. ISOLATION_SERIALIZABLE 最高的隔离级别,完全服从ACID的隔离级别,确保阻止脏读、不可重复读以及幻读,也是最慢的事务隔离级别,因为它通常是通过完全锁定事务相关的数据库表来实现的

可以使用DefaultTransactionDefinition类的setIsolationLevel(TransactionDefinition. ISOLATION_READ_COMMITTED)来指定隔离级别,其中此处表示隔离级别为提交读

也可以使用或setIsolationLevelName(“ISOLATION_READ_COMMITTED”)方式指定,其中参数就是隔离级别静态变量的名字,但不推荐这种方式

事务只读

将事务标识为只读,只读事务不修改任何数据;

对于JDBC只是简单的将连接设置为只读模式,对于更新将抛出异常;

对于一些其他ORM框架有一些优化作用,如在Hibernate中,Spring事务管理器将执行“session.setFlushMode(FlushMode.MANUAL)”
即指定Hibernate会话在只读事务模式下不用尝试检测和同步持久对象的状态的更新。

如果使用设置具体事务管理的validateExistingTransaction属性为true(默认false),将确保整个事务传播链都是只读或都不是只读
image

第二个addressService.save()不能设置成false

对于错误的事务只读设置将抛出IllegalTransactionStateException异常,并伴随“Participating transaction with definition [……] is not marked as read-only……”信息,表示参与的事务只读属性设置错误

事务超时

设置事务的超时时间,单位为秒,默认为-1表示使用底层事务的超时时间

使用如setTimeout(100)来设置超时时间,如果事务超时将抛出org.springframework.transaction.TransactionTimedOutException异常并将当前事务标记为应该回滚,即超时后事务被自动回滚

可以使用具体事务管理器实现的defaultTimeout属性设置默认的事务超时时间,如DataSourceTransactionManager. setDefaultTimeout(10)

回滚规则

spring事务管理器会捕捉任何未处理的异常,然后依据规则决定是否回滚抛出异常的事务

默认配置下,Spring只有在抛出的异常为运行时unchecked异常时才回滚该事务,也就是抛出的异常为RuntimeException的子类(Errors也会导致事务回滚),而抛出checked异常则不会导致事务回滚。可以明确的配置在抛出那些异常时回滚事务,包括checked异常。也可以明确定义那些异常抛出时不回滚事务

如何改变默认规则:

  1. 让checked例外也回滚:在整个方法前加上 @Transactional(rollbackFor=Exception.class)
  2. 让unchecked例外不回滚: @Transactional(notRollbackFor=RunTimeException.class)
  3. 不需要事务管理的(只查询的)方法:@Transactional(propagation=Propagation.NOT_SUPPORTED)

事务状态

上面讲到的调用PlatformTransactionManager接口的getTransaction()的方法得到的是TransactionStatus接口的一个实现,这个接口的内容如下:

1
2
3
4
5
6
7
public interface TransactionStatus{
boolean isNewTransaction(); // 是否是新的事物
boolean hasSavepoint(); // 是否有恢复点
void setRollbackOnly(); // 设置为只回滚
boolean isRollbackOnly(); // 是否为只回滚
boolean isCompleted; // 是否已完成
}

可以发现这个接口描述的是一些处理事务提供简单的控制事务执行和查询事务状态的方法,在回滚或提交的时候需要应用对应的事务状态

编程式和声明式事务

Spring提供了对编程式事务和声明式事务的支持,编程式事务允许用户在代码中精确定义事务的边界

而声明式事务(基于AOP)有助于用户将操作与事务规则进行解耦。

简单地说,编程式事务侵入到了业务代码里面,但是提供了更加详细的事务管理;而声明式事务由于基于AOP,所以既能起到事务管理的作用,又可以不影响业务代码的具体实现。

编程式

Spring提供两种方式的编程式事务管理,分别是:使用TransactionTemplate和直接使用PlatformTransactionManager

使用TransactionTemplate

采用TransactionTemplate和采用其他Spring模板,如JdbcTempalte和HibernateTemplate是一样的方法。它使用回调方法,把应用程序从处理取得和释放资源中解脱出来。如同其他模板,TransactionTemplate是线程安全的。代码片段:

1
2
3
4
5
6
7
8
TransactionTemplate tt = new TransactionTemplate(); // 新建一个TransactionTemplate
Object result = tt.execute(
new TransactionCallback(){
public Object doTransaction(TransactionStatus status){
updateOperation();
return resultOfUpdateOperation();
}
}); // 执行execute方法进行事务管理

使用TransactionCallback()可以返回一个值。如果使用TransactionCallbackWithoutResult则没有返回值

使用PlatformTransactionManager

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager();
//定义一个某个框架平台的TransactionManager,如JDBC、Hibernate
dataSourceTransactionManager.setDataSource(this.getJdbcTemplate().getDataSource()); // 设置数据源
DefaultTransactionDefinition transDef = new DefaultTransactionDefinition(); // 定义事务属性
transDef.setPropagationBehavior(DefaultTransactionDefinition.PROPAGATION_REQUIRED); // 设置传播行为属性
TransactionStatus status = dataSourceTransactionManager.getTransaction(transDef); // 获得事务状态
try {
// 数据库操作
dataSourceTransactionManager.commit(status);// 提交
} catch (Exception e) {
dataSourceTransactionManager.rollback(status);// 回滚
}

声明式

有几种实现方式,不一一罗列了

使用tx拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!-- 定义事务管理器(声明式的事务) --> 
<bean id="transactionManager"
class="org.springframework.orm.hibernate3.HibernateTransactionManager">
<property name="sessionFactory" ref="sessionFactory" />
</bean>

<tx:advice id="txAdvice" transaction-manager="transactionManager">
<tx:attributes>
<tx:method name="*" propagation="REQUIRED" />
</tx:attributes>
</tx:advice>

<aop:config>
<aop:pointcut id="interceptorPointCuts"
expression="execution(* com.bluesky.spring.dao.*.*(..))" />
<aop:advisor advice-ref="txAdvice"
pointcut-ref="interceptorPointCuts" />
</aop:config>

全注解

1
2
3
4
5
6
7
<tx:annotation-driven transaction-manager="transactionManager" />
<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager ">
<property name="dataSource">
<ref bean="basicDataSource" />
</property>
</bean>

Spring源码片段

在《BeanPostProcessor学习》中提到了AOP的实现方式,声明式事务实现是基于AOP

首先得解析xml配置,TxNamespaceHandler

1
2
3
4
5
6
@Override
public void init() {
registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
}

主要是TransactionInterceptor类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public Object invoke(final MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

// Adapt to TransactionAspectSupport's invokeWithinTransaction...
//主要逻辑在父类
return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
@Override
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
});
}

核心逻辑,还得看父类TransactionAspectSupport#invokeWithinTransaction

逻辑主干很清晰

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 判断创建Transaction
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
//执行业务逻辑
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
// 出现异常,回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
//清除当前事务状态
cleanupTransactionInfo(txInfo);
}
//提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}

创建事务createTransactionIfNecessary

主要逻辑在PlatformTransactionManager#getTransaction()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
//得到各个不同数据源的事务对象,spring尽然没有把transaction对象抽象出来,很是奇怪
Object transaction = doGetTransaction();

// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();

if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}

//此事务是否已经存在
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}

// Check definition settings for new transaction.
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}

// No existing transaction found -> check propagation behavior to find out how to proceed.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
//这三种都是新建事务
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//开始获取链接,开启事务,绑定资源到当前线程
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}

TransactionStatus

这儿返回的是TransactionStatus

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface TransactionStatus extends SavepointManager, Flushable {

boolean isNewTransaction();

boolean hasSavepoint();

void setRollbackOnly();

boolean isRollbackOnly();

void flush();

boolean isCompleted();

TransactionInfo

事务信息

1
2
3
4
5
6
7
8
9
10
11
12
protected final class TransactionInfo {

private final PlatformTransactionManager transactionManager;

private final TransactionAttribute transactionAttribute;

private final String joinpointIdentification;

private TransactionStatus transactionStatus;

private TransactionInfo oldTransactionInfo;
}

commitTransactionAfterReturning提交事务

逻辑到了AbstractPlatformTransactionManager#processRollback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;

try {
triggerBeforeCompletion(status);
//有savepoint,
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
//回滚事务
doRollback(status);
}
else {
// Participating in larger transaction
//在一个事务中,就先设置回滚标识,等父事务一起回滚
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}

triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
cleanupAfterCompletion(status);
}

completeTransactionAfterThrowing回滚事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
//有事务才能回滚
if (txInfo != null && txInfo.hasTransaction()) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
//回滚在 (ex instanceof RuntimeException || ex instanceof Error)
if (txInfo.transactionAttribute.rollbackOn(ex)) {
try {
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
catch (Error err) {
logger.error("Application exception overridden by rollback error", ex);
throw err;
}
}
else {
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
catch (Error err) {
logger.error("Application exception overridden by commit error", ex);
throw err;
}
}
}
}

总结

一个完整的事务介绍结束了。框架就是封装一切,透明一切,简化一切。本质的流程不会变

参考资料

http://blog.sina.com.cn/s/blog_7ed8b1e90101mgas.html

https://my.oschina.net/wanyuxiang000/blog/277568

http://www.mamicode.com/info-detail-1248286.html

http://jinnianshilongnian.iteye.com/blog/1496953

http://www.cnblogs.com/yldIndex/p/spring_Transactional.html

http://jinnianshilongnian.iteye.com/blog/1441271

http://www.cnblogs.com/chihirotan/p/6739748.html

计数器算法

发表于 2017-05-29
字数统计: 3.3k 字数 | 阅读时长 ≈ 12 分钟

《微服务-熔断机制》中提到了计数器,这篇详细学习一下计数器算法

之前的有次面试,碰到了计数器的的题目

Q:线上服务,设计一个拦截器,一个IP如果短时间内请求次数过多,就屏蔽

A:使用map,key为ip,值为次数与时间

Q:请求相当大,会直接冲垮内存,怎么办?

A:使用redis,像redis cluster,绝对可以满足

Q: 写下伪代码

A:bbbbbbb

其实计数器在互联网开发中很常见,当时刚转互联网比较无知,面试得很烂。

计数器法

计数器法是限流算法里最简单也是最容易实现的一种算法。比如我们规定,对于A接口来说,我们1分钟的访问次数不能超过100个。那么我们可以这么做:在一开 始的时候,我们可以设置一个计数器counter,每当一个请求过来的时候,counter就加1,如果counter的值大于100并且该请求与第一个 请求的间隔时间还在1分钟之内,那么说明请求数过多;如果该请求与第一个请求的间隔时间大于1分钟,且counter的值还在限流范围内,那么就重置 counter,具体算法的示意图如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Counter {
public long timeStamp = getNowTime();
public int reqCount = 0;
public final int limit = 100; // 时间窗口内最大请求数
public final long interval = 1000; // 时间窗口ms
public boolean grant() {
long now = getNowTime();
if (now < timeStamp + interval) {
// 在时间窗口内
reqCount++;
// 判断当前时间窗口内是否超过最大请求控制数
return reqCount <= limit;
}
else {
timeStamp = now;
// 超时后重置
reqCount = 1;
return true;
}
}
}

以上是示意代码,先忽视其中的并发问题,最大的问题是临界问题

从上图中我们可以看到,假设有一个恶意用户,他在0:59时,瞬间发送了100个请求,并且1:00又瞬间发送了100个请求,那么其实这个用户在 1秒里面,瞬间发送了200个请求。我们刚才规定的是1分钟最多100个请求,也就是每秒钟最多1.7个请求,用户通过在时间窗口的重置节点处突发请求, 可以瞬间超过我们的速率限制。用户有可能通过算法的这个漏洞,瞬间压垮我们的应用。

滑动窗口

滑动窗口,又称rolling window

在上图中,整个红色的矩形框表示一个时间窗口,在我们的例子中,一个时间窗口就是一分钟。然后我们将时间窗口进行划分,比如图中,我们就将滑动窗口 划成了6格,所以每格代表的是10秒钟。每过10秒钟,我们的时间窗口就会往右滑动一格。每一个格子都有自己独立的计数器counter,比如当一个请求 在0:35秒的时候到达,那么0:30~0:39对应的counter就会加1。

那么滑动窗口怎么解决刚才的临界问题的呢?我们可以看上图,0:59到达的100个请求会落在灰色的格子中,而1:00到达的请求会落在橘黄色的格 子中。当时间到达1:00时,我们的窗口会往右移动一格,那么此时时间窗口内的总请求数量一共是200个,超过了限定的100个,所以此时能够检测出来触 发了限流。

我再来回顾一下刚才的计数器算法,我们可以发现,计数器算法其实就是滑动窗口算法。只是它没有对时间窗口做进一步地划分,所以只有1格。

由此可见,当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。

实现一

根据上面滑动窗口的定义,实现很简单了

  1. 需要一个map,当然是并发安全的,key为时间
  2. 统计窗口内的请求总数

这儿有个以这种方式实现的
https://github.com/zhuxingsheng/yammer-metrics/blob/master/metrics-core/src/main/java/com/codahale/metrics/SlidingTimeWindowReservoir.java

摘点核心的片段:

1
2
//一个并发安全map,skip list有序
this.measurements = new ConcurrentSkipListMap<Long, Long>();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 获取map的key,以时间纳秒值为key
* @return
*/
private long getTick() {
for (; ; ) {
final long oldTick = lastTick.get();
//每纳秒处理的请求很多,减少compareAndSet的失败次数,这儿*COLLISION_BUFFER
final long tick = clock.getTick() * COLLISION_BUFFER;
// ensure the tick is strictly incrementing even if there are duplicate ticks
final long newTick = tick > oldTick ? tick : oldTick + 1;
if (lastTick.compareAndSet(oldTick, newTick)) {
return newTick;
}
}
}
1
2
3
4
private void trim() {
//清除window之前的计数
measurements.headMap(getTick() - window).clear();
}
缺点

实现简单,但有个问题,map的key是在不停的增加,删除,给GC带来了压力

实现二

考虑key的复用,使用环形结构

环形结构

通过取模来达到这个效果

初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private int window; //计算窗口
//整个循环数组窗口
private int ringWindow=window+30;


requestCounter = new AtomicInteger[ringWindow];
failedCounter = new AtomicInteger[ringWindow];
for (int i = 0; i < ringWindow; i++) {
requestCounter[i] = new AtomicInteger(0);
failedCounter[i] = new AtomicInteger(0);
}

initCounterTimeInSecond =
TimeUnit.NANOSECONDS.toSeconds(System.nanoTime());

计算窗口内的次数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private long countTotal(AtomicInteger[] caculateCounter){
int currentIndex = getIndex();

long sum = 0;

for (int i = 0; i < window; i++) {
//这儿并不是直接计算window中的所有counter,
//而是从currentIndex向前倒取window个
int index = ((currentIndex + ringWindow) -i)
% this.ringWindow;
sum += caculateCounter[index].get();
}
return sum;
}

为什么需要ringWindow,直接window就可以?这儿有个奇技淫巧。

1
2
3
4
5
6
7
8
9
10
11
CLEAN_UP_BUFFER=10;

public void cleanupFutureCounter() {
int currentIndex = getIndex();

for (int i = 1 ; i <= CLEAN_UP_BUFFER; i++) {
int index = (currentIndex + i) % this.ringWindow;
requestCounter[index].set(0);
failedCounter[index].set(0);
}
}

这儿会有个定时任务,每5秒会去清空未来10秒的数据

因为在一环数组全部填充完成后,下一轮开始时,需要清空,哪个地方是起点,无法区分,所以ringwindow预留点位置用来清空

实现三

还有一些是加锁,当然会是轻量的CAS;每一个轮回完成后,都需要标记开始位置,并清空环。

漏桶算法

漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率.示意图如下:

我们将算法中的水换成实际应用中的请求,我们可以看到漏桶算法天生就限制了请求的速度。当使用了漏桶算法,我们可以保证接口会以一个常速速率来处理请求。所以漏桶算法天生不会出现临界问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class LeakyDemo {
public long timeStamp = getNowTime();
public int capacity; // 桶的容量
public int rate; // 水漏出的速度
public int water; // 当前水量(当前累积请求数)
public boolean grant() {
long now = getNowTime();
water = max(0, water - (now - timeStamp) * rate); // 先执行漏水,计算剩余水量
timeStamp = now;
if ((water + 1) < capacity) {
// 尝试加水,并且水还未满
water += 1;
return true;
}
else {
// 水满,拒绝加水
return false;
}
}
}

令牌桶算法(Token Bucket)

令牌桶算法(Token Bucket)和 Leaky Bucket 效果一样但方向相反的算法,更加容易理解.随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了.新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务

令牌桶算法

令牌桶的另外一个好处是可以方便的改变速度. 一旦需要提高速率,则按需提高放入桶中的令牌的速率. 一般会定时(比如100毫秒)往桶中增加一定数量的令牌, 有些变种算法则实时的计算应该增加的令牌的数量.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class TokenBucketDemo {
public long timeStamp = getNowTime();
public int capacity; // 桶的容量
public int rate; // 令牌放入速度
public int tokens; // 当前令牌数量
public boolean grant() {
long now = getNowTime();
// 先添加令牌
tokens = min(capacity, tokens + (now - timeStamp) * rate);
timeStamp = now;
if (tokens < 1) {
// 若不到1个令牌,则拒绝
return false;
}
else {
// 还有令牌,领取令牌
tokens -= 1;
return true;
}
}
}

临界问题

我 们再来考虑一下临界问题的场景。在0:59秒的时候,由于桶内积满了100个token,所以这100个请求可以瞬间通过。但是由于token是以较低的 速率填充的,所以在1:00的时候,桶内的token数量不可能达到100个,那么此时不可能再有100个请求通过。所以令牌桶算法可以很好地解决临界问 题。

下图比较了计数器(左)和令牌桶算法(右)在临界点的速率变化。我们可以看到虽然令牌桶算法允许突发速率,但是下一个突发速率必须要等桶内有足够的 token后才能发生:

Guava RateLimiter

在guava中,有现成的实现

RateLimiter使用的是一种叫令牌桶的流控算法,RateLimiter会按照一定的频率往桶里扔令牌,线程拿到令牌才能执行,比如你希望自己的应用程序QPS不要超过1000,那么RateLimiter设置1000的速率后,就会每秒往桶里扔1000个令牌。

有两种方式,SmoothBursty和SmoothWarmingUp

1
2
3
4
5
6
7
8
9
create(double permitsPerSecond)
根据指定的稳定吞吐率创建RateLimiter
这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询)

create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)
根据指定的稳定吞吐率和预热期来创建RateLimiter
这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少个请求量),
在这段预热时间内,
RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。(只要存在足够请求数来使其饱和)

SmoothBursty通过平均速率和最后一次新增令牌的时间计算出下次新增令牌的时间的,另外需要一个桶暂存一段时间内没有使用的令牌(即可以突发的令牌数)。另外RateLimiter还提供了tryAcquire方法来进行无阻塞或可超时的令牌消费。

因为SmoothBursty允许一定程度的突发,会有人担心如果允许这种突发,假设突然间来了很大的流量,那么系统很可能扛不住这种突发。因此需要SmoothWarmingUp一种平滑速率的限流工具,从而系统冷启动后慢慢的趋于平均固定速率(即刚开始速率小一些,然后慢慢趋于我们设置的固定速率)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void testSmoothWarmingUp(){
//每秒5个,1500ms后达到正常速率
RateLimiter rateLimiter = RateLimiter.create(5,1500, TimeUnit.MILLISECONDS);

List<Runnable> tasks = new ArrayList<Runnable>();
for (int i = 0; i < 10; i++) {
tasks.add(new Request(i));
}

ExecutorService executorService = Executors.newCachedThreadPool();
for(Runnable task:tasks) {
System.out.println("等待时间:" + rateLimiter.acquire());
executorService.execute(task);
}
executorService.shutdown();

}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
等待时间:0.0
0 handle request 1528693920502
等待时间:0.54311
1 handle request 1528693921052
等待时间:0.433531
2 handle request 1528693921486
等待时间:0.332679
3 handle request 1528693921819
等待时间:0.229785
4 handle request 1528693922049
等待时间:0.199668
5 handle request 1528693922249
等待时间:0.199845
6 handle request 1528693922449
等待时间:0.199757
7 handle request 1528693922649
等待时间:0.19981
8 handle request 1528693922849
等待时间:0.199732
9 handle request 1528693923049

可以看出前面几个等待时间长,速度慢;1500ms后,速率达到正常的每秒5的速度

其实这算是一种漏桶算法的变异,在令牌桶中控制一个移除令牌的速度就是个漏桶了。

总结

计数器 VS 滑动窗口

计数器算法是最简单的算法,可以看成是滑动窗口的低精度实现。滑动窗口由于需要存储多份的计数器(每一个格子存一份),所以滑动窗口在实现上需要更多的存储空间。也就是说,如果滑动窗口的精度越高,需要的存储空间就越大。

漏桶算法 VS 令牌桶算法

漏桶算法和令牌桶算法最明显的区别是令牌桶算法允许流量一定程度的突发。因为默认的令牌桶算法,取走token是不需要耗费时间的,也就是说,假设桶内有100个token时,那么可以瞬间允许100个请求通过。

令牌桶算法由于实现简单,且允许某些流量的突发,对用户友好,所以被业界采用地较多。当然我们需要具体情况具体分析,只有最合适的算法,没有最优的算法

参考

接口限流算法总结

Hashmap源码解析

发表于 2017-04-06 | 分类于 java
字数统计: 4.6k 字数 | 阅读时长 ≈ 19 分钟

前言

做什么都怕进入狗咬尾巴的怪圈,上次看hashmap源码还是2012年,这次出去面试时被问到了hashmap的问题,整体思路还是记得的,巴拉巴拉一堆。回来再看一下源码,温习一下

想要了解hashmap,就得先知道一下他的数据结构理论

哈希数据结构

哈希表(Hash table,也叫散列表),是根据key而直接进行访问的数据结构。也就是说,它通过把key映射到表中一个位置来访问记录,以加快查找的速度。这个映射函数叫做散列函数,存放记录的数组叫做散列表。

哈希表的做法其实很简单,就是把key通过一个固定的算法函数即所谓的哈希函数转换成一个整型数字,然后就将该数字对数组长度进行取余,取余结果就当作数组的下标,将value存储在以该数字为下标的数组空间里。

哈希表是一个在时间和空间上做出权衡的经典例子。如果没有内存限制,那么可以直接将键作为数组的索引。那么所有的查找时间复杂度为O(1);如果没有时间限制,那么我们可以使用无序数组并进行顺序查找,这样只需要很少的内存。哈希表使用了适度的时间和空间来在这两个极端之间找到了平衡。只需要调整哈希函数算法即可在时间和空间上做出取舍。

哈希函数

哈希查找第一步就是使用哈希函数将键映射成索引。这种映射函数就是哈希函数。如果我们有一个保存0-M数组,那么我们就需要一个能够将任意键转换为该数组范围内的索引(0~M-1)的哈希函数

R.W.Floyed给出的衡量散列思想的三个标准:

  1. 一个好的hash算法的计算应该是非常快的
  2. 一个好的hash算法应该是冲突极小化, 如果存在冲突,应该是冲突均匀化。

哈希冲突

通过哈希函数,我们可以将键转换为数组的索引(0-M-1),但是对于两个或者多个键具有相同索引值的情况,我们需要有一种方法来处理这种冲突。

拉链法

一种比较直接的办法就是,将大小为M 的数组的每一个元素指向一个条链表,链表中的每一个节点都存储散列值为该索引的键值对,这就是拉链法

image

该方法的基本思想就是选择足够大的M,使得所有的链表都尽可能的短小,以保证查找的效率。对采用拉链法的哈希实现的查找分为两步,首先是根据散列值找到等一应的链表,然后沿着链表顺序找到相应的键。

实现基于拉链表的散列表,目标是选择适当的数组大小M,使得既不会因为空链表而浪费内存空间,也不会因为链表太而在查找上浪费太多时间。拉链表的优点在于,这种数组大小M的选择不是关键性的,如果存入的键多于预期,那么查找的时间只会比选择更大的数组稍长,另外,我们也可以使用更高效的结构来代替链表存储。如果存入的键少于预期,索然有些浪费空间,但是查找速度就会很快。所以当内存不紧张时,我们可以选择足够大的M,可以使得查找时间变为常数,如果内存紧张时,选择尽量大的M仍能够将性能提高M倍。

线性探测法

线性探测法是开放寻址法解决哈希冲突的一种方法,基本原理为,使用大小为M的数组来保存N个键值对,其中M>N,我们需要使用数组中的空位解决碰撞冲突。如下图所示:

image

对照前面的拉链法,在该图中,”Ted Baker” 是有唯一的哈希值153的,但是由于153被”Sandra Dee”占用了。而原先”Snadra Dee”和”John Smith”的哈希值都是152的,但是在对”Sandra Dee”进行哈希的时候发现152已经被占用了,所以往下找发现153没有被占用,所以存放在153上,然后”Ted Baker”哈希到153上,发现已经被占用了,所以往下找,发现154没有被占用,所以值存到了154上。

开放寻址法中最简单的是线性探测法:当碰撞发生时即一个键的散列值被另外一个键占用时,直接检查散列表中的下一个位置即将索引值加1,这样的线性探测会出现三种结果:

  1. 命中,该位置的键和被查找的键相同
  2. 未命中,键为空
  3. 继续查找,该位置和键被查找的键不同。

线性探查(Linear Probing)方式虽然简单,但是有一些问题,它会导致同类哈希的聚集。在存入的时候存在冲突,在查找的时候冲突依然存在。

hashmap 源码分析

hashmap原理角度看就是基于哈希数据结构的一种实现,解决冲突的办法是使用拉链法

从源码基于JDK1.7看

1
2
3
public class HashMap<K,V>
extends AbstractMap<K,V>
implements Map<K,V>, Cloneable, Serializable

HashMap是基于哈希表的Map接口的非同步实现,它提供所有可选的映射操作,并允许使用null键和null值。
此集合不保证映射的顺序,特别不保证其顺序永久不变。

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public HashMap(int initialCapacity, float loadFactor) {
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal initial capacity: " +
initialCapacity);
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
if (loadFactor <= 0 || Float.isNaN(loadFactor))
throw new IllegalArgumentException("Illegal load factor: " +
loadFactor);

this.loadFactor = loadFactor;
threshold = initialCapacity;
init();
}

两个重要的初始化参数,[初始容量,负载因子]

这个JDK7大版本里面的小版本对hashmap改动好好几次。

这个源码是基于JDK7.0_80的。有些版本在构造函数里面就确保容量是2的N次方,但这个版本没有,很简单的赋值

初始容量16,负载因子0.75

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* The default initial capacity - MUST be a power of two.
*/
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16

/**
* The maximum capacity, used if a higher value is implicitly specified
* by either of the constructors with arguments.
* MUST be a power of two <= 1<<30.
*/
static final int MAXIMUM_CAPACITY = 1 << 30;

/**
* The load factor used when none specified in constructor.
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;

2的N次方

很多算法书上都认为想要降低key冲突,最好容量为素数。为什么java却取了个2的N次方呢。

为了将各元素的hashCode保存至长度为Length的key数组中,一般采用取模的方式,即index = hashCode % Length。不可避免的,存在多个不同对象的hashCode被安排在同一位置,这就是我们平时所谓的“冲突”。
如果仅仅是考虑元素均匀化与冲突极小化,似乎应该将Length取为素数(尽管没有明显的理论来支持这一点,但数学家们通过大量的实践得出结论,对素数取模的产生结果的无关性要大于其它数字)。为此,Craig Larman and Rhett Guthrie《Java Performence》中对此也大加抨击。

为了弄清楚这个问题,Bruce Eckel(Thinking in JAVA的作者)专程采访了java.util.hashMap的作者Joshua Bloch,此设计的原因
取模运算在包括Java在内的大多数语言中的效率都十分低下,而当除数为2的N次方时,取模运算将退化为最简单的位运算,其效率明显提升(按照Bruce Eckel给出的数据,大约可以提升5~8倍)

1
2
3
4
5
6
7
/**
* Returns index for hash code h.
*/
static int indexFor(int h, int length) {
// assert Integer.bitCount(length) == 1 : "length must be a non-zero power of 2";
return h & (length-1);
}

看下图,左边两组是数组长度为16(2的4次方),右边两组是数组长度为15。两组的hashcode为8和9,但是很明显,当它们和1110“与”的时候,产生了相同的结果,也就是说它们会定位到数组中的同一个位置上去,这就产生了碰撞,8和9会被放到同一个链表上,那么查询的时候就需要遍历这个链表,得到8或者9,这样就降低了查询的效率。同时,我们也可以发现,当数组长度为15的时候,hashcode的值会与14(1110)进行“与”,那么最后一位永远是0,而0001,0011,0101,1001,1011,0111,1101这几个位置永远都不能存放元素了,空间浪费相当大,更糟的是这种情况中,数组可以使用的位置比数组长度小了很多,这意味着进一步增加了碰撞的几率,减慢了查询的效率!

image

put操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Associates the specified value with the specified key in this map.
* If the map previously contained a mapping for the key, the old
* value is replaced.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with <tt>key</tt>, or
* <tt>null</tt> if there was no mapping for <tt>key</tt>.
* (A <tt>null</tt> return can also indicate that the map
* previously associated <tt>null</tt> with <tt>key</tt>.)
*/
public V put(K key, V value) {
if (table == EMPTY_TABLE) {
inflateTable(threshold);
}
if (key == null)
return putForNullKey(value);
int hash = hash(key);
int i = indexFor(hash, table.length);
for (Entry<K,V> e = table[i]; e != null; e = e.next) {
Object k;
if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
V oldValue = e.value;
e.value = value;
e.recordAccess(this);
return oldValue;
}
}

modCount++;
addEntry(hash, key, value, i);
return null;
}

当table == EMPTY_TABLE时,先填充table

1
2
3
4
5
6
7
8
9
10
11
/**
* Inflates the table.
*/
private void inflateTable(int toSize) {
// Find a power of 2 >= toSize
int capacity = roundUpToPowerOf2(toSize);

threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);
table = new Entry[capacity];
initHashSeedAsNeeded(capacity);
}

这儿看到了2的N次方,没有跟之前一样使用while循环

1
2
3
int capacity = 1;  
while (capacity < initialCapacity)
capacity <<= 1;

threshold的值=容量*负载因子

看下roundUpToPowerOf2()

1
2
3
4
5
6
private static int roundUpToPowerOf2(int number) {
// assert number >= 0 : "number must be non-negative";
return number >= MAXIMUM_CAPACITY
? MAXIMUM_CAPACITY
: (number > 1) ? Integer.highestOneBit((number - 1) << 1) : 1;
}

主要是Integer.highestOneBit

1
2
3
4
5
6
7
8
9
public static int highestOneBit(int i) {
// HD, Figure 3-1
i |= (i >> 1);
i |= (i >> 2);
i |= (i >> 4);
i |= (i >> 8);
i |= (i >> 16);
return i - (i >>> 1);
}

这个方法单纯的使用位运算,性能自然提高
解析下这个方法,这个方法的作用是求构成一个整数的最大的位所代表的整数的值
接下来举个简单的例子,128来讲二进制是1000 0000。下面以他为例子算下:
移1位
1000 0000
0100 0000
|————-
移2位
1100 0000
0011 0000
|————
移4位
1111 0000
0000 1111
|————
移8位
1111 1111
0000 0000
|————
移动16位
1111 1111
0000 0000
|————
1111 1111
最终的结果如你所看到的,后面的位全部填充为1,把后面的位全部减掉就得到了最高的位代表的整数。

回到put方法,如果key==null,putForNullKey(value);说明hashmap支持key为null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Offloaded version of put for null keys
*/
private V putForNullKey(V value) {
for (Entry<K,V> e = table[0]; e != null; e = e.next) {
if (e.key == null) {
V oldValue = e.value;
e.value = value;
e.recordAccess(this);
return oldValue;
}
}
modCount++;
addEntry(0, null, value, 0);
return null;
}

这个逻辑比较简单

  1. 第一步,如果找到之前有key为null的key,进行一下value替换,返回oldValue
  2. 如果没有为null的key,那进行addEntry() 添加元素的主要方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    /**
    * Adds a new entry with the specified key, value and hash code to
    * the specified bucket. It is the responsibility of this
    * method to resize the table if appropriate.
    *
    * Subclass overrides this to alter the behavior of put method.
    */
    void addEntry(int hash, K key, V value, int bucketIndex) {
    if ((size >= threshold) && (null != table[bucketIndex])) {
    resize(2 * table.length);
    hash = (null != key) ? hash(key) : 0;
    bucketIndex = indexFor(hash, table.length);
    }

    createEntry(hash, key, value, bucketIndex);
    }

新添加元素时,先看下有没有达到threshold && bucketIndex元素不为null

如果成立,那就需要resize,这是个耗性能的操作,所以在初始化时,一般计算好元素多少,给一个合适的初始容量

按照上面的条件,一个合适的容量应该这样计算了

我们有1000个元素new HashMap(1000), 但是理论上来讲new HashMap(1024)更合适,即使是1000,hashmap也自动会将其设置为1024。 但是new HashMap(1024)还不是更合适的,因为0.751000 < 1000, 也就是说为了让0.75 size > 1000, 我们必须这样new HashMap(2048)才最合适,既考虑了&的问题,也避免了resize的问题。

先看createEntry

1
2
3
4
5
void createEntry(int hash, K key, V value, int bucketIndex) {
Entry<K,V> e = table[bucketIndex];
table[bucketIndex] = new Entry<>(hash, key, value, e);
size++;
}

这个很简单,新建一个Entry指向bucketIndex,老元素e,被指向新元素的next。

这儿也看出hashmap是使用的拉链法

看Entry,就是一个链表,一个元素接着另一个元素

1
2
3
4
5
6
7
8
9
10
11
12
static class Entry<K,V> implements Map.Entry<K,V> {
final K key;
V value;
Entry<K,V> next;
int hash;

Entry(int h, K k, V v, Entry<K,V> n) {
value = v;
next = n;
key = k;
hash = h;
}

resize方法

耗性能的地方,重新散列,看看这个方法的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//在put方法里,当容量达到threshold时,进行双倍的扩容。
resize(2 * table.length);


void resize(int newCapacity) {
Entry[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity == MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return;
}

Entry[] newTable = new Entry[newCapacity];
transfer(newTable, initHashSeedAsNeeded(newCapacity));
table = newTable;
threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}

/**
* Transfers all entries from current table to newTable.
*/
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) {
while(null != e) {
Entry<K,V> next = e.next;
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity);
e.next = newTable[i];
newTable[i] = e;
e = next;
}
}
}

如果容量已经达到MAX_VALUE,那就不扩容了,只能处理冲突

transfer(),这是核心方法,把以前的元素,按新的capacity,计算的hashCode,重新放置元素。

这儿看到e.next=newTable[i],明显使用的是链表头部插入法

get方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public V get(Object key) {
if (key == null)
return getForNullKey();
Entry<K,V> entry = getEntry(key);

return null == entry ? null : entry.getValue();
}
final Entry<K,V> getEntry(Object key) {
if (size == 0) {
return null;
}

int hash = (key == null) ? 0 : hash(key);
for (Entry<K,V> e = table[indexFor(hash, table.length)];
e != null;
e = e.next) {
Object k;
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
}
return null;
}

get很好理解了,跟算法一致,先hash找bucket,如果有冲突的元素,再使用equals来判定

hashSeed

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* A randomizing value associated with this instance that is applied to
* hash code of keys to make hash collisions harder to find. If 0 then
* alternative hashing is disabled.
*/
transient int hashSeed = 0;

/**
* Initialize the hashing mask value. We defer initialization until we
* really need it.
*/
final boolean initHashSeedAsNeeded(int capacity) {
boolean currentAltHashing = hashSeed != 0;
boolean useAltHashing = sun.misc.VM.isBooted() &&
(capacity >= Holder.ALTERNATIVE_HASHING_THRESHOLD);
boolean switching = currentAltHashing ^ useAltHashing;
if (switching) {
hashSeed = useAltHashing
? sun.misc.Hashing.randomHashSeed(this)
: 0;
}
return switching;
}

initHashSeedAsNeeded在inflateTable中被调用过,这个hashSeed到底有什么用?

hashSeed就好像我们加密时使用的密钥,在StackOverfolw上是这样描述hashSeed的:

The seed parameter is a means for you to randomize the hash function. You should provide the same seed value for all calls to the hashing function in the same application of the hashing function. However, each invocation of your application (assuming it is creating a new hash table) can use a different seed, e.g., a random value.

Why is it provided?

One reason is that attackers may use the properties of a hash function to construct a denial of service attack. They could do this by providing strings to your hash function that all hash to the same value destroying the performance of your hash table. But if you use a different seed for each run of your program, the set of strings the attackers must use changes.

虽然seed近似随机,但在同一个HashMap中必须保证每次的计算Hash值的时候使用的同一个seed,也就相当于保证我们在一个密码系统中加密时,使用同一个密钥。同时使用seed可以抵御攻击,因为每个应用的seed都会一样。

对于安全方面的了解几乎是0,有机会再学习了。

ConcurrentModificationException

hashmap不是线程安全的

1
2
3
4
5
6
7
8
9
10
 private abstract class HashIterator<E> implements Iterator<E> {
Entry<K,V> next; // next entry to return
int expectedModCount; // For fast-fail
int index; // current slot
Entry<K,V> current; // current entry


final Entry<K,V> nextEntry() {
if (modCount != expectedModCount)
throw new ConcurrentModificationException();

这个内部遍历类使用了Fail-Fast机制

1
2
3
4
5
6
7
8
/**
* The number of times this HashMap has been structurally modified
* Structural modifications are those that change the number of mappings in
* the HashMap or otherwise modify its internal structure (e.g.,
* rehash). This field is used to make iterators on Collection-views of
* the HashMap fail-fast. (See ConcurrentModificationException).
*/
transient int modCount;

modCount为HashMap的一个实例变量,并且被声明为volatile,表示任何线程都可以看到该变量被其它线程修改的结果

(根据JVM内存模型的优化,每一个线程都会存一份自己的工作内存,此工作内存的内容与本地内存并非时时刻刻都同步,因此可能会出现线程间的修改不可见的问题)

使用Iterator开始迭代时,会将modCount的赋值给expectedModCount,在迭代过程中,通过每次比较两者是否相等来判断HashMap是否在内部或被其它线程修改

在并发条件下,还是要使用并发包下的ConcurrentHashmap,有时间也得写一写

总结

(1) 扩容是一个特别耗性能的操作,所以当程序员在使用HashMap的时候,估算map的大小,初始化的时候给一个大致的数值,避免map进行频繁的扩容。

(2) 负载因子是可以修改的,也可以大于1,但是建议不要轻易修改,除非情况非常特殊。

(3) HashMap是线程不安全的,不要在并发的环境中同时操作HashMap,建议使用ConcurrentHashMap。

参考资料

浅谈算法和数据结构: 十一 哈希表

单链表—java实现

HashMap的存取之美

游戏小传序

发表于 2017-04-01
字数统计: 476 字数 | 阅读时长 ≈ 1 分钟

序言

在游戏行业足足待了八年了。从大学毕业就开始开发游戏。一直到现在还在一线岗位。

从小小的开发,到掌握后端全局的主程;从胎死腹中的产品,到月流水过亿的大作。

游戏人太苦,996工作制是正常的,凌晨下班也是常有的,为什么可以坚持,可能是心中都有个游戏梦,要大成,要大作。但每次都是充满期待,结果却无声奈何。想想那些从别的行业转来做游戏的同事,一个一个期望变成失落,坚持了几年,又都失望地离开了游戏行业,而我是幸运的,至少经历了大作,经过了一款月流水过亿的产品。

人常说七年就是一辈子,那我这算在工作经历上过完了一辈子,下辈子怎么过呢?不清楚。但至少不能再重复上辈子

对于一辈子想有个总结,开始想写本书,至少是个系列《游戏开发实战》,写的过程又是重学的过程,而之前定的一周一篇技术文章的目标总是不能执行,因为时间不够,工作很忙,要写一篇好的技术文章很废时间。所以不想再以技术文章为系列去写,写技术文章太枯燥,由浅入深,面面俱到,太累!

所以这次就以时间为轴线,以技术内容为补充,记录一下我从小鸟变成一个老兵的成长过程,在此过程中遇到的一些问题,思考心得。

1…111213
朱兴生

朱兴生

128 日志
3 分类
50 标签
© 2016 — 2022 朱兴生 | Site words total count: 305.9k
由 Hexo 强力驱动
|
主题 — NexT.Mist v5.1.4
沪ICP备18040647号-1