本文基于 Guava-18.0.jar 。
0. 概述
RateLimiter
是令牌桶思想的一个实现,可实现流量整形、资源访问速率控制。
与信号量对比:
- 一旦从
RateLimiter
获得许可,不需要释放。信号量分配后必须释放。 RateLimiter
控制的是速率,以配置的速率分发许可,速率不变时单位时间内分发的许可量是恒定的。信号量控制的是并发访问的数量,单位时间内分配的次数跟使用许可的时长有关,每次申请使用的时间越短,则单位时间内能分配的次数就越多。
对 RateLimiter
请求许可的数量不会对请求本身产生抑制影响,但会对下一个请求产生抑制。例如一个请求很大数量许可的请求到达空闲 RateLimiter
时,它将马上获得许可,但下一个请求会被抑制,从而为前面昂贵的请求付出代价。
从 RateLimiter
申请许可时,可能会阻塞、也可能不会,是否阻塞取决于上一次分配许可的时间和当前请求的许可数量。
RateLimiter
可以配置一个 warnup 热身周期,在这个周期内每秒发出的许可数量稳步增长直至达到稳定的速率。简单说是慢启动吧。
1. 使用示例:
// 每秒2个的速率提交任务
final RateLimiter rateLimiter = RateLimiter.create(2.0);
void submitTasks(List tasks, Executor executor) {
for (Runnable task : tasks) {
rateLimiter.acquire(); // may wait
executor.execute(task);
}
}
2. 浅析
2.0 继承关系
RateLimiter
-- SmoothRateLimiter
-- SmoothBursty
-- SmoothWarmingUp
2.1 创建
RateLimiter
是个抽象类,提供静态方法来创建具体的子类实例。
每个实例持有一个 SleepingStopwatch
实例,用来计时、提供睡眠等待的功能。
RateLimiter
是线程安全的,通过 synchronized
关键字来保证。
public abstract class RateLimiter {
public static RateLimiter create(double permitsPerSecond) {
return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond);
}
static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond, warmupPeriod, unit);
}
static RateLimiter create(
SleepingStopwatch stopwatch, double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
private final SleepingStopwatch stopwatch;
// 用来提供监视器锁的对象
private volatile Object mutexDoNotUseDirectly;
}
2.2 线程安全性与等待
public double acquire(int permits) {
// 申请给定数量的许可,返回成功获得许可需要等待的时间。
long microsToWait = reserve(permits);
// 进行等待
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
final long reserve(int permits) {
checkPermits(permits);
// 利用监视器锁保证线程安全性
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
// 返回可获得许可的时刻
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
// 与当前时间做差才是要等待的时长
return max(momentAvailable - nowMicros, 0);
}
// 具体的分配许可的行为由子类实现,在 SmoothRateLimiter 类里
abstract long reserveEarliestAvailable(int permits, long nowMicros);
2.3 令牌分配核心算法
下面的代码位于 SmoothRateLimiter
类里。
属性:
// 当前已存储的许可,令牌桶桶里攒的
double storedPermits;
// 令牌桶能容纳的最大许可数量
double maxPermits;
// 稳定速率下、生成两个许可之间的时间间隔。
// 例如稳定速率是每秒 5 个许可,那么稳定的间隔是 200ms
double stableIntervalMicros;
// 下一次请求(不管申请的大小)被授予的时间。
// 在授权一个请求后,会把这个值推向更远的未来。更大的请求比小的请求 推的更远。
// 此值可能在过去或未来
private long nextFreeTicketMicros = 0L;
令牌分配核心算法:
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// 同步时间和桶里的令牌数量
resync(nowMicros);
// 当前请求获取成功的时间为 nextFreeTicketMicros
// 与请求数量 requiredPermits 无关
long returnValue = nextFreeTicketMicros;
// 计算当前立即可得的许可数量
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// 当前请求需要新生成的许可数量
double freshPermits = requiredPermits - storedPermitsToSpend;
// 生成上述数量新许可需要的时间
long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
// 新许可需要的时间由下一个请求承担
this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
private void resync(long nowMicros) {
// nextFreeTicket 处于过去,说明当前请求可以立即申请成功
if (nowMicros > nextFreeTicketMicros) {
// 更新桶里的令牌数量、但不能超过桶的容量
storedPermits = min(maxPermits,
storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
nextFreeTicketMicros = nowMicros;
}
}
该算法的核心逻辑为:
- 如果
nextFreeTicketMicros
处于过去,当前请求可以立即成功,不管它申请的许可数量是多少。 - 如果
nextFreeTicketMicros
处于未来,说明当前请求受前面请求的影响,需要等待(nowMicros - nextFreeTicketMicros)
。 - 要等待的许可数量为
freshPermits = requiredPermits - min(requiredPermits, this.storedPermits)
。 - 要等待的时间为
waitMicros = X + (long) (freshPermits * stableIntervalMicros)
,这个等待时间由下一个请求承受。其中X = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
由子类的具体实现决定,SmoothWarmingUp
子类用该方法实现慢启动。 - 更新下一个请求可以立即成功的时间:
this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros
。
上面的逻辑贯彻了前面所说的:
对
RateLimiter
请求许可的数量不会对请求本身产生抑制影响,但会对下一个请求产生抑制。例如一个请求很大数量许可的请求到达空闲RateLimiter
时,它将马上获得许可,但下一个请求会被抑制,从而为前面昂贵的请求付出代价。
3. 小结
RateLimiter
的核心思路是优先满足当前请求。
如果桶里的令牌数量不能满足当前请求,为了保证速率要求,可以让当前请求等待令牌生成,也就是让当前请求等待,也可以让后面的请求延迟执行。
不管是让谁等都可能要等待,那不如让当前的请求优先满足、后面的请求去等待。如果后面的请求来得很晚或者根本就不来,那就不需要等待、赚到了。
与漏桶算法的对比:
- 从上面的实现可知,令牌桶是允许突发流量的,这个突发的峰值还不受桶容量的限制。
- 漏桶算法的流出速率是恒定的,不存在突发流量的情况。
欢迎关注我的微信公众号: coderbee笔记 。