[BUG]死锁1

最近新开发了个排行榜系统,压力测试时高并发拉取榜单时,会导致整个服务不可用。用pprof查看情况,发现多个goroutine长时间阻塞,与其相关的代码如下:


// Worker 定义如何执行 ListenMsg 时设置的回调函数
type Worker interface {
	// Post 执行工作任务
	Post(func())

	// WaitDone 等待当前所有任务执行完毕
	WaitDone()

	GoroutineCount() int32
}

type parallelWorker struct {
	wg             sync.WaitGroup
	ch             chan struct{}
	recover        bool
	goroutineCount int32
}

func (w *parallelWorker) GoroutineCount() int32 {
	return atomic.LoadInt32(&w.goroutineCount)
}

func (w *parallelWorker) Post(f func()) {
	w.wg.Add(1)
	atomic.AddInt32(&w.goroutineCount, 1)
	if w.ch == nil {
		go func() {
			defer func() {
				atomic.AddInt32(&w.goroutineCount, -1)
				w.wg.Done()
				if w.recover {
					if err := recover(); err != nil {
						stack := debug.Stack()
						logrus.WithFields(logrus.Fields{
							"err":   err,
							"stack": string(stack),
						}).Error("Recover")

						os.Stderr.Write([]byte(fmt.Sprintf("%v\n", err)))
						os.Stderr.Write(stack)
					}
				}
			}()
			f()
		}()
	} else { // goroutine数量限制,通过channel控制
		// 从信道中取一个struct{}。如果信道为空,说明无可用goroutine,都已被占用,此时会阻塞,直到有goroutine被释放
		<-w.ch
		go func() {
			defer func() {
				atomic.AddInt32(&w.goroutineCount, -1)
				w.wg.Done()
				w.ch <- struct{}{} // 归还一个struct{} 即解除对当前持有的goroutine的占用
				if w.recover {
					if err := recover(); err != nil {
						stack := debug.Stack()
						logrus.WithFields(logrus.Fields{
							"err":   err,
							"stack": string(stack),
						}).Error("Recover")

						os.Stderr.Write([]byte(fmt.Sprintf("%v\n", err)))
						os.Stderr.Write(stack)
					}
				}
			}()
			f()
		}()
	}
}
func (w *parallelWorker) WaitDone() {
	w.wg.Wait()
}

// NewParallelWorker 创建并发的执行器.
// maxGoroutines 为最大 goroutine 并发数量, maxGoroutines <= 0 时, 表示不限.
// recover 表示是否需要启用 recover 来保护 panic.
func NewParallelWorker(maxGoroutines int, recover bool) Worker {
	w := &parallelWorker{recover: recover}
	if maxGoroutines > 0 {
		w.ch = make(chan struct{}, maxGoroutines)
		for i := 0; i < maxGoroutines; i++ {
			// 向缓冲信道中写入 maxGoroutines 个信号,代表初始时有maxGoroutines 个 goroutine可用
			w.ch <- struct{}{}
		}
	}
	return w
}

ParallelWorker 代码分析与死锁情况分析

代码详解

这段代码实现了一个并行任务执行器,主要功能是控制并发执行的goroutine数量,并提供任务提交和等待完成的功能。

核心结构

  • Worker接口定义了三个方法:Post(提交任务)、WaitDone(等待所有任务完成)和GoroutineCount(获取当前goroutine数量)
  • parallelWorker实现了Worker接口,包含:
    wg: sync.WaitGroup用于等待所有任务完成
    ch: 带缓冲的channel,用于控制最大并发goroutine数量
    recover: 是否启用panic恢复
    goroutineCount: 当前活跃的goroutine数量(原子计数器)

关键方法

  1. Post(func()):
    • 当ch为nil时(无限制模式),直接启动goroutine执行任务
    • 当ch非nil时(限制模式),先从channel获取令牌,再启动goroutine
    • 两种模式都包含panic恢复逻辑(如果启用)
  2. WaitDone():
    • 使用wg.Wait()等待所有任务完成
  3. NewParallelWorker(maxGoroutines int, recover bool):
    • 创建并初始化parallelWorker
    • 当maxGoroutines>0时,初始化带缓冲的channel并填充令牌

死锁风险分析(当maxGoroutines=5时)

死锁场景

当业务调用同步阻塞的RPC时,可能出现以下死锁情况:

  1. 所有goroutine被占用
    • 5个goroutine都在执行同步阻塞的RPC调用
    • 第6个任务调用Post()时,会阻塞在<-w.ch等待可用goroutine
  2. 任务间依赖
    • 如果某个任务的完成依赖于另一个被阻塞的任务(如等待其RPC结果)
    • 但该任务因没有可用goroutine而无法执行
    • 形成循环等待
  3. RPC调用内部使用同一Worker
    • 如果RPC处理过程中又使用同一个Worker提交新任务
    • 可能导致所有goroutine都在等待新任务执行,而新任务又需要goroutine

具体死锁条件

  • 所有5个goroutine都在执行同步阻塞操作
  • 这些阻塞操作需要等待其他任务完成
  • 但其他任务因没有可用goroutine而无法执行

Bug复现代码

worker := NewParallelWorker(5, true)

// 提交5个阻塞任务
for i := 0; i < 5; i++ {
    worker.Post(func() {
        // 同步阻塞RPC调用
        result := makeSomeRPCCall() 

        // 在RPC回调中又提交新任务
        worker.Post(func() {
            fmt.Println("This will deadlock!")
        })
    })
}

解决方案

  1. 避免在受限Worker中执行同步阻塞操作
    • 将同步RPC改为异步方式
    • 使用回调或channel处理结果
  2. 使用更大的goroutine限制
    • 根据业务需求调整maxGoroutines值
  3. 分层Worker
    • 为RPC调用使用单独的Worker实例
    • 避免任务间的相互阻塞
  4. 超时机制
    • 为阻塞操作添加超时控制
    • 防止无限期等待
  5. 任务队列分离
    • 将可能阻塞的任务与其他任务分开处理
    • 使用不同的Worker实例
滚动至顶部