最近新开发了个排行榜系统,压力测试时高并发拉取榜单时,会导致整个服务不可用。用pprof查看情况,发现多个goroutine长时间阻塞,与其相关的代码如下:
// Worker 定义如何执行 ListenMsg 时设置的回调函数
type Worker interface {
// Post 执行工作任务
Post(func())
// WaitDone 等待当前所有任务执行完毕
WaitDone()
GoroutineCount() int32
}
type parallelWorker struct {
wg sync.WaitGroup
ch chan struct{}
recover bool
goroutineCount int32
}
func (w *parallelWorker) GoroutineCount() int32 {
return atomic.LoadInt32(&w.goroutineCount)
}
func (w *parallelWorker) Post(f func()) {
w.wg.Add(1)
atomic.AddInt32(&w.goroutineCount, 1)
if w.ch == nil {
go func() {
defer func() {
atomic.AddInt32(&w.goroutineCount, -1)
w.wg.Done()
if w.recover {
if err := recover(); err != nil {
stack := debug.Stack()
logrus.WithFields(logrus.Fields{
"err": err,
"stack": string(stack),
}).Error("Recover")
os.Stderr.Write([]byte(fmt.Sprintf("%v\n", err)))
os.Stderr.Write(stack)
}
}
}()
f()
}()
} else { // goroutine数量限制,通过channel控制
// 从信道中取一个struct{}。如果信道为空,说明无可用goroutine,都已被占用,此时会阻塞,直到有goroutine被释放
<-w.ch
go func() {
defer func() {
atomic.AddInt32(&w.goroutineCount, -1)
w.wg.Done()
w.ch <- struct{}{} // 归还一个struct{} 即解除对当前持有的goroutine的占用
if w.recover {
if err := recover(); err != nil {
stack := debug.Stack()
logrus.WithFields(logrus.Fields{
"err": err,
"stack": string(stack),
}).Error("Recover")
os.Stderr.Write([]byte(fmt.Sprintf("%v\n", err)))
os.Stderr.Write(stack)
}
}
}()
f()
}()
}
}
func (w *parallelWorker) WaitDone() {
w.wg.Wait()
}
// NewParallelWorker 创建并发的执行器.
// maxGoroutines 为最大 goroutine 并发数量, maxGoroutines <= 0 时, 表示不限.
// recover 表示是否需要启用 recover 来保护 panic.
func NewParallelWorker(maxGoroutines int, recover bool) Worker {
w := ¶llelWorker{recover: recover}
if maxGoroutines > 0 {
w.ch = make(chan struct{}, maxGoroutines)
for i := 0; i < maxGoroutines; i++ {
// 向缓冲信道中写入 maxGoroutines 个信号,代表初始时有maxGoroutines 个 goroutine可用
w.ch <- struct{}{}
}
}
return w
}
ParallelWorker 代码分析与死锁情况分析
代码详解
这段代码实现了一个并行任务执行器,主要功能是控制并发执行的goroutine数量,并提供任务提交和等待完成的功能。
核心结构
Worker
接口定义了三个方法:Post
(提交任务)、WaitDone
(等待所有任务完成)和GoroutineCount
(获取当前goroutine数量)parallelWorker
实现了Worker
接口,包含:
•wg
:sync.WaitGroup
用于等待所有任务完成
•ch
: 带缓冲的channel,用于控制最大并发goroutine数量
•recover
: 是否启用panic恢复
•goroutineCount
: 当前活跃的goroutine数量(原子计数器)
关键方法
Post(func())
:
• 当ch
为nil时(无限制模式),直接启动goroutine执行任务
• 当ch
非nil时(限制模式),先从channel获取令牌,再启动goroutine
• 两种模式都包含panic恢复逻辑(如果启用)WaitDone()
:
• 使用wg.Wait()
等待所有任务完成NewParallelWorker(maxGoroutines int, recover bool)
:
• 创建并初始化parallelWorker
• 当maxGoroutines>0
时,初始化带缓冲的channel并填充令牌
死锁风险分析(当maxGoroutines=5时)
死锁场景
当业务调用同步阻塞的RPC时,可能出现以下死锁情况:
- 所有goroutine被占用:
• 5个goroutine都在执行同步阻塞的RPC调用
• 第6个任务调用Post()
时,会阻塞在<-w.ch
等待可用goroutine - 任务间依赖:
• 如果某个任务的完成依赖于另一个被阻塞的任务(如等待其RPC结果)
• 但该任务因没有可用goroutine而无法执行
• 形成循环等待 - RPC调用内部使用同一Worker:
• 如果RPC处理过程中又使用同一个Worker提交新任务
• 可能导致所有goroutine都在等待新任务执行,而新任务又需要goroutine
具体死锁条件
- 所有5个goroutine都在执行同步阻塞操作
- 这些阻塞操作需要等待其他任务完成
- 但其他任务因没有可用goroutine而无法执行
Bug复现代码
worker := NewParallelWorker(5, true)
// 提交5个阻塞任务
for i := 0; i < 5; i++ {
worker.Post(func() {
// 同步阻塞RPC调用
result := makeSomeRPCCall()
// 在RPC回调中又提交新任务
worker.Post(func() {
fmt.Println("This will deadlock!")
})
})
}
解决方案
- 避免在受限Worker中执行同步阻塞操作:
• 将同步RPC改为异步方式
• 使用回调或channel处理结果 - 使用更大的goroutine限制:
• 根据业务需求调整maxGoroutines值 - 分层Worker:
• 为RPC调用使用单独的Worker实例
• 避免任务间的相互阻塞 - 超时机制:
• 为阻塞操作添加超时控制
• 防止无限期等待 - 任务队列分离:
• 将可能阻塞的任务与其他任务分开处理
• 使用不同的Worker实例