限流算法

什么是限流?

限流(Rate Limiting)是一种控制服务请求速率的机制,用于保护系统免受过载请求的影响。通过限制单位时间内可以处理的请求数量,限流可以:

  1. 防止资源被耗尽
  2. 避免服务雪崩
  3. 保证服务质量
  4. 防止恶意攻击

常见限流算法

1. 计数器算法(固定窗口)

计数器算法是最简单的限流算法,它维护一个计数器记录单位时间内的请求数量,当超过阈值时拒绝请求。

​特点​​:

  • 实现简单
  • 存在临界问题(窗口切换时可能瞬间通过两倍阈值请求)

​Golang 实现​​:

package main

import (
	"fmt"
	"sync"
	"time"
)

type CounterLimiter struct {
	rate  int           // 限制数量
	begin time.Time     // 窗口开始时间
	cycle time.Duration // 窗口周期
	count int           // 计数器
	mu    sync.Mutex    // 锁
}

func NewCounterLimiter(rate int, cycle time.Duration) *CounterLimiter {
	return &CounterLimiter{
		rate:  rate,
		begin: time.Now(),
		cycle: cycle,
		count: 0,
	}
}

func (l *CounterLimiter) Allow() bool {
	l.mu.Lock()
	defer l.mu.Unlock()

	// 如果当前时间超过窗口周期,重置窗口
	if time.Now().After(l.begin.Add(l.cycle)) {
		l.begin = time.Now()
		l.count = 0
	}

	// 判断请求数是否超过限制
	if l.count >= l.rate {
		return false
	}

	l.count++
	return true
}

func main() {
	limiter := NewCounterLimiter(100, time.Minute) // 每分钟100个请求

	for i := 0; i < 150; i++ {
		if limiter.Allow() {
			fmt.Printf("请求 %d 通过\n", i+1)
		} else {
			fmt.Printf("请求 %d 被限流\n", i+1)
		}
	}
}

2. 滑动窗口算法

滑动窗口算法是对计数器算法的改进,它将时间窗口划分为多个小窗口,通过统计这些小窗口的请求数来避免临界问题。

​特点​​:

  • 解决了计数器算法的临界问题
  • 比计数器算法更精确
  • 实现稍复杂

​Golang 实现​​:

package main

import (
	"fmt"
	"sync"
	"time"
)

type SlidingWindowLimiter struct {
	rate        int           // 限制数量
	windowSize  time.Duration // 窗口大小
	segmentSize time.Duration // 小窗口大小
	segments    []int         // 小窗口计数器
	mu          sync.Mutex    // 锁
	lastTime    time.Time     // 上次请求时间
}

func NewSlidingWindowLimiter(rate int, windowSize, segmentSize time.Duration) *SlidingWindowLimiter {
	segments := make([]int, int(windowSize/segmentSize))
	return &SlidingWindowLimiter{
		rate:        rate,
		windowSize:  windowSize,
		segmentSize: segmentSize,
		segments:    segments,
		lastTime:    time.Now(),
	}
}

func (l *SlidingWindowLimiter) Allow() bool {
	l.mu.Lock()
	defer l.mu.Unlock()

	now := time.Now()
	elapsed := now.Sub(l.lastTime)
	l.lastTime = now

	// 移动窗口
	steps := int(elapsed / l.segmentSize)
	if steps > len(l.segments) {
		steps = len(l.segments)
	}
	for i := 0; i < steps; i++ {
		l.segments = append(l.segments[1:], 0)
	}

	// 计算当前窗口总请求数
	total := 0
	for _, count := range l.segments {
		total += count
	}

	if total >= l.rate {
		return false
	}

	l.segments[len(l.segments)-1]++
	return true
}

func main() {
	limiter := NewSlidingWindowLimiter(100, time.Minute, 10*time.Second) // 每分钟100个请求,10秒一个小窗口

	for i := 0; i < 150; i++ {
		if limiter.Allow() {
			fmt.Printf("请求 %d 通过\n", i+1)
		} else {
			fmt.Printf("请求 %d 被限流\n", i+1)
		}
		time.Sleep(500 * time.Millisecond)
	}
}

3. 漏桶算法

漏桶算法模拟一个固定容量的桶,请求以任意速率进入桶中,但以固定速率流出。当桶满时,新请求会被拒绝。

​特点​​:

  • 平滑流量输出
  • 无法应对突发流量
  • 实现简单

​Golang 实现​​:

package main

import (
	"fmt"
	"sync"
	"time"
)

type LeakyBucketLimiter struct {
	rate       float64     // 流出速率(请求/秒)
	capacity   float64     // 桶容量
	water      float64     // 当前水量
	lastLeakMs int64       // 上次漏水时间(毫秒)
	mu         sync.Mutex  // 锁
}

func NewLeakyBucketLimiter(rate, capacity float64) *LeakyBucketLimiter {
	return &LeakyBucketLimiter{
		rate:       rate,
		capacity:   capacity,
		lastLeakMs: time.Now().UnixNano() / 1e6,
	}
}

func (l *LeakyBucketLimiter) Allow() bool {
	l.mu.Lock()
	defer l.mu.Unlock()

	nowMs := time.Now().UnixNano() / 1e6
	elapsedMs := nowMs - l.lastLeakMs
	l.lastLeakMs = nowMs

	// 漏水
	l.water -= float64(elapsedMs) * l.rate / 1000
	if l.water < 0 {
		l.water = 0
	}

	// 判断桶是否已满
	if l.water >= l.capacity {
		return false
	}

	l.water++
	return true
}

func main() {
	limiter := NewLeakyBucketLimiter(10, 20) // 每秒10个请求,桶容量20

	for i := 0; i < 50; i++ {
		if limiter.Allow() {
			fmt.Printf("请求 %d 通过\n", i+1)
		} else {
			fmt.Printf("请求 %d 被限流\n", i+1)
		}
		time.Sleep(50 * time.Millisecond)
	}
}

4. 令牌桶算法

令牌桶算法以固定速率向桶中添加令牌,请求需要获取令牌才能被处理。当桶中没有令牌时,请求会被拒绝。

​特点​​:

  • 可以应对突发流量
  • 实现相对复杂
  • 是业界最常用的限流算法

​Golang 实现​​:

package main

import (
	"fmt"
	"sync"
	"time"
)

type TokenBucketLimiter struct {
	rate         float64     // 令牌生成速率(令牌/秒)
	capacity     float64     // 桶容量
	tokens       float64     // 当前令牌数
	lastTokenMs  int64       // 上次生成令牌时间(毫秒)
	mu           sync.Mutex  // 锁
}

func NewTokenBucketLimiter(rate, capacity float64) *TokenBucketLimiter {
	return &TokenBucketLimiter{
		rate:        rate,
		capacity:    capacity,
		lastTokenMs: time.Now().UnixNano() / 1e6,
	}
}

func (l *TokenBucketLimiter) Allow() bool {
	l.mu.Lock()
	defer l.mu.Unlock()

	nowMs := time.Now().UnixNano() / 1e6
	elapsedMs := nowMs - l.lastTokenMs
	l.lastTokenMs = nowMs

	// 生成令牌
	l.tokens += float64(elapsedMs) * l.rate / 1000
	if l.tokens > l.capacity {
		l.tokens = l.capacity
	}

	// 判断是否有令牌
	if l.tokens < 1 {
		return false
	}

	l.tokens--
	return true
}

func main() {
	limiter := NewTokenBucketLimiter(10, 20) // 每秒10个令牌,桶容量20

	for i := 0; i < 50; i++ {
		if limiter.Allow() {
			fmt.Printf("请求 %d 通过\n", i+1)
		} else {
			fmt.Printf("请求 %d 被限流\n", i+1)
		}
		time.Sleep(50 * time.Millisecond)
	}
}

算法对比

算法实现难度平滑度突发流量处理资源消耗适用场景
计数器简单不支持简单限流
滑动窗口中等部分支持需要更精确限流
漏桶中等不支持需要平滑输出
令牌桶复杂支持主流方案,允许突发流量

Golang 标准库实现

Golang 的 golang.org/x/time/rate 包提供了基于令牌桶算法的限流器实现:

package main

import (
	"fmt"
	"golang.org/x/time/rate"
	"time"
)

func main() {
	// 每秒10个令牌,桶容量为10
	limiter := rate.NewLimiter(10, 10)

	for i := 0; i < 20; i++ {
		if limiter.Allow() {
			fmt.Printf("请求 %d 通过\n", i+1)
		} else {
			fmt.Printf("请求 %d 被限流\n", i+1)
		}
		time.Sleep(50 * time.Millisecond)
	}
}

分布式限流

在分布式系统中,单机限流无法满足需求,需要使用分布式限流方案,常见实现方式:

  1. ​Redis + Lua 脚本​​:利用 Redis 的原子性和高性能
  2. ​分布式令牌桶​​:如使用 Redis 存储令牌数量
  3. ​中间件​​:如 Nginx、Envoy 提供的限流功能

​Redis + Lua 示例​​:

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v8"
	"time"
)

var ctx = context.Background()
var rdb = redis.NewClient(&redis.Options{
	Addr: "localhost:6379",
})

func isAllowed(key string, limit int, window time.Duration) bool {
	// 使用Lua脚本保证原子性
	script := `
	local key = KEYS[1]
	local limit = tonumber(ARGV[1])
	local window = tonumber(ARGV[2])
	local current = redis.call('GET', key)
	if current == false then
		redis.call('SET', key, 1, 'EX', window)
		return 1
	end
	if tonumber(current) >= limit then
		return 0
	else
		redis.call('INCR', key)
		return 1
	end
	`
	result, err := rdb.Eval(ctx, script, []string{key}, limit, window.Seconds()).Result()
	if err != nil {
		fmt.Println("Redis error:", err)
		return false
	}
	return result.(int64) == 1
}

func main() {
	key := "user_123" // 用户ID或其他标识
	limit := 100      // 限制数量
	window := time.Minute // 时间窗口

	for i := 0; i < 150; i++ {
		if isAllowed(key, limit, window) {
			fmt.Printf("请求 %d 通过\n", i+1)
		} else {
			fmt.Printf("请求 %d 被限流\n", i+1)
		}
		time.Sleep(500 * time.Millisecond)
	}
}

最佳实践

  1. ​选择合适的算法​​:根据业务需求选择最适合的限流算法
  2. ​多级限流​​:可以在网关、服务、方法等多个层次实施限流
  3. ​动态调整​​:根据系统负载动态调整限流阈值
  4. ​区分用户​​:对重要用户和普通用户实施不同的限流策略
  5. ​监控与告警​​:监控限流情况,及时发现异常
  6. ​优雅降级​​:被限流时提供友好的响应,如排队页面或缓存内容

滚动至顶部