semaphore的定义,意义
在没有juc semaphore之前怎么实现
semaphore使用
分布式semaphore实现
信号量 最早用来解决进程同步与互斥问题的机制: 包括一个称为信号量的变量及对它进行的两个原语操作(PV操作)
什么是信号量?
信号量(semaphore)的数据结构为一个值和一个指针,指针指向等待该信号量的下一个进程。信号量的值与相应资源的使用情况有关。
PV操作由P操作原语和V操作原语组成(原语是不可中断的过程)
(注,P是荷兰语的Passeren,相当于英文的pass,V是荷兰语的Verhoog,相当于英文中的incremnet)
对信号量进行操作,具体定义如下:
P(S):
①将信号量S的值减1,即S=S-1;
②如果S>=0,则该进程继续执行;否则该进程置为等待状态,排入等待队列
V(S):
①将信号量S的值加1,即S=S+1;
②如果S>0,则该进程继续执行;否则释放队列中第一个等待信号量的进程
PV操作的意义:我们用信号量及PV操作来实现进程的同步和互斥。PV操作属于进程的低级通信
使用PV操作实现进程互斥时应该注意的是:
每个程序中用户实现互斥的P、V操作必须成对出现,先做P操作,进临界区,后做V操作,出临界区。若有多个分支,要认真检查其成对性
P、V操作应分别紧靠临界区的头尾部,临界区的代码应尽可能短,不能有死循环
互斥信号量的初值一般为1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 //许可数量 private int permits = 1; public synchronized void P() { permits--; if(permits < 0 ){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } public synchronized void V(){ permits++; if(permits <=0){ notifyAll(); } }
J.U.C Semaphore JUC提供了工具类之一就是Semaphore,提供了丰富的API,不再需要自己实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 // 创建具有给定的许可数和非公平的公平设置的 Semaphore。 Semaphore(int permits) // 创建具有给定的许可数和给定的公平设置的 Semaphore。 Semaphore(int permits, boolean fair) // 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。 void acquire() // 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。 void acquire(int permits) // 从此信号量中获取许可,在有可用的许可前将其阻塞。 void acquireUninterruptibly() // 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。 void acquireUninterruptibly(int permits) // 返回此信号量中当前可用的许可数。 int availablePermits() // 获取并返回立即可用的所有许可。 int drainPermits() // 返回一个 collection,包含可能等待获取的线程。 protected Collection<Thread> getQueuedThreads() // 返回正在等待获取的线程的估计数目。 int getQueueLength() // 查询是否有线程正在等待获取。 boolean hasQueuedThreads() // 如果此信号量的公平设置为 true,则返回 true。 boolean isFair() // 根据指定的缩减量减小可用许可的数目。 protected void reducePermits(int reduction) // 释放一个许可,将其返回给信号量。 void release() // 释放给定数目的许可,将其返回到信号量。 void release(int permits) // 返回标识此信号量的字符串,以及信号量的状态。 String toString() // 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。 boolean tryAcquire() // 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。 boolean tryAcquire(int permits) // 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。 boolean tryAcquire(int permits, long timeout, TimeUnit unit) // 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。 boolean tryAcquire(long timeout, TimeUnit unit)
对于JUC的Semaphore源码,此篇不阐述了,另开新篇;但对分布式的Semaphore倒是可以研究下
分布式Semaphore Redission中有对应的RSemaphore
1 2 3 4 5 6 RSemaphore semaphore = redisson.getSemaphore("semaphore"); semaphore.acquire(); //或 semaphore.acquireAsync(); semaphore.acquire(23); semaphore.tryAcquire();
可过期信号量
1 2 3 4 5 6 RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore"); String permitId = semaphore.acquire(); // 获取一个信号,有效期只有2秒钟。 String permitId = semaphore.acquire(2, TimeUnit.SECONDS); // ... semaphore.release(permitId);
直接上最本质的源码片段,lua脚本很简单,对信号量进行计数,acquire时,信号量减1,release时,信号量加1;主要是保证操作的原子性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Override public RFuture<Boolean> tryAcquireAsync(int permits) { if (permits < 0) { throw new IllegalArgumentException("Permits amount can't be negative"); } if (permits == 0) { return RedissonPromise.newSucceededFuture(true); } return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local value = redis.call('get', KEYS[1]); " + "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " + "local val = redis.call('decrby', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.<Object>singletonList(getName()), permits); } @Override public RFuture<Void> releaseAsync(int permits) { if (permits < 0) { throw new IllegalArgumentException("Permits amount can't be negative"); } if (permits == 0) { return RedissonPromise.newSucceededFuture(null); } return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID, "local value = redis.call('incrby', KEYS[1], ARGV[1]); " + "redis.call('publish', KEYS[2], value); ", Arrays.<Object>asList(getName(), getChannelName()), permits); }
在最本质的基础上,再深入看一下还做了哪些事,能真正达到一个工业生产标准
tryAcquire() 非阻塞式,有信息量就正常获取,没有刚快速返回,就是lua本质,没有做额外的事情
acquire() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override public void acquire(int permits) throws InterruptedException { if (tryAcquire(permits)) { return; } RFuture<RedissonLockEntry> future = subscribe(); commandExecutor.syncSubscription(future); try { while (true) { if (tryAcquire(permits)) { return; } getEntry().getLatch().acquire(permits); } } finally { unsubscribe(future); } }
阻塞式,相对非阻塞式就多了一些事
1.先tryAcquire,看是否能获取到信号量
2.订阅channel事件
3.无限循环
3.1.先tryAcquire(),尝试一下
3.2.通过getEntry().getLatch(),也就是j.u.c.Semaphore,acquire()阻塞
4.取消订阅
订阅事件内部细节,另开篇再说了,他的目的其实就是释放Semaphore
想像一下,同一个client的两个线程A,B 同时需要获取信号量,如果A成功获取,那么B将被Semaphore阻塞住了,何时退出阻塞呢?
就在线程A进行release()之后,会publish,细节可查看上面的release()中的lua脚本,当B监听到事件时,就会调用Semaphore.release(),再次进行tryAcquire()
tryAcquire(int permits, long waitTime, TimeUnit unit) 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 @Override public boolean tryAcquire(int permits, long waitTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); if (tryAcquire(permits)) { return true; } time -= (System.currentTimeMillis() - current); if (time <= 0) { return false; } current = System.currentTimeMillis(); RFuture<RedissonLockEntry> future = subscribe(); if (!await(future, time, TimeUnit.MILLISECONDS)) { return false; } try { time -= (System.currentTimeMillis() - current); if (time <= 0) { return false; } while (true) { current = System.currentTimeMillis(); if (tryAcquire(permits)) { return true; } time -= (System.currentTimeMillis() - current); if (time <= 0) { return false; } // waiting for message current = System.currentTimeMillis(); getEntry().getLatch().tryAcquire(permits, time, TimeUnit.MILLISECONDS); time -= (System.currentTimeMillis() - current); if (time <= 0) { return false; } } } finally { unsubscribe(future); } // return get(tryAcquireAsync(permits, waitTime, unit)); }
其实await(future, time, TimeUnit.MILLISECONDS)是使用的CountDownLatch
如果计数到达零,则返回 true;如果在计数到达零之前超过了等待时间,则返回 false
当前是第一个请求,或者别的释放,那就再往下进入循环
CountDownLatch.await()+Semaphore.tryAcquire()配合使用
每一次等待时间后,都需要检查是否超过等待时间
为什么需要引入CountDownLatch.await()呢? 都使用Semaphore.tryAcquire()不行吗?这个需要再次深入挖掘了
总结 分布式信号量,原理很明了,主要还是通过lua保障redis操作的原子性
阅读redisson源码,发现里面的操作基本都是异步化,底层又是基于netty,大量使用了future模式,如果不知道future模式,会很绕,debug都会晕掉,所以在深入redisson之前,需要再对future模式温习一下