限流 | Go 实现速率限制

在开发高并发系统时,为了保证系统的高可用和稳定性,业内流传着“三把斧”:

  • 缓存:提升系统访问速度,增大系统处理容量
  • 降级:当服务出现故障或影响到核心业务时,暂时屏蔽掉,待高峰期过后或故障解决后再打开
  • 限流:通过对并发(或者一定时间窗口内)请求进行限速来保护系统,一旦达到限制速率则拒绝服务(定向到错误页或告知资源没有了)、排队等待(比如秒杀、评论、下单)、降级(返回兜底数据或默认数据)

最近,由于我负责的一个项目后端这边需要控制 API 的访问频率,于是研究了下 限流 相关的技术。

一般来说,限流的常用处理手段有:

  • 计数器
  • 滑动窗口
  • 漏桶
  • 令牌桶

计数器

计数器是一种比较简单粗暴的限流算法:

在一段时间间隔内,对请求进行计数,与阀值进行比较判断是否需要限流,一旦到了时间临界点,将计数器清零。

讨论两种通用做法。

方案一

当请求频率过快时,直接抛弃后续请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package main

import (
"fmt"
"sync"
"time"
)

type LimitRate struct {
rate int //计数周期内最多允许的请求数
begin time.Time //计数开始时间
cycle time.Duration //计数周期
count int //计数周期内累计收到的请求数
lock sync.Mutex
}

func (l *LimitRate) Allow() bool {
l.lock.Lock()
defer l.lock.Unlock()

if l.count == l.rate-1 {
now := time.Now()
if now.Sub(l.begin) >= l.cycle {
//速度允许范围内, 重置计数器
l.Reset(now)
return true
} else {
return false
}
} else {
//没有达到速率限制,计数加1
l.count++
return true
}
}

func (l *LimitRate) Set(r int, cycle time.Duration) {
l.rate = r
l.begin = time.Now()
l.cycle = cycle
l.count = 0
}

func (l *LimitRate) Reset(t time.Time) {
l.begin = t
l.count = 0
}

func main() {
var wg sync.WaitGroup
var lr LimitRate
lr.Set(3, time.Second) // 1s内最多请求3次

for i := 0; i < 10; i++ {
wg.Add(1)

fmt.Println("Create req", i, time.Now())
go func(i int) {
if lr.Allow() {
fmt.Println("Respon req", i, time.Now())
}
wg.Done()
}(i)

time.Sleep(200 * time.Millisecond)
}
wg.Wait()
}

例子里面每 200ms 创建一个请求,速率明显高于 1s内最多3个 的限制, 我们把请求的创建时间和结果返回时间打印出来,方便观察。运行显示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Create req 0 2018-09-12 17:03:43.759413 +0800 CST m=+0.000297262
Respon req 0 2018-09-12 17:03:43.759539 +0800 CST m=+0.000422748
Create req 1 2018-09-12 17:03:43.961558 +0800 CST m=+0.202436188
Respon req 1 2018-09-12 17:03:43.961637 +0800 CST m=+0.202515351
Create req 2 2018-09-12 17:03:44.165453 +0800 CST m=+0.406321427 x
Create req 3 2018-09-12 17:03:44.365722 +0800 CST m=+0.606573032 x
Create req 4 2018-09-12 17:03:44.567892 +0800 CST m=+0.808749192 x
Create req 5 2018-09-12 17:03:44.769219 +0800 CST m=+1.010065632
Respon req 5 2018-09-12 17:03:44.769377 +0800 CST m=+1.010231224
Create req 6 2018-09-12 17:03:44.970477 +0800 CST m=+1.211325152
Respon req 6 2018-09-12 17:03:44.970539 +0800 CST m=+1.211387021
Create req 7 2018-09-12 17:03:45.171685 +0800 CST m=+1.412526161
Respon req 7 2018-09-12 17:03:45.171741 +0800 CST m=+1.412583132
Create req 8 2018-09-12 17:03:45.372879 +0800 CST m=+1.613709972 x
Create req 9 2018-09-12 17:03:45.573016 +0800 CST m=+1.813845388 x

平均每秒最多只显示 3 次 “Respon req”,请求 2、3、4、8、9 被丢弃,说明限速成功。

方案二

当请求频率太快时,后续的请求等待之前的请求完成后才进行,稍微修改下 Allow 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (l *LimitRate) Allow() bool {
l.lock.Lock()
defer l.lock.Unlock()

if l.count == l.rate-1 {
for {
now := time.Now()
if now.Sub(l.begin) >= l.cycle {
//速度允许范围内, 重置计数器
l.Reset(now)
return true
} else {
// wait
}
}
} else {
//没有达到速率限制,计数加1
l.count++
return true
}
}

运行显示,10 次请求总共花费了 3s 多,符合预期,超过请求速率的后续请求依次被排队处理成功,最明显的是请求 9,创建后等待近 2s 才返回结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Create req 0 2018-09-12 17:09:26.177354 +0800 CST m=+0.000317504
Respon req 0 2018-09-12 17:09:26.177489 +0800 CST m=+0.000452028
Create req 1 2018-09-12 17:09:26.377706 +0800 CST m=+0.200662813
Respon req 1 2018-09-12 17:09:26.377743 +0800 CST m=+0.200700428
Create req 2 2018-09-12 17:09:26.577843 +0800 CST m=+0.400793565
Create req 3 2018-09-12 17:09:26.778119 +0800 CST m=+0.601064014
Create req 4 2018-09-12 17:09:26.978772 +0800 CST m=+0.801710852
Respon req 2 2018-09-12 17:09:27.177392 +0800 CST m=+1.000325104
Respon req 4 2018-09-12 17:09:27.177469 +0800 CST m=+1.000401716
Respon req 3 2018-09-12 17:09:27.177439 +0800 CST m=+1.000372204
Create req 5 2018-09-12 17:09:27.179018 +0800 CST m=+1.001942192
Create req 6 2018-09-12 17:09:27.379832 +0800 CST m=+1.202758661
Create req 7 2018-09-12 17:09:27.580378 +0800 CST m=+1.403297124
Create req 8 2018-09-12 17:09:27.781084 +0800 CST m=+1.603998402
Create req 9 2018-09-12 17:09:27.981209 +0800 CST m=+1.804117865 //
Respon req 5 2018-09-12 17:09:28.177425 +0800 CST m=+2.000323426
Respon req 6 2018-09-12 17:09:28.177456 +0800 CST m=+2.000358896
Respon req 7 2018-09-12 17:09:28.177473 +0800 CST m=+2.000375963
Respon req 8 2018-09-12 17:09:29.177453 +0800 CST m=+3.000326135
Respon req 9 2018-09-12 17:09:29.177477 +0800 CST m=+3.000350214 //

real 0m3.302s
user 0m1.802s
sys 0m1.021s

计数器算法存在“时间临界点”缺陷。比如每一分钟限速 100 个请求(也就是说每秒最多 1.7 个请求),在 00:00:0000:00:58 这段时间内没有用户请求,然后在 00:00:59这一瞬时发出100个请求,这是允许的,然后在 00:01:00 这一瞬时又发出了 100 个请求,短短 1s 内发出了 200 个请求,系统可能会承受恶意用户的大量请求,甚至击穿系统。

滑动窗口

针对计数器存在的临界点缺陷:

滑动窗口把固定时间片进行划分,并且随着时间的流逝,进行移动,固定数量的可以移动的格子,进行计数并判断阀值。

格子的数量影响着滑动窗口算法的精度,依然有时间片的概念,无法根本解决临界点问题。

漏桶

漏桶算法描述如下:

  • 一个固定容量的漏桶,按照固定速率流出水滴;
  • 如果桶是空的,则不需流出水滴;
  • 可以以任意速率流入水滴到漏桶;
  • 如果流入水滴超出了桶的容量,则溢出(被丢弃)

通俗点来说,我们有一个固定容量的桶,有水流进来,也有水流出去。对于流进来的水(请求)来说,我们无法预计一共有多少水会流进来,也无法预计水流的速度。但是对于流出去的水来说,这个桶可以固定水流出的速率(处理速度),从而达到 流量整形流量控制 的效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package main

import (
"fmt"
"math"
"sync"
"time"
)

type LeakyBucket struct {
rate float64 //固定每秒出水速率
capacity float64 //桶的容量
water float64 //桶中当前水量
lastLeakMs int64 //桶上次漏水时间戳 ms

lock sync.Mutex
}

func (l *LeakyBucket) Allow() bool {
l.lock.Lock()
defer l.lock.Unlock()

now := time.Now().UnixNano() / 1e6
eclipse := float64((now - l.lastLeakMs)) * l.rate / 1000 //先执行漏水
l.water = l.water - eclipse //计算剩余水量
l.water = math.Max(0, l.water) //桶干了
l.lastLeakMs = now
if (l.water + 1) < l.capacity {
// 尝试加水,并且水还未满
l.water++
return true
} else {
// 水满,拒绝加水
return false
}
}

func (l *LeakyBucket) Set(r, c float64) {
l.rate = r
l.capacity = c
l.water = 0
l.lastLeakMs = time.Now().UnixNano() / 1e6
}

func main() {
var wg sync.WaitGroup
var lr LeakyBucket
lr.Set(3, 3) //每秒访问速率限制为3个请求,桶容量为3

for i := 0; i < 10; i++ {
wg.Add(1)

fmt.Println("Create req", i, time.Now())
go func(i int) {
if lr.Allow() {
fmt.Println("Respon req", i, time.Now())
}
wg.Done()
}(i)

time.Sleep(100 * time.Millisecond)
}
wg.Wait()
}

这里初始化桶容量为 3 个单位,桶内无水,1s 内创建了 10 个请求,随着请求往里面“滴水”,桶按照每秒3 个单位的速率流出水,处理时,部分请求被丢弃:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Create req 0 2018-09-12 20:13:47.742497 +0800 CST m=+0.000314490
Respon req 0 2018-09-12 20:13:47.742634 +0800 CST m=+0.000451781
Create req 1 2018-09-12 20:13:47.847836 +0800 CST m=+0.105648928
Respon req 1 2018-09-12 20:13:47.847971 +0800 CST m=+0.105785251
Create req 2 2018-09-12 20:13:47.948029 +0800 CST m=+0.205839614
Respon req 2 2018-09-12 20:13:47.948093 +0800 CST m=+0.205901092
Create req 3 2018-09-12 20:13:48.053237 +0800 CST m=+0.311044273 x
Create req 4 2018-09-12 20:13:48.153545 +0800 CST m=+0.411304284
Respon req 4 2018-09-12 20:13:48.153564 +0800 CST m=+0.411369118
Create req 5 2018-09-12 20:13:48.254636 +0800 CST m=+0.512437744 x
Create req 6 2018-09-12 20:13:48.355816 +0800 CST m=+0.613609405 x
Create req 7 2018-09-12 20:13:48.456968 +0800 CST m=+0.714763347
Respon req 7 2018-09-12 20:13:48.457017 +0800 CST m=+0.714812815
Create req 8 2018-09-12 20:13:48.557203 +0800 CST m=+0.814995585 x
Create req 9 2018-09-12 20:13:48.658431 +0800 CST m=+0.916216265 x

令牌桶

由于漏桶的出水速度是恒定的,如果瞬时爆发大流量的话,将有大部分请求被丢弃掉(溢出)。

为了解决这个问题,令牌桶进行了算法改进,算法描述如下:

  • 有一个固定容量的桶,桶一开始是空的;
  • 以固定的速率 r 往桶里填充 token,直到达到桶的容量,多余的令牌将会被丢弃;
  • 每当一个请求过来时,就尝试从桶里移除一个令牌,如果没有令牌的话,请求无法通过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package main

import (
"fmt"
"sync"
"time"
)

type TokenBucket struct {
rate int64 //固定的token放入速率, r/s
capacity int64 //桶的容量
tokens int64 //桶中当前token数量
lastTokenSec int64 //桶上次放token的时间戳 s

lock sync.Mutex
}

func (l *TokenBucket) Allow() bool {
l.lock.Lock()
defer l.lock.Unlock()

now := time.Now().Unix()
l.tokens = l.tokens + (now-l.lastTokenSec)*l.rate // 先添加令牌
if l.tokens > l.capacity {
l.tokens = l.capacity
}
l.lastTokenSec = now
if l.tokens > 0 {
// 还有令牌,领取令牌
l.tokens--
return true
} else {
// 没有令牌,则拒绝
return false
}
}

func (l *TokenBucket) Set(r, c int64) {
l.rate = r
l.capacity = c
l.tokens = 0
l.lastTokenSec = time.Now().Unix()
}

func main() {
var wg sync.WaitGroup
var lr TokenBucket
lr.Set(3, 3) //每秒访问速率限制为3个请求,桶容量为3

time.Sleep(time.Second)
for i := 0; i < 10; i++ {
wg.Add(1)

fmt.Println("Create req", i, time.Now())
go func(i int) {
if lr.Allow() {
fmt.Println("Respon req", i, time.Now())
}
wg.Done()
}(i)

time.Sleep(200 * time.Millisecond)
}
wg.Wait()
}

类似的,这里初始化桶容量为 3 个单位,桶内无令牌,每 1s 产生 3 个令牌,主程序阻塞 1s 以便让桶中储备好 3 个令牌,而后每 1s 创建 5 个请求,获取访问令牌:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Create req 0 2018-09-12 20:56:30.926267 +0800 CST m=+1.002077637
Respon req 0 2018-09-12 20:56:30.926512 +0800 CST m=+1.002322722
Create req 1 2018-09-12 20:56:31.127552 +0800 CST m=+1.203356343
Respon req 1 2018-09-12 20:56:31.127643 +0800 CST m=+1.203445860
Create req 2 2018-09-12 20:56:31.328242 +0800 CST m=+1.404040714
Respon req 2 2018-09-12 20:56:31.328311 +0800 CST m=+1.404110620
Create req 3 2018-09-12 20:56:31.529957 +0800 CST m=+1.605746538
Respon req 3 2018-09-12 20:56:31.530069 +0800 CST m=+1.605862175
Create req 4 2018-09-12 20:56:31.734506 +0800 CST m=+1.810291673 x
Create req 5 2018-09-12 20:56:31.938578 +0800 CST m=+2.014347368 x
Create req 6 2018-09-12 20:56:32.141289 +0800 CST m=+2.217061017
Respon req 6 2018-09-12 20:56:32.141379 +0800 CST m=+2.217153670
Create req 7 2018-09-12 20:56:32.341492 +0800 CST m=+2.417260734
Respon req 7 2018-09-12 20:56:32.341571 +0800 CST m=+2.417339558
Create req 8 2018-09-12 20:56:32.543497 +0800 CST m=+2.619259765
Respon req 8 2018-09-12 20:56:32.543554 +0800 CST m=+2.619317332
Create req 9 2018-09-12 20:56:32.746344 +0800 CST m=+2.822096642 x

由于桶中可以储备令牌,这使得令牌桶算法支持一定程度突发的大流量并发访问,也就是说,假设桶内有 100 个 token 时,那么可以瞬间允许 100 个请求通过。这点,对用户比较友好,因而业内多半采用该算法。

当然,不论是对于令牌桶拿不到令牌被拒绝,还是漏桶的水满了溢出,都是为了保证大部分流量的正常访问,而牺牲掉了少部分流量,这是合理的。

rate

Go 语言中的 golang.org/x/time/rate 包采用了令牌桶算法来实现速率限制,简单介绍下这个包的用法:

这个包的核心部分在 Limit 类型和 NewLimiter接口:

1
2
3
4
5
6
7
8
// Limit 定义了事件的最大频率。
// Limit 被表示为每秒事件的数量。
// 值为0的Limit不允许任何事件。
type Limit float64

// 返回一个新的Limiter实例,
// 事件发生率为r,并允许至多b个令牌爆发。
func NewLimiter(r Limit, b int) *Limiter

NewLimiter 中,我们看到了两个熟悉的参数:rb,b 是我们之前讨论过的桶的深度。

rate 包还定义了一个有用的帮助函数 Every,将 time.Duration 转换为Limit对象:

1
2
// Every 将事件之间的最小时间间隔转换为 Limit。
func Every(interval time.Duration) Limit

创建 Limiter 对象后,我们使用 Wait,AllowReserve 方法来阻塞我们的请求,直到获得访问令牌:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// WaitN(ctx, 1)的简写。
func (lim *Limiter) Wait(ctx context.Context)

// WaitN 会发生阻塞直到 lim 允许的 n 个事件执行。
// 如果 n 超过了令牌池的容量大小则报错。
// 如果 ctx 被取消或等待时间超过了 ctx 的超时时间则报错。
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)

// AllowN(time.Now(), 1)的简写。
func (lim *Limiter) Allow() bool

// AllowN 标识在时间 now 的时候,n 个事件是否可以同时发生,
// 意思就是 now 的时候是否可以从令牌池中取 n 个令牌,
func (lim *Limiter) AllowN(now time.Time, n int) bool

// ReserveN(time.Now(), 1)的简写。
func (lim *Limiter) Reserve() *Reservation

// ReserveN 返回对象 Reservation ,标识调用者需要等多久才能等到 n 个事件发生,
// 意思就是 等多久令牌池中至少含有 n 个令牌。
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation

// eg:
r := lim.ReserveN(time.Now(), 1)
if !r.OK() {
// Not allowed to act! Did you remember to set lim.burst to be > 0 ?
return
}
time.Sleep(r.Delay())
Act()
  • 如果要使用context 的截止日期或 cancel 方法的话,使用 WaitN。
  • 如果需要在事件超出频率的时候丢弃或跳过事件,就使用 Allow,否则使用 Reserve 或 Wait。
  • 如果事件发生的频率是可以由调用者控制的话,可以用 ReserveN 来控制事件发生的速度而不丢掉事件。

使用 rate 包来实现一个简单的 http 限流中间件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"net/http"

"golang.org/x/time/rate"
)

var limiter = rate.NewLimiter(2, 5)

func limit(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !limiter.Allow() {
http.Error(w, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests)
return
}
next.ServeHTTP(w, r)
})
}

func okHandler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("OK\n"))
}

func main() {
mux := http.NewServeMux()
mux.HandleFunc("/", okHandler)
http.ListenAndServe(":4000", limit(mux))
}

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
hxzdeMac-mini:~ $ while true; do
> curl http://localhost:4000/
> sleep 0.1s
> done
OK
OK
OK
OK
OK //首次爆发5个请求
OK
Too Many Requests
Too Many Requests
Too Many Requests
OK //之后每秒2个请求
Too Many Requests
Too Many Requests
Too Many Requests
OK
...

稍微修改下这个程序还可以实现 用户粒度的限流,基于IP地址或API密钥等标识符为每个用户实施速率限制器。

更健全的 http 限流中间件推荐看看这个库:https://github.com/didip/tollbooth

最后

上面的几种算法实现比较直白,只是算法语言的翻译,没有充分利用 Go 语言里面 goroutine、channel、定时器等特性,感兴趣的读者可以改造下。

另外,《聊聊高并发系统之限流特技》的两篇文章,非常棒,详细介绍了各种限流算法以及应用级限流、分布式限流、接入层限流等非常实用的技术。

参考

彦祖老师 wechat