RateLimiter 浅析

本文基于 Guava-18.0.jar 。

0. 概述

RateLimiter 是令牌桶思想的一个实现,可实现流量整形、资源访问速率控制。

与信号量对比:

  • 一旦从 RateLimiter 获得许可,不需要释放。信号量分配后必须释放。
  • RateLimiter 控制的是速率,以配置的速率分发许可,速率不变时单位时间内分发的许可量是恒定的。信号量控制的是并发访问的数量,单位时间内分配的次数跟使用许可的时长有关,每次申请使用的时间越短,则单位时间内能分配的次数就越多。

RateLimiter 请求许可的数量不会对请求本身产生抑制影响,但会对下一个请求产生抑制。例如一个请求很大数量许可的请求到达空闲 RateLimiter 时,它将马上获得许可,但下一个请求会被抑制,从而为前面昂贵的请求付出代价。

RateLimiter 申请许可时,可能会阻塞、也可能不会,是否阻塞取决于上一次分配许可的时间和当前请求的许可数量。

RateLimiter 可以配置一个 warnup 热身周期,在这个周期内每秒发出的许可数量稳步增长直至达到稳定的速率。简单说是慢启动吧。

1. 使用示例:

// 每秒2个的速率提交任务
final RateLimiter rateLimiter = RateLimiter.create(2.0);
void submitTasks(List tasks, Executor executor) {
    for (Runnable task : tasks) {
        rateLimiter.acquire(); // may wait
        executor.execute(task);
    }
}

2. 浅析

2.0 继承关系

RateLimiter
    -- SmoothRateLimiter
        -- SmoothBursty
        -- SmoothWarmingUp

2.1 创建

RateLimiter 是个抽象类,提供静态方法来创建具体的子类实例。

每个实例持有一个 SleepingStopwatch 实例,用来计时、提供睡眠等待的功能。

RateLimiter 是线程安全的,通过 synchronized 关键字来保证。

public abstract class RateLimiter {
  public static RateLimiter create(double permitsPerSecond) {
    return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond);
  }

  static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
  }

  public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
    checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
    return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond, warmupPeriod, unit);
  }

  static RateLimiter create(
      SleepingStopwatch stopwatch, double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
    RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
  }

  private final SleepingStopwatch stopwatch;

  // 用来提供监视器锁的对象
  private volatile Object mutexDoNotUseDirectly;
}

2.2 线程安全性与等待

  public double acquire(int permits) {
    // 申请给定数量的许可,返回成功获得许可需要等待的时间。
    long microsToWait = reserve(permits);

    // 进行等待
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
  }

  final long reserve(int permits) {
    checkPermits(permits);

    // 利用监视器锁保证线程安全性
    synchronized (mutex()) {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
  }

  final long reserveAndGetWaitLength(int permits, long nowMicros) {
    // 返回可获得许可的时刻
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);

    // 与当前时间做差才是要等待的时长
    return max(momentAvailable - nowMicros, 0);
  }

  // 具体的分配许可的行为由子类实现,在 SmoothRateLimiter 类里
  abstract long reserveEarliestAvailable(int permits, long nowMicros);

2.3 令牌分配核心算法

下面的代码位于 SmoothRateLimiter 类里。

属性:

// 当前已存储的许可,令牌桶桶里攒的
double storedPermits;

// 令牌桶能容纳的最大许可数量
double maxPermits;

// 稳定速率下、生成两个许可之间的时间间隔。
// 例如稳定速率是每秒 5 个许可,那么稳定的间隔是 200ms
double stableIntervalMicros;

// 下一次请求(不管申请的大小)被授予的时间。
// 在授权一个请求后,会把这个值推向更远的未来。更大的请求比小的请求 推的更远。
// 此值可能在过去或未来
private long nextFreeTicketMicros = 0L;

令牌分配核心算法:

  final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    // 同步时间和桶里的令牌数量
    resync(nowMicros);

    // 当前请求获取成功的时间为 nextFreeTicketMicros
    // 与请求数量 requiredPermits 无关
    long returnValue = nextFreeTicketMicros;

    // 计算当前立即可得的许可数量
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);

    // 当前请求需要新生成的许可数量
    double freshPermits = requiredPermits - storedPermitsToSpend;

    // 生成上述数量新许可需要的时间
    long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
        + (long) (freshPermits * stableIntervalMicros);

    // 新许可需要的时间由下一个请求承担
    this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;

    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
  }

  private void resync(long nowMicros) {
    // nextFreeTicket 处于过去,说明当前请求可以立即申请成功
    if (nowMicros > nextFreeTicketMicros) {
      // 更新桶里的令牌数量、但不能超过桶的容量
      storedPermits = min(maxPermits,
          storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
      nextFreeTicketMicros = nowMicros;
    }
  }

该算法的核心逻辑为:

  1. 如果 nextFreeTicketMicros 处于过去,当前请求可以立即成功,不管它申请的许可数量是多少。
  2. 如果 nextFreeTicketMicros 处于未来,说明当前请求受前面请求的影响,需要等待 (nowMicros - nextFreeTicketMicros)
  3. 要等待的许可数量为 freshPermits = requiredPermits - min(requiredPermits, this.storedPermits)
  4. 要等待的时间为 waitMicros = X + (long) (freshPermits * stableIntervalMicros),这个等待时间由下一个请求承受。其中 X = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) 由子类的具体实现决定,SmoothWarmingUp 子类用该方法实现慢启动。
  5. 更新下一个请求可以立即成功的时间:this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros

上面的逻辑贯彻了前面所说的:

RateLimiter 请求许可的数量不会对请求本身产生抑制影响,但会对下一个请求产生抑制。例如一个请求很大数量许可的请求到达空闲 RateLimiter 时,它将马上获得许可,但下一个请求会被抑制,从而为前面昂贵的请求付出代价。

3. 小结

RateLimiter 的核心思路是优先满足当前请求。

如果桶里的令牌数量不能满足当前请求,为了保证速率要求,可以让当前请求等待令牌生成,也就是让当前请求等待,也可以让后面的请求延迟执行。

不管是让谁等都可能要等待,那不如让当前的请求优先满足、后面的请求去等待。如果后面的请求来得很晚或者根本就不来,那就不需要等待、赚到了。

与漏桶算法的对比:

  • 从上面的实现可知,令牌桶是允许突发流量的,这个突发的峰值还不受桶容量的限制。
  • 漏桶算法的流出速率是恒定的,不存在突发流量的情况。

欢迎关注我的微信公众号: coderbee笔记

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据