本文目录导读:

使用令牌桶算法实现本地限流器,可以通过以下几个关键步骤完成,这里给出一个 Java 实现的完整示例,并附上核心思路说明。
核心原理
令牌桶算法的核心思想:
- 以固定速率向桶中添加令牌
- 每个请求消耗一个令牌
- 桶有上限容量,防止突发流量过大
- 如果桶中无可用令牌,请求被拒绝或等待
Java 实现
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TokenBucketRateLimiter {
// 桶的最大容量
private final long capacity;
// 令牌生成速率(每秒生成令牌数)
private final double tokensPerSecond;
// 当前桶中令牌数量
private double currentTokens;
// 上次令牌补充时间(纳秒)
private long lastRefillTime;
// 并发控制锁
private final Lock lock = new ReentrantLock();
/**
* 构造函数
* @param capacity 桶容量
* @param tokensPerSecond 每秒生成令牌数
*/
public TokenBucketRateLimiter(long capacity, double tokensPerSecond) {
this.capacity = capacity;
this.tokensPerSecond = tokensPerSecond;
this.currentTokens = capacity; // 初始满桶
this.lastRefillTime = System.nanoTime();
}
/**
* 尝试获取一个令牌(立即返回)
* @return 是否获取成功
*/
public boolean tryAcquire() {
return tryAcquire(1);
}
/**
* 尝试获取指定数量的令牌(立即返回)
* @param tokens 需要的令牌数
* @return 是否获取成功
*/
public boolean tryAcquire(int tokens) {
lock.lock();
try {
refillTokens();
if (currentTokens >= tokens) {
currentTokens -= tokens;
return true;
}
return false;
} finally {
lock.unlock();
}
}
/**
* 获取一个令牌(等待直到获取)
* @throws InterruptedException 如果等待时被中断
*/
public void acquire() throws InterruptedException {
acquire(1);
}
/**
* 获取指定数量的令牌(等待直到获取)
* @param tokens 需要的令牌数
* @throws InterruptedException 如果等待时被中断
*/
public void acquire(int tokens) throws InterruptedException {
long waitTime = reserve(tokens);
if (waitTime > 0) {
Thread.sleep(waitTime);
}
}
/**
* 预留令牌,返回需要等待的时间
*/
private long reserve(int tokens) {
lock.lock();
try {
refillTokens();
// 计算当前令牌是否足够
if (currentTokens >= tokens) {
currentTokens -= tokens;
return 0;
}
// 计算需要等待的时间(毫秒)
// 缺少的令牌数
double deficit = tokens - currentTokens;
// 需要的等待时间(毫秒)
long waitTimeMs = (long)(deficit / tokensPerSecond * 1000);
// 当前使用,但队列中等待(这里是预扣除)
currentTokens = -deficit; // 表示已经欠费
return waitTimeMs;
} finally {
lock.unlock();
}
}
/**
* 补充令牌
*/
private void refillTokens() {
long now = System.nanoTime();
// 计算时间差(秒)
double elapsedSeconds = (now - lastRefillTime) / 1_000_000_000.0;
// 计算应该生成的令牌数
double newTokens = elapsedSeconds * tokensPerSecond;
// 更新令牌数量(不超过容量)
currentTokens = Math.min(capacity, currentTokens + newTokens);
lastRefillTime = now;
}
/**
* 获取当前桶中令牌数(近似值)
*/
public double getCurrentTokens() {
lock.lock();
try {
refillTokens();
return currentTokens;
} finally {
lock.unlock();
}
}
/**
* 重置限流器
*/
public void reset() {
lock.lock();
try {
currentTokens = capacity;
lastRefillTime = System.nanoTime();
} finally {
lock.unlock();
}
}
// 使用示例
public static void main(String[] args) throws InterruptedException {
// 创建限流器:桶容量5,每秒生成2个令牌
TokenBucketRateLimiter limiter = new TokenBucketRateLimiter(5, 2);
// 测试非阻塞获取
for (int i = 0; i < 10; i++) {
boolean acquired = limiter.tryAcquire();
System.out.println("请求 " + i + ": " + (acquired ? "通过" : "被限流")
+ " (当前令牌数: " + limiter.getCurrentTokens() + ")");
Thread.sleep(200);
}
System.out.println("\n等待2秒后...");
Thread.sleep(2000);
// 测试阻塞获取
System.out.println("尝试阻塞获取2个令牌...");
long start = System.currentTimeMillis();
limiter.acquire(2);
long end = System.currentTimeMillis();
System.out.println("获取成功,等待时间: " + (end - start) + "ms");
}
}
优化版本:基于队列的时间戳实现
对于高并发场景,上述实现中 synchronized 或 ReentrantLock 可能成为瓶颈,以下是基于时间戳队列的高性能实现:
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
public class HighPerformanceTokenBucket {
// 每个令牌生成的最小时间间隔(纳秒)
private final long intervalNanos;
// 桶容量
private final long capacity;
// 上次检查时间(用于非精准但性能更好的计算)
private final AtomicLong lastCheckNanos;
// 当前可用令牌的近似值(仅用于快速判断,不保证精确)
private volatile double availableTokens;
// 精确模式:使用队列记录每个令牌的时间
private final boolean preciseMode;
private final ConcurrentLinkedQueue<Long> tokenTimes;
public HighPerformanceTokenBucket(long capacity, double tokensPerSecond) {
this.capacity = capacity;
this.intervalNanos = (long)(1_000_000_000L / tokensPerSecond);
this.lastCheckNanos = new AtomicLong(System.nanoTime());
this.availableTokens = capacity;
this.preciseMode = false; // 默认非精确模式
this.tokenTimes = null;
}
/**
* 高性能非阻塞获取(近似实现)
*/
public boolean tryAcquire() {
while (true) {
long now = System.nanoTime();
long lastCheck = lastCheckNanos.get();
// 计算自上次检查后生成的令牌数
long elapsed = now - lastCheck;
double newTokens = (double) elapsed / intervalNanos;
// 更新状态(CAS)
if (lastCheckNanos.compareAndSet(lastCheck, now)) {
double tokens = Math.min(capacity, availableTokens + newTokens);
if (tokens >= 1) {
availableTokens = tokens - 1;
return true;
} else {
availableTokens = tokens;
return false;
}
}
// CAS失败,重试
Thread.yield();
}
}
}
使用注意事项
-
线程安全:必须使用锁或CAS操作保证并发安全
-
时间精度:使用
System.nanoTime()而非System.currentTimeMillis(),避免系统时间调整带来的问题 -
性能考虑:
- 低并发场景:使用锁实现即可
- 高并发场景:考虑CAS或分段锁
-
实际应用建议:
- 可以集成到
Filter或Interceptor中实现全局限流 - 结合
Guava RateLimiter等成熟库使用 - 注意设置合理的桶容量和速率参数
- 可以集成到
-
扩展功能:
- 可以加入预热模式,让限流器逐渐达到最大速率
- 可以支持不同优先级的请求
这个实现足够应对大多数本地限流场景,如果需要在分布式环境中使用,则需要考虑使用 Redis 等中间件实现分布式限流。