限流算法详解
限流算法
1. 滑动窗口
思想: 维护一个时间窗口window 在窗口内部仅允许有limit个请求通过,超过则拒绝
实现: 常用 zset实现,(member=请求ID/随机串,score=毫秒时间戳)需要保证member唯一
过程: 一次请求到来时,先用 Redis 的 TIME 拿当前毫秒时间,再把有序集合里所有“时间戳 ≤ now - window”的旧记录清掉,只保留窗口内的数据。随后统计集合里还剩多少条;若“剩余条数 + 本次需要的 permits”不超过 limit,就把当前请求作为一个唯一 member以“时间戳为 score”写进集合并设置 PEXPIRE=window,视为通过;否则说明本窗口已满,可从最早一条记录的 score 推出要等到它滑出窗口还需多少毫秒,按需返回等待时间或直接拒绝。
-- Redis 滑动窗口限流算法实现
-- KEYS[1]=限流Key ARGV[1]=window(ms) ARGV[2]=limit ARGV[3]=member
-- 获取限流的键名
local key = KEYS[1]
-- 获取当前 Redis 服务器时间
-- TIME 命令返回两个元素:[0]秒数,[1]微秒数
local t = redis.call('TIME')
-- 将时间转换为毫秒时间戳
-- t[1] 是秒数,乘以1000转为毫秒
-- t[2] 是微秒数,除以1000转为毫秒,然后取整
local now = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)
-- 获取滑动窗口大小(毫秒)
local window = tonumber(ARGV[1])
-- 获取限流阈值
local limit = tonumber(ARGV[2])
-- 获取成员标识(通常是用户ID或请求标识)
local member = ARGV[3]
-- 清理滑动窗口外的过期数据
-- 移除分数小于 (当前时间 - 窗口大小) 的所有成员
-- 这样只保留窗口时间内的请求记录
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
-- 获取当前窗口内的请求数量
local count = redis.call('ZCARD', key)
-- 判断是否超过限流阈值
if count < limit then
-- 未超过限制,允许请求通过
-- 将当前请求添加到有序集合中,分数为当前时间戳
redis.call('ZADD', key, now, member)
-- 设置键的过期时间为窗口大小,防止数据永久存储
redis.call('PEXPIRE', key, window)
-- 计算剩余可用次数(本窗口内)
local remaining = limit - (count + 1)
if remaining < 0 then remaining = 0 end
-- 成功返回:{allowed=1, remaining, waitMs=0}
return {1, remaining, 0}
else
-- 超过限制,计算需要等待的毫秒数(直到最早的一个请求滑出窗口)
local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
local oldestScore
if oldest and #oldest >= 2 then
oldestScore = tonumber(oldest[2])
else
oldestScore = now
end
local waitMs = (oldestScore + window) - now
if waitMs < 0 then waitMs = 0 end
-- 失败返回:{allowed=0, remaining=0, waitMs}
return {0, 0, waitMs}
end2. 令牌桶
思想:桶以速率 r 产令牌,最多存到 capacity;请求需 permits 个令牌,有则通过扣减,没则等/拒绝。
实现:HASH或者两个STRING tokens:当前令牌数(可用小数) last_ts:上次回填时间(ms)
过程:一次请求到来时,读取“当前令牌数 tokens”和“上次更新时间 last_ts”,用 now - last_ts 计算这段时间应回填的令牌量 rate * elapsed,把 tokens 充到不超过 capacity。若此时 tokens ≥ permits,则扣减 permits 并通过;否则按缺口 (permits - tokens) / rate 计算需要再生产多久的令牌,决定是等待还是直接拒绝。最后把新的 tokens 与 last_ts=now 回写,并把键的过期时间设为“满桶耗尽时间的两倍”以便清理冷键。
-- Token Bucket (令牌桶) 限流脚本
-- KEYS[1] = 桶的Key
-- ARGV[1] = capacity(桶容量,最大令牌数,整数)
-- ARGV[2] = ratePerSecond(令牌生成速率,浮点,个/秒)
-- ARGV[3] = permits(本次申请的令牌数,整数)
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local permits = tonumber(ARGV[3])
-- 参数校验与兜底
if capacity == nil or capacity <= 0 then
return {0, 0, 1000}
end
if rate == nil or rate <= 0 then
return {0, 0, 1000}
end
if permits == nil or permits <= 0 then
permits = 1
end
-- 使用 Redis 时间,避免客户端时钟偏移
local t = redis.call('TIME')
local now = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)
-- 读取状态,首次调用视为满桶
local last_refill = tonumber(redis.call('HGET', key, 'last_refill_ms') or now)
local tokens = tonumber(redis.call('HGET', key, 'tokens') or capacity)
-- 补桶:按时间流逝补充令牌(可为小数),上限为 capacity
if now > last_refill then
local deltaSec = (now - last_refill) / 1000.0
local refill = deltaSec * rate
tokens = tokens + refill
if tokens > capacity then tokens = capacity end
last_refill = now
end
local allowed = 0
local waitMs = 0
local remaining = math.floor(tokens)
-- 尝试消耗令牌
if tokens >= permits then
tokens = tokens - permits
allowed = 1
remaining = math.floor(tokens)
else
-- 不足则计算需要等待时间(直到补足缺口)
local need = permits - tokens
waitMs = math.ceil((need / rate) * 1000)
end
-- 持久化最新状态
redis.call('HSET', key, 'tokens', tokens, 'last_refill_ms', last_refill)
-- 设置过期:按补满时间的两倍,避免冷数据常驻
local ttlMs = math.ceil((capacity / rate) * 1000) * 2
if ttlMs < 1000 then ttlMs = 1000 end
redis.call('PEXPIRE', key, ttlMs)
-- 返回 {是否允许, 剩余(向下取整), 建议等待毫秒}
return {allowed, remaining, waitMs}3. 漏桶
思想:把请求当“加水”,桶底以固定速率 r 漏出——输出被整形为匀速;空闲不足就等/拒绝。
实现:计算式水位(常用):HASH 字段 water(当前水位)、last_drip_ms(上次漏水时间)
键例:lb:{uid}:{route};PEXPIRE ≈ (capacity/r)*2,设上下限
过程:一次请求到来时,读取“当前水位 water”和“上次滴漏时间 last_drip”,根据经过时间 elapsed 把水位按固定漏速 rate 下调:water = max(0, water - rate * elapsed)。随后计算空闲体积 free = capacity - water;若 free ≥ permits,说明能装下这次请求,则把 water 增加 permits 并通过;若不足,则需要先漏出 permits - free 的体积,按 waitMs = ceil((permits - free)/rate)(换算毫秒)得到预计等待时间,选择等待或直接拒绝。最后回写 water 和 last_drip=now,并设置一个与“满桶漏空时间”同量级的 TTL 以自动回收冷键。
-- Leaky Bucket (漏桶) 限流脚本
-- KEYS[1] = 桶的Key
-- ARGV[1] = capacity(桶容量,最大可排队请求数/水量,整数)
-- ARGV[2] = ratePerSecond(出水速率,浮点,个/秒)
-- ARGV[3] = permits(本次请求需要排队的量,整数,默认 1)
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local permits = tonumber(ARGV[3])
if capacity == nil or capacity <= 0 then
return {0, 0, 1000}
end
if rate == nil or rate <= 0 then
return {0, 0, 1000}
end
if permits == nil or permits <= 0 then
permits = 1
end
-- 使用 Redis 时间,避免客户端时钟偏移
local t = redis.call('TIME')
local now = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)
-- 读取状态:当前水位(队列深度)与上次出水时间
local last_drip = tonumber(redis.call('HGET', key, 'last_drip_ms'))
local water = tonumber(redis.call('HGET', key, 'water'))
-- 首次使用:初始化为空桶
if last_drip == nil then last_drip = now end
if water == nil then water = 0 end
-- 出水:按时间流逝降低水位,最低为 0
if now > last_drip then
local deltaSec = (now - last_drip) / 1000.0
local leaked = deltaSec * rate
water = water - leaked
if water < 0 then water = 0 end
last_drip = now
end
local allowed = 0
local waitMs = 0
-- 尝试加入新请求(加水):若不溢出则接受
if (water + permits) <= capacity then
water = water + permits
allowed = 1
else
-- 否则计算需要等待的时间:需要出水 (water + permits - capacity)
local overflow = (water + permits) - capacity
waitMs = math.ceil((overflow / rate) * 1000)
end
-- 持久化状态
redis.call('HSET', key, 'water', water, 'last_drip_ms', last_drip)
-- 设置过期时间:按清空满桶耗时的两倍,避免冷数据常驻
local ttlMs = math.ceil((capacity / rate) * 1000) * 2
if ttlMs < 1000 then ttlMs = 1000 end
redis.call('PEXPIRE', key, ttlMs)
-- remaining 表示当前剩余可加入的容量(向下取整且不小于 0)
local remaining = capacity - water
if remaining < 0 then remaining = 0 end
remaining = math.floor(remaining)
return {allowed, remaining, waitMs}