Go语言的跨协程异常处理

2020-06-07 ⏳3.4分钟(1.3千字) 🕸️

Go语言内置协程,这极大降低了并发编程的门槛。但这并不意味着并发编程的难度降低了。多协程的报错协同处理就是难点之一。为此官方提供了 errgroup 包。但这个包无法处理协程 panic 的问题。在 Go 语言中,我们无法在父协程里捕获子协程的异常(panic)。如果服务有异常没有处理,整个进程都会退出。这会产生很多不可预知的问题。今天就给大家分享一种跨协程的的异常处理方案。

先给出一个异常示例:

func main() {
  // 希望捕获所有所有 panic
  defer func() {
    r := recover()
    fmt.Println(r)
  }()

  // 启动新协程
  go func() {
    panic(123)
  }()
  // 等待一下,不然协程可能来不及执行
  time.Sleep(1 * time.Second)
  fmt.Println("这条消息打印不出来")
}

自己执行一下就会发现,最后的fmt.Println没有执行,也就是说开始的recover()没有捕获协程中的panic,整个进程都退出了。

一般的Go框架都是针对每一个请求启一个新协程,并统一捕获所有的panic。

如果程序员在写业务代码的时候开了新协程而且忘记了在协程中捕获panic的话,服务的进程就会因为某个未捕获的panic而退出。所以,我在Sniper一个轻量级Go业务框架的思考一文中建议少用或者不用协程

少用或不用协程对于大多数业务场景是没有问题。但有些少数场景还是要用协程的。

比如说我司有一个批量查询接口,一次最多支持查50条信息。但我们的业务要求一次要查100到1000条信息不等。如果不使用协程,需要依次执行2到10次查询,可能就超时了。

不用协程会超时,用协程可能会panic,怎么办呢?最好的办法就是引入框架方法,把Go的协程包装一下。

我们首先定义一个结构体

// Panic 子协程 panic 会被重新包装,添加调用栈信息
type Panic struct {
  R     interface{} // recover() 返回值
  Stack []byte      // 当时的调用栈
}

func (p Panic) String() string {
  return fmt.Sprintf("%v\n%s", p.R, p.Stack)
}

type PanicGroup struct {
  panics chan Panic // 协程 panic 通知信道
  dones  chan int   // 协程完成通知信道
  jobN   int32      // 协程并发数量
}

因为Go的channel无法直接初始化,所以需要定义一个工厂方法

func NewPanicGroup() *PanicGroup {
  return &PanicGroup{
    panics: make(chan Panic, 8),
    dones:  make(chan int, 8),
  }
}

针对PanicGroup我们定义GoWait两个方法。

Go方法用来启动新的协程,定义如下:

func (g *PanicGroup) Go(f func()) *PanicGroup {
  atomic.AddInt32(&g.jobN, 1)
  go func() {
    defer func() {
      if r := recover(); r != nil {
        g.panics <- Panic{R: r, Stack: debug.Stack()}
        return
      }
      g.dones <- 1
    }()
    f()
  }()

  return g // 方便链式调用
}

这里的核心思想就是启动一个带有recover()的协程,捕获f()执行过程产生的panic并写到g.dones。每个协程启动的时候将计数器加一,方便Wait方法等待。

Wait方法用来等待所有协程结束或者panic,定义如下

func (g *PanicGroup) Wait(ctx context.Context) error {
  for {
    select {
    case <-g.dones:
      if atomic.AddInt32(&g.jobN, -1) == 0 {
        return nil
      }
    case p := <-g.panics:
      panic(p)
    case <-ctx.Done():
      return ctx.Err()
    }
  }
}

如果所有协程都结束,g.jobN就会等于零,Wait就会返回。如果有协程发生panic,Wait就会通过p := <-g.panics捕获并重新拋出。这个时候,框架就能处理这个panic了。

使用起来也是非常的方便

g := NewPanicGroup()
g.Go(f1)
g.Go(f2)
g.Wait(ctx)
// 也可以链式调用
err := NewPanicGroup().Go(f1).Go(f2).Wait(ctx)

PanicGroup的本质就是各个子协程各自捕获自己的panic,并通过channel传给父协程(传递的时候带上调用栈信息)。父协程拿到panic信息后重新panic,看起来就像是在父协程拋出来一样。如此就绕过了Go不能在父协程捕获子协程panic的问题,也就避免了部分业务逻辑panic导致整个服务进程退出的问题

另外一个需要主意的设计就是Go方法的入参。有人喜欢设计成func(context.Context) error,我想了一下,觉得这种设计不好。

f是由PanicGroup执行的,如果需要传入一个ctx,那势必需要在PanicGroup中保存当前ctx,就是不被推荐的。当然,有人会说你可以在调Go方法的时候先把f存下来,等到调Wait方法的时候再统一起协程。这样做确实可以不用保存ctx了,却要保存所有的f,我也是不推荐这种做法。

f返回了一个error,这看起来是通用设计,返回报错总是天经地义的。可是,f是由PanicGroup 执行的,如果要返回错误,PanicGroup就需要跟踪哪些协程有报错、哪些没有报错,不同的报错如何跟协程对应。这会让PanicGroup的设计变得非常复杂。

基于以上两点,我建议将f设计成func(),完全没有参数,PanicGroup只跟踪panic就好了,简单明了。那如何给协程传参并处理返回结果呢?使用闭包!

这是一个实际的传参示例:

func MCount(ctx context.Context, ids []int64) map[int64]int32 {
  m := sync.Mutex{}
  g := xsync.NewPanicGroup()

  chunkSize := 50
  total := make(map[int64]int32)
  for i := 0; i < len(ids); i += chunkSize {
    begin := i
    end := i + chunkSize
    if end > len(ids) {
      end = len(ids)
    }

    g.Go(func() {
      ts := mCount(ctx, ids[begin:end])

      m.Lock()
      defer m.Unlock()
      for k, v := range ts {
        total[k] = v
      }
    })
  }

  err = g.Wait(ctx)
  return total
}

今天(2022-05-09)收到读者来信,问有没有完整的代码。我自认为文章已经把完整的思路讲清楚了。大家在理解的基础上可以构建符合自己要求的代码。因为异常处理是非常底层的逻辑,我建议大家还是要自己写。不过在此也分享一下我们内部使用的完整代码,算是给大家提供一个参考。

以上就是文章的全部内容,欢迎大家留言讨论😄