码农戏码

新生代农民工的自我修养

0%

背景

图片分库存储时,每一张图片都可以定位到特定的服务器

hash

上图中,假设我们查找的是”a.png”,由于有4台服务器(排除从库),因此公式为hash(a.png) % 4 = 2 ,可知定位到了第2号服务器,这样的话就不会遍历所有的服务器,大大提升了性能!

一切都运行正常,再考虑如下的两种情况;

  1. 一个 cache 服务器 m down 掉了(在实际应用中必须要考虑这种情况),这样所有映射到 cache m 的对象都会失效,怎么办,需要把 cache m 从 cache 中移除,这时候 cache 是 N-1 台,映射公式变成了 hash(object)%(N-1) ;
  2. 由于访问加重,需要添加 cache ,这时候 cache 是 N+1 台,映射公式变成了 hash(object)%(N+1) ;

1 和 2 意味着什么?

这意味着突然之间几乎所有cache 都失效了。缓存雪崩,这是一场灾难

一致性hash

有什么方法可以改变这个状况呢,这就是 consistent hashing

比如有{N0, N1, N2}三个节点,陆续有多个资源要分配到这三个节点上,如何尽可能均匀的分配到这些节点上

算法

一致性哈希算法的思路为:先构造出一个长度为2^32 整数环,根据N0-3的节点名称的hash值(分布为[0,2^32 -1])放到这个环上

整个空间按顺时针方向组织,圆环的正上方的点代表0,0点右侧的第一个点代表1,以此类推,2、3、4、5、6……直到2^32 -1,也就是说0点左侧的第一个点代表2^32 -1, 0和2^32 -1在零点中方向重合,我们把这个由2^32个点组成的圆环称为Hash环

下一步将各个服务器使用Hash进行一个哈希,具体可以选择服务器的ip或主机名作为关键字进行哈希,这样每台机器就能确定其在哈希环上的位置,这里假设四台服务器使用ip地址哈希后在环空间的位置如下:

接下来使用如下算法定位数据访问到相应服务器:将数据key使用相同的函数Hash计算出哈希值,并确定此数据在环上的位置,从此位置沿环顺时针“行走”,第一台遇到的服务器就是其应该定位到的服务器!

例如我们有Object A、Object B、Object C、Object D四个数据对象,经过哈希计算后,在环空间上的位置如下:

根据一致性Hash算法,数据A会被定为到Node A上,B被定为到Node B上,C被定为到Node C上,D被定为到Node D上

容错性

如果一个节点宕机了,会引起系统故障吗?

如上图,Node C不幸宕机,可以看到此时对象A、B、D不会受到影响,只有C对象被重定位到Node D。一般的,在一致性Hash算法中,如果一台服务器不可用,则受影响的数据仅仅是此服务器到其环空间中前一台服务器(即沿着逆时针方向行走遇到的第一台服务器)之间数据,其它不会受到影响

扩展性

如果在系统中增加一台服务器Node X

此时对象Object A、B、D不受影响,只有对象C需要重定位到新的Node X !一般的,在一致性Hash算法中,如果增加一台服务器,则受影响的数据仅仅是新服务器到其环空间中前一台服务器(即沿着逆时针方向行走遇到的第一台服务器)之间数据,其它数据也不会受到影响

综上所述,一致性Hash算法对于节点的增减都只需重定位环空间中的一小部分数据,具有较好的容错性和可扩展性

数据倾斜

一致性Hash算法在服务节点太少时,容易因为节点分部不均匀而造成数据倾斜(被缓存的对象大部分集中缓存在某一台服务器上)问题,例如系统中只有两台服务器,其环分布如下:

此时必然造成大量数据集中到Node A上,而只有极少量会定位到Node B上

然而,这又会造成一个“雪崩”的情况,即A节点由于承担了B节点的数据,所以A节点的负载会变高,A节点很容易也宕机,这样依次下去,这样造成整个集群都挂了

虚拟节点

计算机的任何问题都可以通过增加一个虚拟层来解决

解决上述数据倾斜问题,也可能通过使用虚拟层的手段:将每台物理缓存服务器虚拟为一组虚拟缓存服务器,将虚拟服务器的hash值放置在hash环上,Key在环上先找到虚拟服务器节点,再得到物理服务器的信息

例如上面的情况,可以为每台服务器计算三个虚拟节点,于是可以分别计算 “Node A#1”、“Node A#2”、“Node A#3”、“Node B#1”、“Node B#2”、“Node B#3”的哈希值,于是形成六个虚拟节点:

同时数据定位算法不变,只是多了一步虚拟节点到实际节点的映射,例如定位到“Node A#1”、“Node A#2”、“Node A#3”三个虚拟节点的数据均定位到Node A上。这样就解决了服务节点少时数据倾斜的问题

那么在实践中,一台物理服务器虚拟为多少个虚拟服务器节点合适呢?太多会影响性能,太少又会导致负载不均衡,一般说来,经验值是150,当然根据集群规模和负载均衡的精度需求,这个值应该根据具体情况具体对待

实现

判定哈希算法好坏的四个定义:

  1. 平衡性(Balance):平衡性是指哈希的结果能够尽可能分布到所有的缓冲中去,这样可以使得所有的缓冲空间都得到利用。很多哈希算法都能够满足这一条件
  2. 单调性(Monotonicity):单调性是指如果已经有一些内容通过哈希分派到了相应的缓冲中,又有新的缓冲加入到系统中。哈希的结果应能够保证原有已分配的内容可以被映射到原有的或者新的缓冲中去,而不会被映射到旧的缓冲集合中的其他缓冲区
  3. 分散性(Spread):在分布式环境中,终端有可能看不到所有的缓冲,而是只能看到其中的一部分。当终端希望通过哈希过程将内容映射到缓冲上时,由于不同终端所见的缓冲范围有可能不同,从而导致哈希的结果不一致,最终的结果是相同的内容被不同的终端映射到不同的缓冲区中。这种情况显然是应该避免的,因为它导致相同内容被存储到不同缓冲中去,降低了系统存储的效率。分散性的定义就是上述情况发生的严重程度。好的哈希算法应能够尽量避免不一致的情况发生,也就是尽量降低分散性
  4. 负载(Load):负载问题实际上是从另一个角度看待分散性问题。既然不同的终端可能将相同的内容映射到不同的缓冲区中,那么对于一个特定的缓冲区而言,也可能被不同的用户映射为不同 的内容。与分散性一样,这种情况也是应当避免的,因此好的哈希算法应能够尽量降低缓冲的负荷

在具体实现时,主要考虑点选择适合的数据结构构造hash环

此数据结构的特点:插入与删除性能、快速找到特定元素的下一位

常见算法结构可以有回顾:

《一篇解决排序算法》

《树结构概述》

从时间复杂度方面选择,使用平衡二叉树数据结构,可以使得查找的时间复杂度降低为O(logN)

使用java,以TreeMap为例,TreeMap本身还提供了一个tailMap(K fromKey)方法,支持从红黑树中查找比fromKey大的值的集合,但并不需要遍历整个数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import java.util.Collection;
import java.util.SortedMap;
import java.util.TreeMap;

public class ConsistentHash<T> {

private final HashFunction hashFunction;
private final int numberOfReplicas;
private final SortedMap<Integer, T> circle = new TreeMap<Integer, T>();

public ConsistentHash(HashFunction hashFunction, int numberOfReplicas,
Collection<T> nodes) {
this.hashFunction = hashFunction;
this.numberOfReplicas = numberOfReplicas;

for (T node : nodes) {
add(node);
}
}

public void add(T node) {
for (int i = 0; i < numberOfReplicas; i++) {
circle.put(hashFunction.hash(node.toString() + i), node);
}
}

public void remove(T node) {
for (int i = 0; i < numberOfReplicas; i++) {
circle.remove(hashFunction.hash(node.toString() + i));
}
}

public T get(Object key) {
if (circle.isEmpty()) {
return null;
}
int hash = hashFunction.hash(key);
if (!circle.containsKey(hash)) {
SortedMap<Integer, T> tailMap = circle.tailMap(hash);
hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
}
return circle.get(hash);
}
}

dubbo实现

理解完理论,再扒一下工业级产品的运用,dubbo负载均衡策略之一ConsistentHashLoadBalance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private final TreeMap<Long, Invoker<T>> virtualInvokers;

ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
for (Invoker<T> invoker : invokers) {
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}

整体思想是一样的,虚拟节点+TreeMap,但实现得更精致,使用MD5加密KEY,更加平衡性,整体的思路解释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//对所有节点,生成nCopies个虚拟结点  
for(Node node : nodes) {
//每四个虚拟结点为一组,为什么这样?下面会说到
for(int i=0; i<nCopies / 4; i++) {
//getKeyForNode方法为这组虚拟结点得到惟一名称
byte[] digest=HashAlgorithm.computeMd5(getKeyForNode(node, i));
/** Md5是一个16字节长度的数组,将16字节的数组每四个字节一组,分别对应一个虚拟结点,这就是为什么上面把虚拟结点四个划分一组的原因*/
for(int h=0;h<4;h++) {
//对于每四个字节,组成一个long值数值,做为这个虚拟节点的在环中的惟一key
//结果转换为long类,这是因为生成的结果是一个32位数,若用int保存可能会产生负数。而一致性hash生成的逻辑环其hashCode的范围是在 0 - MAX_VALUE之间。因此为正整数,所以这里要强制转换为long类型,避免出现负数。
Long k = ((long)(digest[3+h*4]&0xFF) << 24)
| ((long)(digest[2+h*4]&0xFF) << 16)
| ((long)(digest[1+h*4]&0xFF) << 8)
| (digest[h*4]&0xFF);
allNodes.put(k, node);
}
}
}

总结

QA

为什么hash一致性的数据空间范围是2^32次方?

这个问题有两种答案,一是技术限制、一是实际场景:

  1. 因为,java中int的最大值是2^31-1最小值是-2^31,2^32刚好是无符号整形的最大值
  2. 因为一致性hash算法是来做服务器的负载均衡,而服务器的IP地址是32位,所以是2^32-1次方的数值空间

进一步追尾基础,为什么java中int的最大值是2^31-1最小值是-2^31?

因为,int的最大值最小值范围设定是因为一个int占4个字节,一个字节占8位,二进制中刚好是32位

根据算法特性,一致性hash是最好的选择吗?

下一篇介绍另一种实现google maglev hashing算法

参考资料

《大型网站技术架构》

对一致性Hash算法,Java代码实现的深入研究

为什么hash环是32位

没有一身好内功,招式再多都是空;算法绝对是防身必备,面试时更是不可或缺;跟着算法渣一起从零学算法

线性排序

常见的三种以线性时间运行的算法:计数排序、基数排序和桶排序

需要注意的是线性排序算法是非基于比较的排序算法,都有使用限制才能达到线性排序的效果

线性排序是个神奇的算法,比基数排序及桶排序神奇得多

定义

计数排序是一个非基于比较的排序算法,该算法于1954年由 Harold H. Seward 提出。它的优势在于在对一定范围内的整数排序时,它的复杂度为Ο(n+k)(其中k是整数的范围),快于任何比较排序算法

算法

计数排序的基本思想是对于给定的输入序列中的每一个元素x,确定该序列中值小于x的元素的个数(此处并非比较各元素的大小,而是通过对元素值的计数和计数值的累加来确定)。一旦有了这个信息,就可以将x直接存放到最终的输出序列的正确位置上

首先需要三个数组,第一个数组记录A要排序的数列大小为n,第二个数组B要记录比某个数小的其他数字的个数所以第二个数组的大小应当为K(数列中最大数的大小),第三个数组C为记录排序好了的数列的数组,大小应当为n。

接着需要确定数组最大值并确定B数组的大小。并对每个数由小到大的记录数列中每个数的出现次数。因为是有小到大通过出现次数可以通过前面的所有数的出现次数来确定比这个数小的数的个数,从而确定其位置

假定20个随机整数的值,也就是第一个数组A:

9,3,5,4,9,1,2,7,8,1,3,6,5,3,4,0,10,9 ,7,9

数组A中最大数为10,数组B需要0~10索引

比如第一个整数是9,那么数组下标为9的元素加1:

第二个整数是3,那么数组下标为3的元素加1:

最终,数列遍历完毕时,数组的状态如下:

数组每一个下标位置的值,代表了数列中对应整数出现的次数。

有了这个“统计结果”,排序就很简单了。直接遍历数组,输出数组元素的下标值,元素的值是几,就输出几次:

0,1,1,2,3,3,3,4,4,5,5,6,7,7,8,9,9,9,9,10

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* 计数排序
* @param array
*/
public static void countSort(int []array){
//最大最小数
int min = Integer.MAX_VALUE;
int max = Integer.MIN_VALUE;
for ( int a:array) {
if(min > a) {
min = a;
}
if(max < a) {
max = a;
}
}

//数组索引数量,统计数组计数
// max-min 主要是考虑到像[90,99,93,95]不是从很小数开始的数组排序,减小空间消耗
int indexCount = max - min + 1;
System.err.println("统计数组长度"+indexCount);
int []countArray = new int[indexCount];

for (int i=0;i<array.length;i++){
System.err.println(array[i]-min);
countArray[array[i]-min]++;
}
System.err.println("countArray:"+Arrays.toString(countArray));

//排好序的数组
int [] sortArray = new int[array.length];
int index = 0;
for (int i=0;i<indexCount;i++) {
for (int j=0;j<countArray[i];j++){
sortArray[index++] = min + i;
}
}
//输出就是有序
System.err.println(Arrays.toString(sortArray));
}

改进

上面的实现为什么要改进,主要是当两个元素相同时,算法的稳定性问题

改进之前,数组B存放的是元素出现的次数

改进之后,会引入一个新数组(也可以共用数组B),存放的是元素的位置号(这个纯粹是个人理解,之前数组B是存放元素出现的次数,新数组存放每一个元素都加上前面所有元素出现次数之和,当找到对应排序数组元素时,新数组元素就是位置号)

语言比较空洞,直接来个示例(转自小灰程序员)

将数组arr中的数据当作是学生的成绩,要求不但要按照顺序从低到高排序,成绩相同时,按原有顺序显示:

统计数组从第二个元素开始,每一个元素都加上前面所有元素之和

第一步,我们遍历成绩表最后一行的小绿:

小绿是95分,我们找到countArray下标是5的元素,值是4,代表小绿的成绩排名位置在第4位。

同时,我们给countArray下标是5的元素值减1,从4变成3,,代表着下次再遇到95分的成绩时,最终排名是第3。

第二步,我们遍历成绩表倒数第二行的小白:

小白是94分,我们找到countArray下标是4的元素,值是2,代表小白的成绩排名位置在第2位。

同时,我们给countArray下标是4的元素值减1,从2变成1,,代表着下次再遇到94分的成绩时(实际上已经遇不到了),最终排名是第1。

第三步,我们遍历成绩表倒数第三行的小红:

小红是95分,我们找到countArray下标是5的元素,值是3(最初是4,减1变成了3),代表小红的成绩排名位置在第3位。

同时,我们给countArray下标是5的元素值减1,从3变成2,,代表着下次再遇到95分的成绩时(实际上已经遇不到了),最终排名是第2。

这样一来,同样是95分的小红和小绿就能够清楚地排出顺序了,也正因此,优化版本的计数排序属于稳定排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* 稳定计数排序
* @param array
*/
public static void countSort1(int []array) {
//最大最小数
int min = Integer.MAX_VALUE;
int max = Integer.MIN_VALUE;
for ( int a:array) {
if(min > a) {
min = a;
}
if(max < a) {
max = a;
}
}

//数组索引数量,统计数组计数
// max-min 主要是考虑到像[90,99,93,95]不是从很小数开始的数组排序,减小空间消耗
int indexCount = max - min + 1;
System.err.println("统计数组长度"+indexCount);
int []countArray = new int[indexCount];

for (int i=0;i<array.length;i++){
System.err.println(array[i]-min);
countArray[array[i]-min]++;
}
System.err.println("countArray:"+Arrays.toString(countArray));
//位置数组
int []pointArray = new int[indexCount];

int sum =0;
for (int i = 0;i<indexCount;i++){
sum += countArray[i];
pointArray[i] = sum;
}
System.err.println("pointArray:"+Arrays.toString(pointArray));
System.err.println("aaaaaArray:"+Arrays.toString(array));
//排好序的数组
int [] sortArray = new int[array.length];
for (int i=array.length-1;i>=0;i--) {
sortArray[pointArray[array[i]-min] -1 ] = array[i];
pointArray[array[i]-min]--;
}
//输出就是有序
System.err.println(Arrays.toString(sortArray));
}

总结

复杂度

假设array元素有N个,取值范围是M。

时间复杂度:3N(计算最大最小数、计数、排好序的数组) + M(位置数组),去掉系数,时间复杂度是O(N+M)

空间复杂度:只考虑统计数组,那就是M

局限性

1.当数列最大最小值差距过大时,并不适用计数排序。

比如给定20个随机整数,范围在0到1亿之间,这时候如果使用计数排序,需要创建长度1亿的数组。不但严重浪费空间,而且时间复杂度也随之升高。

2.当数列元素不是整数,并不适用计数排序。

如果数列中的元素都是小数,比如25.213,或是0.00000001这样子,则无法创建对应的统计数组。这样显然无法进行计数排序。

引申阅读

算法渣-排序-基数排序

算法渣-排序-桶排序

参考资料

漫画:什么是计数排序

  1. 为什么需要限流
  2. 如何限流

限流主要就是考虑这两点

为什么需要限流

之前已经介绍了熔断,降级,为什么还需要一个限流呢?是不是多此一举呢?

先来个结论:

熔断是为了防止雪崩,明哲保身;而限流则是在已有条件下,最大限制发挥系统效能

根据《熔断机制》可以知道,熔断有三种状态,[熔断关闭],[半熔断],[熔断开启]三种状态,如果系统压力过大,一个服务就会在三种状态来回切换

会出现一种情况,就像一辆开在崎岖山路上的法拉利,不管车的性能多好,都需要不停的加速减速

要想速度达到最佳,就得让车开在一条笔直的高速公路上

系统就是一条河,服务就像行驶在河里的船,岸的两边,一边是熔断,另一边就是限流;一个保障系统安全,一个保持最大限度运转,让系统达到高可用

如何限流

限流如何实施?

  1. 量化限流阀值
  2. 确定限流策略、算法
  3. 被限制流量的处理

限流阀值,这个其实就是通过系统压力测试来确定

这个工作其实在系统开发之初就需要有初步的估量,涉及到业务规模,增长速度,架构选择等等,根据现有资源及其服务能力,给出上限值

《计数器算法》中已经说明了几种限流算法:固定窗口、滑动窗口、漏桶、令牌桶

有人总结为【两窗两桶】,很形象

固定窗口:临界问题,一旦流量波动,计数器提前计满,剩余时间都会被限流

滑动窗口:固定窗口的「固定周期」已经很小,那么使用滑动窗口也就没有意义,虽然大大减小临界问题,但总归需要预估一个窗口量,很难把握,过小引起性能和资源损耗

漏桶:不管进来多少量,出去的速率是恒定的,“出口”速度固定;当桶容量满时,就会被限流

令牌桶:“进口”速度固定;只要令牌的生成速度大于等于请求被处理的速度,系统就能正常处理

有了限流策略,那制定在服务端还是客户端呢?

从效果上讲,肯定是客户端,限制越靠入口,系统就越安全,可以防止不必要的无效流量进入服务端系统

你也可能想到对网络资源的浪费,但一般各个服务都是在同一IDC机房,因此这点不必考虑

但成本相对在服务端过高,放在客户端,客户端的数量与扩张速度也是动态的,无法评估,对每个端的限流阀值分配与下发都很复杂

因此一般都是在服务端进行限流

至于被限制的流量如何处理?大多数情况是直接抛弃,在系统之初,就有了对流量预判,从设计到资源准备都很充足,会有合理的限流阀值;如果有突发流量,可能配合监控及时增加机器,横向扩展

总结

限流与熔断,一个让系统保持最大活跃性,一个保障安全,两者技术手段有重叠,但意义完全不同,所以两者都是微服务中不可少的两个服务治理手段

  1. semaphore的定义,意义
  2. 在没有juc semaphore之前怎么实现
  3. semaphore使用
  4. 分布式semaphore实现

信号量

最早用来解决进程同步与互斥问题的机制:
包括一个称为信号量的变量及对它进行的两个原语操作(PV操作)

什么是信号量?

信号量(semaphore)的数据结构为一个值和一个指针,指针指向等待该信号量的下一个进程。信号量的值与相应资源的使用情况有关。

PV操作由P操作原语和V操作原语组成(原语是不可中断的过程)

(注,P是荷兰语的Passeren,相当于英文的pass,V是荷兰语的Verhoog,相当于英文中的incremnet)

对信号量进行操作,具体定义如下:

  • P(S):
    • ①将信号量S的值减1,即S=S-1;
    • ②如果S>=0,则该进程继续执行;否则该进程置为等待状态,排入等待队列
  • V(S):
    • ①将信号量S的值加1,即S=S+1;
    • ②如果S>0,则该进程继续执行;否则释放队列中第一个等待信号量的进程

PV操作的意义:我们用信号量及PV操作来实现进程的同步和互斥。PV操作属于进程的低级通信

使用PV操作实现进程互斥时应该注意的是:

  1. 每个程序中用户实现互斥的P、V操作必须成对出现,先做P操作,进临界区,后做V操作,出临界区。若有多个分支,要认真检查其成对性
  2. P、V操作应分别紧靠临界区的头尾部,临界区的代码应尽可能短,不能有死循环
  3. 互斥信号量的初值一般为1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//许可数量
private int permits = 1;

public synchronized void P() {
permits--;
if(permits < 0 ){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public synchronized void V(){
permits++;
if(permits <=0){
notifyAll();
}
}

J.U.C Semaphore

JUC提供了工具类之一就是Semaphore,提供了丰富的API,不再需要自己实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 创建具有给定的许可数和非公平的公平设置的 Semaphore。
Semaphore(int permits)
// 创建具有给定的许可数和给定的公平设置的 Semaphore。
Semaphore(int permits, boolean fair)

// 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
void acquire()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。
void acquire(int permits)
// 从此信号量中获取许可,在有可用的许可前将其阻塞。
void acquireUninterruptibly()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。
void acquireUninterruptibly(int permits)
// 返回此信号量中当前可用的许可数。
int availablePermits()
// 获取并返回立即可用的所有许可。
int drainPermits()
// 返回一个 collection,包含可能等待获取的线程。
protected Collection<Thread> getQueuedThreads()
// 返回正在等待获取的线程的估计数目。
int getQueueLength()
// 查询是否有线程正在等待获取。
boolean hasQueuedThreads()
// 如果此信号量的公平设置为 true,则返回 true。
boolean isFair()
// 根据指定的缩减量减小可用许可的数目。
protected void reducePermits(int reduction)
// 释放一个许可,将其返回给信号量。
void release()
// 释放给定数目的许可,将其返回到信号量。
void release(int permits)
// 返回标识此信号量的字符串,以及信号量的状态。
String toString()
// 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
boolean tryAcquire()
// 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。
boolean tryAcquire(int permits)
// 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
// 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。
boolean tryAcquire(long timeout, TimeUnit unit)

对于JUC的Semaphore源码,此篇不阐述了,另开新篇;但对分布式的Semaphore倒是可以研究下

分布式Semaphore

Redission中有对应的RSemaphore

1
2
3
4
5
6
RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();

可过期信号量

1
2
3
4
5
6
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 获取一个信号,有效期只有2秒钟。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);

直接上最本质的源码片段,lua脚本很简单,对信号量进行计数,acquire时,信号量减1,release时,信号量加1;主要是保证操作的原子性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override
public RFuture<Boolean> tryAcquireAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
if (permits == 0) {
return RedissonPromise.newSucceededFuture(true);
}

return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('get', KEYS[1]); " +
"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
"local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), permits);
}

@Override
public RFuture<Void> releaseAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
if (permits == 0) {
return RedissonPromise.newSucceededFuture(null);
}

return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
"redis.call('publish', KEYS[2], value); ",
Arrays.<Object>asList(getName(), getChannelName()), permits);
}

在最本质的基础上,再深入看一下还做了哪些事,能真正达到一个工业生产标准

tryAcquire()

非阻塞式,有信息量就正常获取,没有刚快速返回,就是lua本质,没有做额外的事情

acquire()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void acquire(int permits) throws InterruptedException {
if (tryAcquire(permits)) {
return;
}
RFuture<RedissonLockEntry> future = subscribe();
commandExecutor.syncSubscription(future);
try {
while (true) {
if (tryAcquire(permits)) {
return;
}
getEntry().getLatch().acquire(permits);
}
} finally {
unsubscribe(future);
}
}

阻塞式,相对非阻塞式就多了一些事

  • 1.先tryAcquire,看是否能获取到信号量
  • 2.订阅channel事件
  • 3.无限循环
    • 3.1.先tryAcquire(),尝试一下
    • 3.2.通过getEntry().getLatch(),也就是j.u.c.Semaphore,acquire()阻塞
  • 4.取消订阅

订阅事件内部细节,另开篇再说了,他的目的其实就是释放Semaphore

想像一下,同一个client的两个线程A,B 同时需要获取信号量,如果A成功获取,那么B将被Semaphore阻塞住了,何时退出阻塞呢?

就在线程A进行release()之后,会publish,细节可查看上面的release()中的lua脚本,当B监听到事件时,就会调用Semaphore.release(),再次进行tryAcquire()

tryAcquire(int permits, long waitTime, TimeUnit unit)

如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Override
public boolean tryAcquire(int permits, long waitTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();

if (tryAcquire(permits)) {
return true;
}

time -= (System.currentTimeMillis() - current);
if (time <= 0) {
return false;
}

current = System.currentTimeMillis();
RFuture<RedissonLockEntry> future = subscribe();
if (!await(future, time, TimeUnit.MILLISECONDS)) {
return false;
}

try {
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
return false;
}

while (true) {
current = System.currentTimeMillis();
if (tryAcquire(permits)) {
return true;
}

time -= (System.currentTimeMillis() - current);
if (time <= 0) {
return false;
}

// waiting for message
current = System.currentTimeMillis();

getEntry().getLatch().tryAcquire(permits, time, TimeUnit.MILLISECONDS);

time -= (System.currentTimeMillis() - current);
if (time <= 0) {
return false;
}
}
} finally {
unsubscribe(future);
}
// return get(tryAcquireAsync(permits, waitTime, unit));
}

其实await(future, time, TimeUnit.MILLISECONDS)是使用的CountDownLatch

如果计数到达零,则返回 true;如果在计数到达零之前超过了等待时间,则返回 false

当前是第一个请求,或者别的释放,那就再往下进入循环

CountDownLatch.await()+Semaphore.tryAcquire()配合使用

每一次等待时间后,都需要检查是否超过等待时间

为什么需要引入CountDownLatch.await()呢? 都使用Semaphore.tryAcquire()不行吗?这个需要再次深入挖掘了

总结

分布式信号量,原理很明了,主要还是通过lua保障redis操作的原子性

阅读redisson源码,发现里面的操作基本都是异步化,底层又是基于netty,大量使用了future模式,如果不知道future模式,会很绕,debug都会晕掉,所以在深入redisson之前,需要再对future模式温习一下

我们不生产代码,我们是代码的搬运工

前不久,阿里大牛虾总再次抛出了分布式锁的讨论,对照之前项目中实现的redis分布式锁总结一下

天才是1%的灵感,加上99%的汗水;编程是1%的编码,加上99%的在Google/StackOverflow/Github上找代码
残酷的现实是,找来的代码可能深藏bug,而不知

在多核多线程环境中,通过锁机制,在某一个时间点上,只能有一个线程进入临界区代码,从而保证临界区中操作数据的一致性

怎么样才是把好锁?

可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行。
这把锁要是一把可重入锁(避免死锁)
支持阻塞和非阻塞:和 ReentrantLock 一样支持 lock 和 trylock 以及 tryLock(long timeOut)
这把锁最好是一把公平锁(根据业务需求考虑要不要这条)
有高可用的获取锁和释放锁功能
获取锁和释放锁的性能要好

分布式锁三要素

  1. 外部存储

    分布式锁是在分布式部署环境中给多个主机提供锁服务,需要另外的存储载体

  2. 全局唯一标识

    在多线程环境中,锁可以使一个对象引用,也可以是变量,都有唯一的标识来区分锁保护的不同资源;
    在分布式环境下,也需要,比如对某一特定用户资源操作,业务+userId即可唯一标识

  3. 至少有两种状态,获取和释放

    锁至少需要两种状态:加锁(lock)和解锁(unlock)。
    用状态区分当前尝试获取的锁是否已经被其他操作占用,
    被占用只有等待锁释放后才能尝试获取锁并加锁,保护共享资源

实现

理论知识知道得再多,还得落地才行;只要遵从三要素,就能打造一把好锁,不要拘泥于某一种工具。

网上有很多实现方式,主要是”外部存储“使用了不同的组件,比如数据库,redis,zk,由于这些组件各自特性的不同,实现复杂度各有不同

这儿主要说下在实际工作中使用到的两种方式,数据库与redis

数据库

数据库,任何系统都需要的组件,常规手法,都是使用version来实现乐观锁

version

比如A、B操作员同时读取一余额为1000元的账户,A操作员为该账户增加100元,B操作员同时为该账户扣除50元,A先提交,B后提交。最后实际账户余额为1000-50=950元,但本该为1000+100-50=1050。这就是典型的并发问题

假设数据库中帐户信息表中有一个version字段,当前值为1;而当前帐户余额字段(balance)为1000元。假设操作员A先更新完,操作员B后更新。
a、操作员A此时将其读出(version=1),并从其帐户余额中增加100(1000+100=1100)。
b、在操作员A操作的过程中,操作员B也读入此用户信息(version=1),并从其帐户余额中扣除50(1000-50=950)。
c、操作员A完成了修改工作,将数据版本号加一(version=2),连同帐户增加后余额(balance=1100),提交至数据库更新,此时由于提交数据版本大于数据库记录当前版本,数据被更新,数据库记录version更新为2。
d、操作员B完成了操作,也将版本号加一(version=2)试图向数据库提交数据(balance=950),但此时比对数据库记录版本时发现,操作员B提交的数据版本号为2,数据库记录当前版本也为2,不满足 “提交版本必须大于记录当前版本才能执行更新 “的乐观锁策略,因此,操作员B的提交被驳回。
这样,就避免了操作员B用基于version=1的旧数据修改的结果覆盖操作员A的操作结果的可能。

1
set balance=1100,version=version+1 where id=#{id} and version=#{version};

version简单,除了对业务数据表有侵入性,还有一些场景是胜任不了

比如,在操作一个数量之前,需要确认一下能不能操作

1
2
3
4
5
6
7
int countLimit = select count from limit where id = ${id};

if(countlimit>0){
set balance=1100,version=version+1 where id=#{id} and version=#{version};
}

update count;

这儿操作了多张表,此时就需要再配合事务,才能保证原子性

redis

由于db性能的限制,而redis性能卓越,很多时候会选择redis实现方式

怎么使用redis正确地实现分布式锁,需要了解两方面

  1. 实现分布式锁时,使用到的redis命令
  2. 网上示例可能都有毒

redis命令

setnx 命令(『SET if Not eXists』(如果不存在,则 SET)的简写):
设置成功,返回 1
设置失败,返回 0
该命令是原子操作

getset 命令:
自动将key对应到value并且返回原来key对应的value。如果key存在但是对应的value不是字符串,就返回错误。
返回值:返回之前的旧值,如果之前Key不存在将返回nil。
该命令是原子操作。

get 命令:
get 获取key的值,如果存在,则返回;如果不存在,则返回nil;

del 命令:
del 删除key及key对应的值,如果key不存在,程序忽略

SET 命令:
set key value [EX seconds] [PX milliseconds] [NX|XX]
将字符串值 value 关联到 key 
如果 key 已经持有其他值, SET 就覆写旧值,无视类型。
对于某个原本带有生存时间(TTL)的键来说, 当 SET 命令成功在这个键上执行时, 这个键原有的 TTL 将被清除。

可选参数从 Redis 2.6.12 版本开始,SET 命令的行为可以通过一系列参数来修改:

EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 

PX millisecond:设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 

NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 

XX:只在键已经存在时,才对键进行设置操作。

示例

原来项目中使用分布式锁,整个逻辑:

  1. setnx(lockkey, 当前时间+过期超时时间),如果返回 1,则获取锁成功;如果返回 0 则没有获取到锁,转向 2。
  2. get(lockkey) 获取值 oldExpireTime ,并将这个 value 值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向 3。
  3. 计算 newExpireTime = 当前时间+过期超时时间,然后 getset(lockkey, newExpireTime) 会返回当前 lockkey 的值currentExpireTime。判断 currentExpireTime 与 oldExpireTime 是否相等,如果相等,说明当前 getset 设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。
  4. 在获取到锁之后,当前线程可以开始自己的业务处理,当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,如果小于锁设置的超时时间,则直接执行 delete 释放锁;如果大于锁设置的超时时间,则不需要再锁进行处理。

获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private boolean acquireLock(Jedis j,String lock) throws Exception{
int timeOut = timeoutSeconds*1000;
boolean acquired = false;
long start = System.currentTimeMillis();
int times = 0;
do {
String value = String.valueOf(System.currentTimeMillis() + timeOut + 1);
// 第一个得到这个锁
if (j.setnx(lock, value) == 1) {
logger.info("第一次获取全局锁:{} 成功", lock);
acquired = true;
break;
}
// j.expire(lock, timeoutSeconds); 网络抖动,可能失败
String currentValue = j.get(lock);

// 小于时,可能是上次没有清除,自上次超时后没有别的线程操作过
if (currentValue != null && Long.valueOf(currentValue) < System.currentTimeMillis()) {
// 这是同步操作,只会一个成功
String oldValue = j.getSet(lock, value);
// 别的线程没有赋上值,当前成功得到锁
if (oldValue != null && oldValue.equals(currentValue)) {
acquired = true;
logger.info("获取全局锁:{} 成功,尝试了{}次,经过了{}ms",lock,times,System.currentTimeMillis()-start);
break;
}
}
times++;
Thread.sleep(100);
} while (start + timeOut > System.currentTimeMillis());
if(!acquired){
logger.info("获取全局锁:{} 失败,尝试了{}次",lock,times);
}
return acquired;
}

解锁

1
2
3
4
5
6
7
8
9
private void releaseLock(Jedis j,String lock){
String currentValue = j.get(lock);
if(currentValue != null){
if(System.currentTimeMillis() < Long.valueOf(currentValue) ){
j.del(lock);
logger.info("释放锁{}",lock);
}
}
}

示例缺陷

特地从多年前的项目中把这段代码找出来,当年写完,心里还挺美

网上有很多资料也是差不多样的,但事实并不那么完美,甚至是错误的

加锁
  • 使用jedis.setnx()和jedis.expire()组合实现加锁
1
2
3
4
5
Long result = jedis.setnx(lockKey, value); 
if (result == 1) {
// 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁
jedis.expire(lockKey, expireTime);
}

这个问题很明显,setnx与expire不是同一个事务,不俱备原子性;程序崩溃或者网络抖动都会出现死锁问题

  • System.currentTimeMillis()
    这个需要各个client时间必须一致,一旦不一致,就可能加锁失败

  • getSet()
    如果锁为了灵活性,会把timeout作为入参

当锁过期的时候,如果多个客户端同时执行jedis.getSet()方法,那么虽然最终只有一个客户端可以加锁,但是这个客户端的锁的过期时间可能被其他客户端覆盖

解锁
  • jedis.del()直接删除

这种不先判断锁的拥有者而直接解锁的方式,会导致任何客户端都可以随时进行解锁,即使这把锁不是它的

有种错误改进,增加参数传入requestId

1
2
3
4
5
6
7
public static void releaseLock(Jedis jedis, String lockKey, String requestId) { 
// 判断加锁与解锁是不是同一个客户端
if (requestId.equals(jedis.get(lockKey))) {
// 若在此时,这把锁突然不是这个客户端的,则会误解锁
jedis.del(lockKey);
}
}

还是原子性的问题
如代码注释,问题在于如果调用jedis.del()方法的时候,这把锁已经不属于当前客户端的时候会解除他人加的锁。那么是否真的有这种场景?答案是肯定的,比如客户端A加锁,一段时间之后客户端A解锁,在执行jedis.del()之前,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del()方法,则将客户端B的锁给解除了

缺陷总结

心里认为本来很简单的事,代码大概:

1
2
3
4
5
6
7
8
Lock lock = DistributedReentrantLock.newLock("testlock11");//定义testlock11为key的锁,默认可重入锁
if(lock.tryLock()){
    try{
     xxxxxx
    }finally{
      lock.unlock(); //释放testlock11为key的锁,释放需要放在finally里,防止出异常导致锁没有及时释放
    }
  }

为了提高性能,通过redis原子性接口SETNX:

  1. 使用SETNX命令获取锁,若返回0(key已存在,锁已存在)则获取失败,反之获取成功
  2. 为了防止获取锁后程序出现异常,导致其他线程/进程调用SETNX命令总是返回0而进入死锁状态,需要为该key设置一个“合理”的过期时间释放锁
  3. 使用DEL命令将锁数据删除

结果为了弥补setnx()与expire()两个接口的原子性问题,引入了一堆问题,外强中干

缺陷修正

加锁

Redis 2.6.12版本后,增强了set()命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 尝试获取分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功
*/
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}

加锁就一行代码:jedis.set(String key, String value, String nxxx, String expx, int time),
这个set()方法一共有五个入参:

  1. 第一个为key,我们使用key来当锁,因为key是唯一的
  2. 第二个为value,我们传的是requestId,通过给value赋值为requestId,就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成
  3. 第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
  4. 第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定
  5. 第五个为time,与第四个参数相呼应,代表key的过期时间

高可用:

  1. set()加入了NX参数,可以保证如果已有key存在,则不会调用成功,也就是只有一个客户端能持有锁,满足互斥性
  2. 由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即key被删除),不会发生死锁
  3. 将value赋值为requestId,代表加锁的客户端请求标识,那么在解锁的时候就可以进行校验是否是同一个客户端,防止锁交叉
解锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 释放分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @return 是否释放成功
*/
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {

String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;
}

首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)

使用eval()配置lua保证原子性

在eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令

有效时间

为什么需要一个有效时间呢?主要就是防止死锁

疑难

  • 执行业务代码操作共享资源的时间大于设置锁的过期时间?

客户端需要设置接口访问超时,接口超时时间需要远远小于锁超时时间,比如锁自动释放的时间是10s,那么接口超时大概设置5-50ms

【虽然能解决问题,但时间设置成了难点,微服务中多少接口,而且接口的timeout都是可配置的,不能每次调整接口timeout时,还是考虑一下锁的timeout】

  • GC的STW

客户端1获得了锁,正准备处理共享资源的时候,发生了Full GC直到锁过期。这样,客户端2又获得了锁,开始处理共享资源。在客户端2处理的时候,客户端1 Full GC完成,也开始处理共享资源,这样就出现了2个客户端都在处理共享资源的情况

续命丸

引入锁续约机制,也就是获取锁之后,释放锁之前,会定时进行锁续约,比如以3min间隔周期进行锁续约

这样如果应用重启了,最多3min等待时间,不会因为时间太长导致的死锁问题,也不会因为时间太短导致被其他线程抢占的问题,也就是锁分布式锁不需要设置过期时间,过期时间对于这个锁来说是滑动的

Redission

虾总给了总结性阐述:

首先启动Daemon线程,一直循环检测所有的分布式key,异步递延分布锁的过期时间,只要在处理业务逻辑,就递延分布锁过期时间3min。
每次添加分布式锁key,同时会生成一个uuid token,定义一个ConcurrentHashMap构造一个全局map维护所有的分布式key,上面Daemon线程会遍历这个map,每次解锁需要比对这个token,token一致才能解锁。
这样以来如果应用重启了,最多会有3min等待时间,不会导致时间太长导致的死锁问题,也不会因为时间太短导致的被其他线程抢占的问题,也就是锁分布式锁不需要设置过期时间,过期时间对于这个锁来说是滑动的

跟随虾总思路,找到了一个开源组件:Redisson

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。

相对于平时使用的jedis,redission进行比较高的抽象

redission中的lock主要是RLock接口,继承的juc的Lock接口

1
public interface RLock extends Lock, RExpirable, RLockAsync 


Lock

先看lock(),有两种形式,一个不带leaseTime,一个带leaseTime

1
2
public void lock() ;
public void lock(long leaseTime, TimeUnit unit) ;

边看源码,边解释

两个方法共用了lockInterruptibly()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
  1. 尝试获取锁tryAcquire
  2. 获取失败,订阅此channel的消息(订阅的意义,在解锁时就会发现)
  3. 进入循环,不停的尝试获取锁,其中使用了JUC的Semaphore
  4. 一旦获取成功,则跳出循环
  5. 取消订阅

尝试获取锁tryAcquire里面会用到两个核心方法tryAcquireAsync(),tryLockInnerAsync()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}

Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
  • 1.根据锁的持续时间不同,处理也不同
  • 2.没有设置持续时间,那就是阻塞型,一直等待
    • 2.1.为了防止业务方法执行时间超过锁timeout,则定时续约scheduleExpirationRenewal()
  • 3.设置了持续时间,则不需要进行续约
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {

RFuture<Boolean> future = renewExpirationAsync(threadId);

future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}

if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}

}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
task.cancel();
}
}

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
  1. 以internalLockLeaseTime/3间隔时间,定时续约
  2. 如果当前client自身有并发时,通过putIfAbsent保证只有一个task
  3. 续约:当lock存在时,使用pexpire设置过期时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);

return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

protected String getLockName(long threadId) { 
return id + ":" + threadId;
}
  • 1.lockname不存在
    • 1.1.hset(lockname,uuid+threadid,1),value=uuid+threadid,有uuid可以区分各个client,有threadid区分各个线程,这样锁就具备了可重入性
    • 1.2.pexpire设置过期时间,防止client挂掉,造成死锁
  • 2.lockname存在
    • 2.1.hexists(lockname,uuid+threadid),这样保证了是同一个锁在同一个client
    • 2.2.hincrby 再次进锁,计数器+1
    • 2.3.pexpire 再次设置超时
  • 3.lockname存在,并且不在同一client
    • 3.1.pttl 返回剩余有效时长

unLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Override
public RFuture<Void> unlockAsync(final long threadId) {
final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);

future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
cancelExpirationRenewal(threadId);
result.tryFailure(future.cause());
return;
}

Boolean opStatus = future.getNow();
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
if (opStatus) {
cancelExpirationRenewal(null);
}
result.trySuccess(null);
}
});

return result;
}

void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = expirationRenewalMap.get(getEntryName());
if (task != null && (threadId == null || task.getThreadId() == threadId)) {
expirationRenewalMap.remove(getEntryName());
task.getTimeout().cancel();
}
}
  1. 从方法名看,虽然对外好像是直接解锁,但内部是异步执行的
  2. unlockInnerAsync()进行解锁
  3. 从expirationRenewalMap移除,并把task.cancel()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}
  1. lockname不存在,说明已经解锁,publish channelname unlockmessage;return 1
  2. lockname存在,但对于uuid+id不存在,说明不是加锁的client,return nil
  3. lockname存在,并且是当前加锁client
  4. 对lockname uuid+id进行-1,如果counter>0则走5,如果=0 则走6
  5. counter>0 说明锁重入了,计数器-1,并expire
  6. counter=0 说明最终解锁,直接del key,并publish channelname unlockmessage;return 1

redission缺陷

使用cluster时

一个场景:A在向主机1请求到锁成功后,主机1宕机了。现在从机1a变成了主机。但是数据没有同步,从机1a是没有A的锁的。那么B又可以获得一个锁。这样就会造成数据错误。

redlock主要思想就是做数据冗余。建立5台独立的集群,当我们发送一个数据的时候,要保证3台(n/2+1)以上的机器接受成功才算成功,否则重试或报错

redlock实现会更复杂,但从他的算法上看,有zk选举的味道。对于更高可用分布锁,可以借助zk本身特性去实现

总结

对于锁,主要考虑性能与安全,即要保持锁的活跃性,又得保证锁的安全性

分布式锁,除了以上两点,还要考虑实现时的三要素

对于redission,对于锁部分的源码,还有很多的内容,很多的细节需要挖掘,此篇就不写了,太长。

后面再结合JUC,写篇更详细的源码分析

参考资料

Redis分布式锁的正确实现方式

redission

成功就是简单道理的深刻理解与灵活运用

前不久,阿里大牛虾总在群里抛出一个问题:“从深层次讲解一下如何理解IOC,IOC和DI是一回事吗?”

这个问题真是让人平静而又不平静

平静源于此问题就像问中国人怎么使用筷子,天天使用筷子,难道还不会使用筷子?

但又不平静,你能写出一份详细的说明书,让一个不会使用筷子的人按此说明成功地使用上筷子吗?


天天使用spring,是否还记得那些简单原理?现如今会写六种回字的你,还记得回的本意吗?

那些曾经刻骨铭心的记忆,你有多久没有想起了


IOC

不管是平时交流,还是面试,当谈谈spring时,除了api使用,最主要还是要聊聊IOC

Ioc—Inversion of Control,即“控制反转”

从语文的角度解析一下,如果“控制”当成一个动作,那就需要完善主语与宾语,也就是“谁”控制了“谁”;

“反转”:没有反转是什么情况,what反转,why反转,how反转

分两种情况讨论,1、没有IOC容器,2、有IOC容器

没有IOC容器

参与者

回答“谁”控制了“谁”中的两个“谁”

抽象讲:某个对象A控制了另一个某对象B

宏观讲 某个对象A可能就是应用程序,比如读取或者修改一个文件,那么此处的文件也就是某对象B了

微观讲,objectA 操作了 objectB,比如给objectB的属性赋值

从由内向外的角度 由两个参与者:某一对象(任意的对象类),以及对象外的各种资源(需要的其它对象、或者是对象需要的文件资源等等)

控制

常规情况下的应用程序,如果要在A里面使用C,当然是直接去创建C的对象,也就是说,是在A类中主动去获取所需要的外部资源C

1
2
3
A a = new AImpl(); 
C c = new CImpl(); 
a.setC(c);

当前类控制了一切:主动实例化、获取依赖、主动装配

这儿示例简单了点,一些项目会使用上Factory模式,让程序更面向接口,最小化变化,最大化地“开-闭”原则

但不管如何,还是会面临一些问题:

  1. 更换实现需要重新编译源代码
  2. 依赖变更很难更换实现、难于测试
  3. 耦合实例生产者和实例消费者

有IOC容器

引入IOC容器

参与者

除了对象与对象外的资源,增加了IOC容器

控制

引入IOC容器后,就不再是直接控制了,而是被动等待,等待IoC/DI的容器获取一个C的实例,然后反向的注入到A类中

此时,对象不再主动去new,而IoC容器来创建这些对象,即由Ioc容器来控制对象的创建;

再来看“谁”控制了“谁”?

IoC 容器控制了对象;控制什么?主要控制了外部资源获取(不只是对象包括比如文件等)

反转

1
2
3
A a = new AImpl(); 
C c = new CImpl(); 
a.setC(c);

还是看这段代码,包含了 主动实例化、获取依赖主动装配

根据【控制】,IOC做到了主动实例化、获取依赖;而【反转】体现在了主动装配这一点

传统应用程序是由我们自己在对象中主动控制去直接获取并set依赖对象,是为【正转】;

而【反转】则是由容器来帮忙创建及注入依赖对象;

为何是反转?

因为由容器帮我们查找及注入依赖对象,对象只是被动的接受依赖对象,所以是反转;哪些方面反转了?依赖对象的获取、装配被反转了


IOC总结

IoC很好的体现了面向对象设计法则之一—— 好莱坞法则:“别找我们,我们找你”;即由IoC容器帮对象找相应的依赖对象并注入,而不是由对象主动去找

传统关系转变成

IoC对编程带来的最大改变不是从代码上,而是从思想上,发生了“主从换位”的变化。应用程序原本是老大,要获取什么资源都是主动出击,但是在IoC思想中,应用程序就变成被动的了,被动的等待IoC容器来创建并注入它所需要的资源了

这么小小的一个改变其实是编程思想的一个大进步,这样就有效的分离了对象和它所需要的外部资源,使得它们松散耦合,有利于功能复用,更重要的是使得程序的整个体系结构变得非常灵活

DI

DI—Dependency Injection,即“依赖注入”:组件之间依赖关系由容器在运行期决定,形象的说,即由容器动态的将某个依赖关系注入到组件之中。依赖注入的目的并非为软件系统带来更多功能,而是为了提升组件重用的频率,并为系统搭建一个灵活、可扩展的平台。

通过依赖注入机制,我们只需要通过简单的配置,而无需任何代码就可指定目标需要的资源,完成自身的业务逻辑,而不需要关心具体的资源来自何处,由谁实现。  

理解DI的关键是:“谁依赖谁,为什么需要依赖,谁注入谁,注入了什么”

那我们来深入分析一下:

  • 谁依赖于谁:当然是应用程序依赖于IoC容器;  
  • 为什么需要依赖:应用程序需要IoC容器来提供对象需要的外部资源;  
  • 谁注入谁:很明显是IoC容器注入应用程序某个对象,应用程序依赖的对象;  
  • 注入了什么:就是注入某个对象所需要的外部资源(包括对象、资源、常量数据)

IOC VS DI

IoC和DI什么关系?

貌似是个仁者见仁的问题,有人认为两者是同一个东西,也有人认为是不同的概念;

摘抄一些见解


根据上面的讲述,应该能看出来,依赖注入和控制反转是对同一件事情的不同描述,从某个方面讲,就是它们描述的角度不同。依赖注入是从应用程序的角度在描述,可以把依赖注入描述完整点:应用程序依赖容器创建并注入它所需要的外部资源;而控制反转是从容器的角度在描述,描述完整点:容器控制应用程序,由容器反向的向应用程序注入应用程序所需要的外部资源。


DI仅是用一个单独的对象(装配器)来装配对象之间的依赖关系,一般有setter、构造、接口注入等,与IOC不是一回事,仅是IOC依赖管理层面的东西


IOC是思想,DI是IOC的具体实现

也可看看鼻祖Martin Fowler的表述

As a result I think we need a more specific name for this pattern. Inversion of Control is too generic a term, and thus people find it confusing. As a result with a lot of discussion with various IoC advocates we settled on the name Dependency Injection.

参考资料

Inversion of Control Containers and the Dependency Injection pattern
IOC是什么

没有一身好内功,招式再多都是空;算法绝对是防身必备,面试时更是不可或缺;跟着算法渣一起从零学算法

线性排序

常见的三种以线性时间运行的算法:计数排序、基数排序和桶排序;

需要注意的是线性排序算法是非基于比较的排序算法,都有使用限制才能达到线性排序的效果

定义

基数排序的发明可以追溯到1887年赫尔曼·何乐礼在打孔卡片制表机(Tabulation Machine), 排序器每次只能看到一个列。它是基于元素值的每个位上的字符来排序的。 对于数字而言就是分别基于个位,十位, 百位或千位等等数字来排序。

基数排序(Radix sort)是一种非比较型整数排序算法,其原理是将整数按位数切割成不同的数字,然后按每个位数分别比较。由于整数也可以表达字符串(比如名字或日期)和特定格式的浮点数,所以基数排序也不是只能使用于整数

算法

原理是将整数按位数切割成不同的数字,然后按每个位数分别比较

基数排序可以采用两种方式:

LSD(Least Significant Digital):从待排序元素的最右边开始计算(如果是数字类型,即从最低位个位开始)

MSD(Most Significant Digital):从待排序元素的最左边开始计算(如果是数字类型,即从最高位开始)

基数排序又称为“桶子法”,从低位开始将待排序的数按照这一位的值放到相应的编号为0~9的桶中。

等到低位排完得到一个子序列,再将这个序列按照次低位的大小进入相应的桶中,一直排到最高位为止,数组排序完成

演示

通过基数排序对数组{53, 3, 542, 748, 14, 214, 154, 63, 616},它的示意图如下:

在上图中,从最低位开始,依次进行排序。

  1. 按照个位数进行排序。
  2. 按照十位数进行排序。
  3. 按照百位数进行排序。
    排序后,数列就变成了一个有序序列

伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Radix-Sort(A, d)
// 每个在数组A[1...n] 中的元素都是d-位数的正整数
// 位数是从右到左标记上1到d 的
//A[]-- 初始的待排序的数组

// 创建一个for 循环,从1 到d
for j = 1 to d do
int count[10] = {0};
// 将键的计数放在数组count[] 中
// 键key - 是在位数j 上的数字
for i = 0 to n do
count[key of(A[i]) in pass j]++
for k = 1 to 10 do
count[k] = count[k] + count[k-1]

// 创建一个结果数组,通过从count[k] 中检查A[i] 中的新位置
for i = n-1 downto 0 do
result[ count[key of(A[i])] ] = A[j]
count[key of(A[i])]--

//现在主数组A[] 包含了根据现在位数位置排好序的数字
for i=0 to n do
A[i] = result[i]

end for(j)
end func

实现

使用桶排序,把各位上的数分别桶排序,再依次输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private static void radixSort(int []array){
//取最大值,好计算位数
int max = Integer.MIN_VALUE;
for (int a:array) {
if(a> max) {
max = a;
}
}
//(0~9)10个桶
int [][]buckets = new int[10][];

//初始化桶
for(int b=0;b<10;b++) {
buckets[b] = new int[array.length];
}

//每个桶的元素数量
int [] index = new int[10];
//按每一位数排序
for (int radix = 1;max/radix>0;radix*=10){
//把元素放到各自的桶内
for (int a:array) {
//得到每位数
int per = a/radix%10;
buckets[per][index[per]] = a;
index[per]++;
}
//各个桶的数据依次放回数组
int j = 0;
for (int b=0;b<10;b++) {
//去掉桶中别的元素
for (int i = 0;i<index[b];i++){
array[j++] = buckets[b][i];
}
}
System.err.println("按第"+radix+"位,排序:" + Arrays.toString(array));
//清空计数器
index = new int[10];
}
}

对数组{53, 3, 542, 748, 14, 214, 154, 63, 616}进行基数排序

通过程序可以看出每位数排序的结果

1
2
3
按第1位,排序:[542, 53, 3, 63, 14, 214, 154, 616, 748]
按第10位,排序:[3, 14, 214, 616, 542, 748, 53, 154, 63]
按第100位,排序:[3, 14, 53, 63, 154, 214, 542, 616, 748]

百倍排序完,整个数组也就排序好了

复杂度

时间复杂度

假设在基数排序中,r为基数,d为位数。则基数排序的时间复杂度为O(d(n+r))。

空间复杂度

在基数排序过程中,对于任何位数上的基数进行“装桶”操作时,都需要n+r个临时空间

没有一身好内功,招式再多都是空;算法绝对是防身必备,面试时更是不可或缺;跟着算法渣一起从零学算法

线性排序

常见的三种以线性时间运行的算法:计数排序、基数排序和桶排序;网上教程不少,但三者经常混淆,称桶排序但实质可能是计数排序,为了保证原味性,主要参考《算法导论》

需要注意的是线性排序算法是非基于比较的排序算法,都有使用限制才能达到线性排序的效果

定义

桶排序 (Bucket sort)或所谓的箱排序,是一个排序算法,工作的原理是将数组分到有限数量的桶子里。每个桶子再个别排序(有可能再使用别的排序算法或是以递归方式继续使用桶排序进行排序)。桶排序是鸽巢排序的一种归纳结果。当要被排序的数组内的数值是均匀分配的时候,桶排序使用线性时间(Θ(n))

算法

桶排序的思想:其实就是先分配再收集的这个一个过程

假设输入是一个随机过程产生的[0,1)区间上均匀分布的实数

把区间划分成n个大小相同的子区间,或称为桶。然后将n个输入元素分布的各个桶中去。先对各个桶中的数进行排序,然后按次序把各桶中的数据列出来即可

     (因为输入元素均匀而独立的分布在区间 [0,1)上,所以不会出现很多数落在一个桶中的情况)

在桶排序算法中,假设输入的是一个含n个元素的数组A,且每个元素满足0≤A[i]<1。另外,还需要一个辅助数组B[0…n-1]来存放链表(桶),并假设可以用某种机制来维护这些表

【刚开始按照示例图的方式理解了桶排序,桶分10个,以十分位为桶号放入各个桶,也算是桶排序一种实现方式,但还是狭隘了】


在实际应用时,其实并不然必须元素范围为[0,1),整数,小数都是可以的,只要分布均匀就能最大发挥桶排序优势

优质的桶排序需要考虑几个因素:

  1. 桶的数量:桶越多,占用空间越大
  2. 区间跨度:桶与桶之间的跨度
  3. 桶内元素的排序

一般区间跨度:

除了最后一个桶只包含一个最大值之外,其余各桶之间的区间跨度=(最大值-最小值)/(桶数量-1)

1
2
3
4
5
6
7
8
//伪代码
BUCKET_SORT(A) 
n = length(A)   
for i= 1 to n       
do insert A[i] into list B   
for i=0 to n-1       
do sort list B[i] with insertion sort
concatenate the list B[0]、B[1],,,B[n-1] together in order

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private static void bucketSort(double [] array){
int bucketNum = 10;//10个桶
double max = Double.MIN_VALUE;
double min = Double.MIN_VALUE;
for (double a:array) {
if(a> max) {
max = a;
}
if(a<min) {
min = a;
}
}
//区间跨度
double span = (max - min) / (bucketNum - 1);
double [][]buckets = new double[bucketNum][];
//初始化桶
for (int b = 0; b<bucketNum;b++) {
//以排序元素个数初始化每个桶,以防极端情况
buckets[b] = new double[array.length];
}
//每个桶的元素数量
int [] index = new int[10];
for (int d = 0;d<array.length;d++) {
int bucket = (int)((array[d] - min)/span);
buckets[bucket][index[bucket]] = array[d];
index[bucket]++;
}

//每个桶排序,直接使用sort函数了
for(int b = 0; b<10; b++) {
Arrays.sort(buckets[b]);
}
int j = 0;
for(int b = 0; b<10; b++) {
if(index[b] == 0) {
continue;
}
//这儿需要特殊处理一下,主要是因为每个桶初始化了array.length,
// 经过sort排序,比如第一个桶数组变成了[0.0,0.0,......0.002]
//需要剔掉数组中的0
for (int bi = array.length-index[b];bi<array.length;bi++) {
array[j++] = buckets[b][bi];
}
}
}

复杂度

时间复杂度:

假设有m个桶,则每个桶的元素为n/m;

当辅助函数为冒泡排序O(n^2)时,桶排序为 O(n)+mO((n/m)2);

当辅助函数为快速排序时O(nlgn)时,桶排序为 O(n)+m*O(n/m log(n/m))

空间复杂度: 

O(M+N) 

通常桶越多,执行效率越快,即省时间,但是桶越多,空间消耗就越大,是一种通过空间换时间的方式

之前这本书翻译为《营销战》,现在又翻译成《商战》了

结合当前商业动态,此书可能会回答你心中的一些问题

为什么哈罗单车融资成功了,可ofo压金都成了问题?
为什么在阿里绝对的市场占有率之机,拼多多诞生了并还在持续增长?
当年的当当图书,京东快递如何抢占了客户
真的是打江山容易,守江山难吗?
战略与战术到底哪一个更重要?

商业即战争

战争属于商业竞争的范畴,同样也是人类利益和活动的冲突

商业需要新理念

明确清晰的定义是重要的开始,定义就是角度,也见深度

  1. 商业活动必须满足消费者的需要和需求
  2. 人类通过交换过程满足需要和需求的活动
  3. 引导商品和服务从生产者流向消费者的一系列经济活动
  4. 通过预测顾客或客户需求,并引导满足需求的商品和服务从生产者流向顾客或客户,从而实现组织目标的各种活动
  5. 确定客户需求、根据组织生产能力将需求概念化、将概念同组织的适当能力相联系、根据先前确定的客户需求,使随后的生产概念化、将此概念同客户相联系

兵力原则

基本原则:兵力优势。无论何时,都应该首先尽力做到这一点

用兵之法,十则围之,五则攻之,倍则战之,敌则能分之,少则能逃之,不若则能避之。故小敌之坚,大敌之擒也

兵力原则是最基本的战争原则,其本质就是以多打少:大鱼吃小鱼,大公司击败小公司

战地性质

商战在心智中打响,它无时无刻不存在于你自己和潜在顾客的心智中。

心智即战场,一个复杂且难以理解的阵地

战略形式

防御战

原则

  1. 只有市场领导者才能打防御战

  2. 最佳的防御就是勇于自我攻击

  3. 强大进攻必须及时封杀

进攻战

领导者应进行防御战,而非攻击战

进攻战适用于行业第二或第三的公司,公司要足够强大才能向领导者发起持续进攻

原则

  1. 领导者的强势地位是主要的考虑因素
  2. 找到领导者强势中的弱势,并聚而攻之
  3. 尽可能在狭长地带发起攻击

侧翼战

侧翼战是最具有创新性的战略形式

原则

  1. 最佳的侧翼战是在无争地带展开

  2. 战术奇袭是作战计划中最重要的一环

  3. 追击与进攻同等重要

游击战

敌进我退,敌驻我拢,敌疲我打,敌退我追

原则

  1. 找到一块小得足以守得住的阵地

  2. 无论多么成功,都不能效仿领导者

  3. 一旦有变,随时准备撤退

VS侧翼战

侧翼战 游击战
1、 深思熟虑,创意 同样想法应用细分市场
2、 近距离领导者 远离领导者
3、 夺取蚕食领导者市场份额 区域市场

战略与战术

“战术驱动战略”,这是战争研究得出的最重要思想之一。

战略服从战术就像形式服从内容一样,战略应该服从战术。战术结果的取得,是战略的最终目标和唯一目的。

伟大的战略目标是使一切工作在一定的战术层面上顺利进行,而不是其他什么目的

巴顿说:“人们不应该先制订计划,然后让形势适应计划,而应该让计划适应当前的形势。我认为,胜败取决于最高指挥部是否拥有这种能力。”

总结

可以尝试性的给开篇的问题给个答案,不一定对,毕竟不是专业人士,也非局中之人

当年共享单车红遍南北,貌似市场被ofo,魔拜一统天下;而一些品牌却玩起了游击战,突袭攫取他们放弃的三四线城市,并快速追击,以致当领导者出现疲态时,主动进攻,一举进入了一线城市

一个品牌无法支撑两个不同的概念。低价劳斯莱斯会损害其高价的定位,而且很多时候,低价产品无法热销,因为没有人愿意购买廉价的劳斯莱斯。

所以阿里有了淘宝,也要搞天猫;但不可能一统所以市场,拼多多抓住了被忽略,或者说是阿里主动放弃的领地,提出社交电商,打了一个漂亮的侧翼战

淘宝火及一时,可为什么选择京东购物?淘宝品种很全,但物流不够快,京东发现了客户痛点,找到了淘宝领导者强势中的弱势,以“隔日达”为口号,快速占领了用户心智

战略重要,还是战术重要的呢? 好似道与术的取舍一样。人们总认为道更重要。与IT行业的架构一样

但康威定律如是说:设计系统的组织,其产生的设计等同于组织之内、组织之间的沟通结构。也就是架构偏向于组织结构,如果架构异于组织结构,那可能再完美的架构也无法落地。无法落地的架构是好架构吗?

一个伟大的战略必须要以当前环境的战术能力为前提,不能实现,能算是好战略吗?

树的类型不少,此篇简要对树进行描述,脑中有个整体概念,细节分篇再研究

树(Tree)是n(n>=0)个结点的有限集。

在任意一棵非空树中:

(1)有且仅有一个特定的称为根(Root)的结点;

(2)当n>1时,其余结点可分为m(m>0)个互不相交的有限集T1,T2,…,Tn,其中每个集合本身又是一棵树,并称为根的子树(SubTree)

术语

  1. 结点:如上图,每一个圈圈我们就称为树的一个结点
  2. : 结点拥有的子树数称为结点的度-(Degree),树的度取树内各结点的度的最大值
  3. 叶结点(Leaf)或终端结点: 度为0的结点
  4. 内部结点: 度不为0的结点称为分支结点或非终端结点,除根结点外,分支结点也称为内部结点。5. 结点间的关系:结点的子树的根称为结点的孩子(Child),相应的,该结点称为孩子的双亲(Parent),同一双亲的孩子之间互称为兄弟(Sibling)。结点的祖先是从根到该结点所经分支上的所有结点
  5. 深度:树中结点的最大层次称为树的深度(Depth)或高度
  6. 结点的层次(Level): 从根开始定一起,根为第一层,根的孩子为第二层。其双亲在同一层的结点互为堂兄弟
  7. 有序树: 如果将树中结点的各子树看成从左至右是有次序的,不能互换的,则称该树为有序树, 否则为无序树
  8. 森林(Forest): 是 m(m>=0)棵互不相交的树的集合。对树中每个结点而言,其子树的集合即为森林

二叉树

二叉树(Binary Tree)是n(n>=0)个结点的有限集合。该集合可能是空的(空二叉树),也可能由
一个根结点和该根结点的左子树和右子树组成。左右子树不相交,而且都是二叉树

特点

  1. 每个结点最多有两棵字树,即每个结点的度和树的度不大于2。
  2. 结点的左子树和右子树是有顺序的,不能颠倒。
  3. 即使结点只有一棵子树,也是有顺序的

满二叉树

除最后一层无任何子节点外,每一层上的所有结点都有两个子结点。也可以这样理解,除叶子结点外的所有结点均有两个子结点。节点数达到最大值,所有叶子结点必须在同一层上

完全二叉树

若设二叉树的深度为h,除第 h 层外,其它各层 (1~(h-1)层) 的结点数都达到最大个数,第h层所有的结点都连续集中在最左边,这就是完全二叉树

之前讲过的《堆排序》 就是依赖完全二叉树结构

二叉查找树

二叉查找树(Binary Search Tree),又被称为二叉搜索树

(1)若任意节点的左子树不空,则左子树上所有结点的值均小于它的根结点的值;

(2) 若任意节点的右子树不空,则右子树上所有结点的值均大于它的根结点的值;

(3) 任意节点的左、右子树也分别为二叉查找树;

(4) 没有键值相等的节点

复杂度

复杂度分析:它和二分查找一样,插入和查找的时间复杂度均为O(logn),但是在最坏的情况下仍然会有O(n)的时间复杂度。原因在于插入和删除元素的时候,树没有保持平衡

平衡二叉树

平衡二叉搜索树,又被称为AVL树

由于普通的二叉查找树会容易失去”平衡“,极端情况下,**二叉查找树会退化成线性的链表,导致插入和查找的复杂度下降到 O(n) **,所以,这也是平衡二叉树设计的初衷。

具有以下性质

  1. 要么是棵空树,要么其根节点左右子树的深度之差的绝对值不超过1;
  2. 其左右子树也都是平衡二叉树;
  3. 二叉树节点的平衡因子定义为该节点的左子树的深度减去右子树的深度。则平衡二叉树的所有节点的平衡因子只可能是-1,0,1

红黑树

红黑树是一种自平衡二叉查找树,是在计算机科学中用到的一种数据结构,典型的用途是实现关联数组。它是在1972年由Rudolf Bayer发明的,他称之为”对称二叉B树”,它现代的名字是在 Leo J. Guibas 和 Robert Sedgewick 于1978年写的一篇论文中获得的。它是复杂的,但它的操作有着良好的最坏情况运行时间,并且在实践中是高效的: 它可以在O(log n)时间内做查找,插入和删除,这里的n是树中元素的数目

特性

  1. 每个节点要么是红色,要么是黑色。
  2. 根节点永远是黑色的。
  3. 所有的叶节点都是空节点(即 null),并且是黑色的。
  4. 每个红色节点的两个子节点都是黑色。(从每个叶子到根的路径上不会有两个连续的红色节点)
  5. 从任一节点到其子树中每个叶子节点的路径都包含相同数量的黑色节点。

为什么需要红黑树

平衡二叉树的插入/删除操作带来的旋转操作可能会达到logn次

红黑树的插入操作最多进行两次旋转、删除操作最多进行三次旋转

B树

B树是一种平衡的多路查找树,结点最大的孩子数目称为B树的阶(Order),是为磁盘存储而专门设计的一类平衡搜索树

b-tree分为用“阶”的定义和用“度”的定义,度和阶都是描述子节点的数量的

特性

  1. 定义任意非叶子结点最多只有M个儿子;且M>2;
  2. 根结点的儿子数为[2, M];
  3. 除根结点以外的非叶子结点的儿子数为[M/2, M];
  4. 每个结点存放至少M/2-1(取上整)和至多M-1个关键字;(至少2个关键字)
  5. 非叶子结点的关键字个数=指向儿子的指针个数-1;
  6. 非叶子结点的关键字:K[1], K[2], …, K[M-1];且K[i] < K[i+1];
  7. 非叶子结点的指针:P[1], P[2], …, P[M];其中P[1]指向关键字小于K[1]的子树,P[M]指向关键字大于K[M-1]的子树,其它P[i]指向关键字属于(K[i-1], K[i])的子树;
  8. 所有叶子结点位于同一层;

逻辑结构图

这是B树存储在硬盘的逻辑结构图。

其中根节点中17,35在称为关键字(key) ,实际中往往附带更多复杂类型数据。

可以看出一个节点包含 keys ChildNotePointer 2部分信息

为什么需要B树

  1. 当数据量大的时候,树的高度会比较高,数据量大的时候,查询会比较慢
  2. 树每个节点只存储一个记录,可能导致一次查询有很多次磁盘IO

这是个典型的b树结构,初始因子为1000,高度仅为3的b树,就可以存储1002001000的数据了

假设要查询最后一个数据:

  • 从硬盘加载根节点搜索,IO一次。
  • 根据根节点的指针信息,去加载第二层的节点, IO一次。
  • 重复2,IO一次。
    IO只用了3次,就查询了需要的数据,所以说B树效率是非常高的。

B树的节点,在硬盘里表现为:柱面里的页(page)或盘块(block),如果把索引持久化到内存,只需要一次就够了

B+树

B+树是B-树的变体,也是一种多路搜索树:

  1. 其定义基本与B-树相同
  2. 非叶子结点的子树指针与关键字个数相同(B树中是k-1个元素)
  3. 非叶子结点的子树指针P[i],指向关键字值属于[K[i], K[i+1])的子树(B-树是开区间);
  4. 为所有叶子结点增加一个链指针
  5. 所有关键字都在叶子结点出现

B+的搜索与B-树也基本相同,区别是B+树只有达到叶子结点才命中(B-树可以在非叶子结点命中),其性能也等价于在关键字全集做一次二分查找

B树:有序数组+平衡多叉树

B+树:有序数组链表+平衡多叉树

B+树的关键字全部存放在叶子节点中,非叶子节点用来做索引,而叶子节点中有一个指针指向一下个叶子节点。做这个优化的目的是为了提高区间访问的性能。而正是这个特性决定了B+树更适合用来存储外部数据

为什么需要B+树

  1. b+树的中间节点不保存数据,所以磁盘页能容纳更多节点元素,更“矮胖”;
  2. b+树查询必须查找到叶子节点,b树只要匹配到即可不用管元素位置,因此b+树查找更稳定(并不慢);
  3. 对于范围查找来说,b+树只需遍历叶子节点链表即可,b树却需要重复地中序遍历

B*树

B*树是对B+树进行的又一次的升级。在B+树的非根和非叶子结点再增加指向兄弟的指针

在B+树基础上,为非叶子结点也增加链表指针,将结点的最低利用率从1/2提高到2/3;

在这比如说当你进行插入节点的时候,它首先是放到兄弟节点里面。如果兄弟节点满了的话,进行分裂的时候从兄弟节点和这个节点各取出1/3,放入新建的节点当中,这样也就实现了空间利用率从1/2到1/3。

总结

B树:二叉树,每个结点只存储一个关键字,等于则命中,小于走左结点,大于走右结点

B-树:多路搜索树,每个结点存储M/2到M个关键字,非叶子结点存储指向关键字范围的子结点;
所有关键字在整颗树中出现,且只出现一次,非叶子结点可以命中;

B+树:在B-树基础上,为叶子结点增加链表指针,所有关键字都在叶子结点中出现,非叶子结点作为叶子结点的索引;B+树总是到叶子结点才命中;

B*树:在B+树基础上,为非叶子结点也增加链表指针,将结点的最低利用率从1/2提高到2/3;

参考资料

B-树学习笔记