Go语言的并发编程

2022-11-09 ⏳11.3分钟(4.5千字) 🕸️

本文介绍Go语言并发编程的概念、用法和需要注意的问题。

并发的作用

我们之前说过,Go语言是一种面向过程的语言。默认程序会很main函数开始,逐行执行代码,而且只能使用CPU的一个核。举个例子:

files := []string{"a.txt", "b.txt", "c.txt"}
for _, f := range files {
  Compress(f)
}

假设Compress会根据传入的路径来压缩文件。上面的循环会自动压缩files中保存的所有文件,但是一次只能压缩一个。后面的文件必须等前面的压完了才能开始。考虑到压缩操作比较耗时,而且当今的CPU都是多核,所以如果能使用多核并发压缩,一定会降低整个过程的耗时。在Go语言中可以使用协程实现这种效果。

files := []string{"a.txt", "b.txt", "c.txt"}
for _, f := range files {
  go Compress(f) // 启动协程
}

Go语言起协程非常简单,只需要在调用函数的前面加上go关键字。上面的代码会在循环中启动三个协程,每个协程独自运行Compress函数,互不影响。一般来说不同的协程会使用不同的CPU核,所以整个压缩过程可以并发执行。

并发的问题

以上是一个虚拟的示例,没法直接运行。下面我用一个更简单的例子来说明使用协程需要注意的问题。

package main

func main() {
  for i := 0; i < 10; i++ {
    go func() {
      fmt.Println("hello")
    }()
  }
}

这里的func(){}()是一种函数的惯用法,意思是声明一个匿名函数并且立即执行。再加上前面的go关键字,表示声明匿名函数并且在新协程中立即执行。上面的例子是批量创建十个协程,并在每个协程里输出hello字符串。

如果你自己运行一把就会发现,程序什么内容也没输出,直接退出了😂造成这个问题的根本原因是main函数退出了,整个程序结束,之前创建的协程没有机会执行,也就不可能输出什么内容了。

要想解决这个问题,main函数在创建协程后,需要等待所有协程都结束后才能退出。不能自己先退出。这种协程之间的等待又称为协程同步。Go语言为协程同步提供了特殊的数据类型,叫通道(channel)。

通道(channel)

通道跟切片有点类似,我们可以把它想象成是一消息队列,我们需要给消息指定类型,我们可以把消息放到队列,也可以从队列里获取消息。通道的声明语法如下:

var ch1 chan int // 队列中没能放 int 数据
ch1 = make(chan int)
ch2 := make(chan string, 4)

声明通道使用chan关键字,后面跟消息的类型。通道跟字典类似,声明之后还需要初始化,不然无法使用。所以ch1还需要配合make(chan int)来完成初始化。

我们在使用make创建通道的时候还可以额外指定第二个参数,用来表示队列的缓冲区长度。如果不指定,默认长度就是零。缓冲区的用途我们后面说。

有了通道变量之后,我们就可以往里面投递数据或者读取数据:

ch1 <- 1    // 写入
v := <- ch1 // 读取

这里用到了箭头操作符<-,箭头表示数据的流向。ch1 <- 1表示数据流向通道,<- ch1表示数据流出通道。v := <- ch1表示从ch1提出一条数据并保存到变量v中。

如果你把上面两行代码写到main函数并运行,程序会直接报错:

fatal error: all goroutines are asleep - deadlock!

解决办法也很简单,把ch1改为make(chan int, 1)就可以了,给它设一个缓冲区。

对于没有缓冲区的通道,如果某协程想投递数据,则该协程会暂停执行。一直等到有另一个协程想从该通道读取消息的时候才能恢复。这种暂停也叫挂起。这种挂起是双向的。如果某协程尝试从ch1读取消息,但此时又没有其他协程尝试写入,则该协程也会被挂起。通道在这里是两个协程的纽带,两个协程必须同时读写才行。

回到上面的例子,我们先尝试给ch1写入消息,这个时候运行时就会挂起main函数所在的协程。因为后续的读取操作也在main函数中,所以不可能有协程从ch1读取内容了,这样main函数就会一直处理挂起状态。这就是所谓的死锁,自己跟自己死锁了😂

一般发生死锁后程序不会退出。因为运行时很难判断到底有没有产生死锁。但我们的示例中只有一个协程,也就是运行main函数的协程,它都挂起了说明一定产生了死锁,所以就直接报错了。但在实际生产系统中,死锁问题很难发现。所以大家在写并发代码的时候一定要小心。

如果把ch1改成make(chan int, 1),它就有了缓冲区。这个时候ch1 <- 1就不会挂起当前协程,然后面的v := <- ch1也就能正常执行了。

协程同步

有了通道这个工具,我们就可以解决前面说的问题。改造代码如下:

package main

func main() {
  ch := make(chan int, 10)
  for i := 0; i < 10; i++ {
    go func() {
      fmt.Println("hello")
      ch <- 1
    }()
  }
  for i := 0; i < 10; i++ {
    <- ch
  }
}

我们先是定义了缓冲长度为十的int型通道。每个协程完成后会往ch写入消息1。因为有缓冲,所以不论有没有其他协程读取ch,刚的写入肯定会成功,对应的协程执行完成后会退出。

我们在main函数的最后尝试从ch读取消息,一共读十次。每个协程结束都会写入一条消息。如果协程没有完全,读取操作会挂起main函数所在的协程。

运行改造后的代码就会看到程序输出十行hello。上例中也可以使用无缓冲通道。无缓冲通道需要读写双方同时操作,所以main函数每读取一条消息才会有一个协程退出。而有缓冲的版本则没有此限制,所以打开协程完成工作后会直接退出,不受main函数所在协程的影响。但无论如何,如果没有同步,并发协程的执行顺序是不确定的。

并发顺序

我们稍微修改一下上述代码:

package main

func main() {
  ch := make(chan int, 10)
  for i := 0; i < 10; i++ {
    go func(id int) {
      fmt.Println("hello", id)
      ch <- 1
    }(i)
  }
  for i := 0; i < 10; i++ {
    <- ch
  }
}

我们给每个协程加一个序号,通过匿名函数的参数传进去,然后在运行的时候打开出来。多执行几次,你会发现每次的执行顺序都不一样,这就印证了刚才的判断。所以,协程在并发运行的时候顺序不可控!

range 语法

在实践中经常会循环读取通道,所以Go语言也支持使用range关键字实现「遍历」通道的效果。刚才main函数最后的for循环可以改写为:

for v := range ch {
  fmt.Println(v) //随便做点事
}

使用range的时候不需要指定箭头操作符!它实际的运行过程如下:

for {
  v := <-ch
  fmt.Println(v)
}

所以这个 for 循环不会自动结束,除非有其他协程使用close函数关闭ch通道。

最实战的例子是定时间器。如果我们想每隔一段时间输出一条消息,可以使用time标准库提供的通道:

ch := time.Tick(3 * time.Second)
go func() {
  for t := range ch {
    fmt.Println(t)
  }
}()

time.Tick(3*time.Second)会返回一个chan time.Time通道,每隔三秒就可以从里面读取一个时间对象。上面的代码创建了一个单独协程,不停从定时器通道读取时间。只要main函数不退出,该协程就会不停打印定时器触发的时间。

但问题来了,上面的循环什么时候结束呢?答案是永远不会结束。如果通道里没有内容了,协程在读取的时候会被挂起。但有时候我们希望告知对应的协程已经没有消息了,工作干完了,可以退出了。

通知退出有几种方法。最简单的就是关闭对应的通道:

ch := make(chan int)
go func() {
  time.Sleep(1*time.Second)
  close(ch)
}()
for v := range ch {
  fmt.Println(v)
}

通道关闭之后for循环会收到信号,然后结束循环。但这种办法有个副作用:

v := <-ch

如果是自己用箭头操作符读取消息,当通道关闭后代码还是会收到一个值,只不过这次是零值。本例中v最后的取值是0。为了区分正常的消息零值和关闭零值,Go语言还支持另外一种读取语法:

v, ok := <-ch

通道关闭的时候上述读取也会返回,而且ok会被设置为false。程序可以通过检查ok变量来确定通道是否已经关闭。

select 语法

除了关闭通道之外,我们还可以使用一个单独的控制通道来通知协程退出。比如:

ch1 := make(chan int)
ch2 := make(chan int)

go func() {
  for {
    v := <-ch1 
    fmt.Println(v)
    if v := <-ch2; v > 0 {
      return
    }
  }
}()

这段程序在协程里先从ch1读取数据,然后打印出来。它在继续循环之前会尝试从ch2读取消息。如果有消息则表明活干完了,可以退出。这段程序有一个问题,如果ch1里面迟迟没有消息,那么协程就会卡在v := <-ch1这一行。此时外界通过ch2发信号没法关闭协程,因为协程被挂起了。即使有协程给ch1发消息,程序会输出对应的值。但如果外界没有给ch2发消息,那么v := <-ch2也会卡住。所以说上述代码根本无法实现我们的设计意图。

这个问题的核心是在同一个协程中从通道读取消息必须按顺序执行。而我们希望能同时从多个通道等待消息。无论是ch1还是ch2,只要有消息就唤醒协程处理。这个功能需要select关键字:

go func() {
  for {
    select {
    case v := <-ch1:
      fmt.Println(v)
    case <-ch2:
      return
    }
  }
}()

因为用上了select,只要ch1ch2任何通道有新消息,处理协程都会被唤醒。select还支持default分支,对应的是所有case分支都没有新消息的情景,业务代码使用较少,初学者记住有这么回事就行。后面边用边学。

除了用通道来同步协程外,Go官方还封装了WaitGroup对象,之前的示例可以改写为:

import "sync"

func main() {
  var wg sync.WaitGroup
  for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
      fmt.Println("hello")
    }()
  }
  wg.Wait()
}

声明wg之后,每创建一个协程就调用wg.Add(1),表示要多等待一个协程。main函数最终调用wg.Wait()等待所有协程结束。每一个协程结束后需要调用wg.Done()。所有协程都结构后wg.Wait()函数就会返回,整个程序才能退出。

新代码中使用defer wg.Done()来调用函数,它能确保每个协程结束时一定能执行wg.Done()函数。如果不用defer而且协程在执行的过程产生了panic,那就可能没法执行对应的wg.Done()函数,所以main函数会一直等待,从而产生死锁。

到这里我们就基本学完协程同步相关的知识了。接下来我们学习另一个重要主题,协程争用。

协程争用

所谓争用,就是多个协程争相读写同一个变量。前文所讲的通道就是最典型的例子,不同的协程可能会同时读写通道。

因为有争用,所以也就有了并发安全的概念。如果某类型的变量支持多协程同时读写,我们就称之为并发安全。通道类型就是并发安全的。但是除非特殊说明,几乎所有类型都不是并发安全的。如果多协程同时读写某变量,轻则会产生 panic 进而程序退出(是的,这是简单情况);重则会破坏数据,但程序以一种错误的方式持续运行,等发现的时候已经很难收场。

先举一个例子,让大家感觉一把:

func main() {
  a := 0
  g := sync.WaitGroup{}
  for i := 0; i < 10000; i++ {
    g.Add(1)
    go func() {
      defer g.Done()
      a++
    }()
  }
  g.Wait()
  fmt.Println(a)
}

程序声明了一个变量a,然后启动一万个协程对该变量做加一操作。运行几次就会发现,结果并非总是一万。大家自己运行体验一下。

出现这种现象的根本原因是所有协程同时操作一个变量a。因为协程的执行顺序和时间并不固定,当某个协程把a0改为1的时候,有可能有另外一个协程没有拿到a的最新值1,依然是在0的基础上加一再写回a对应的内存,这样就可能产生错误的结果。

我之前还遇到并发读写 map 导致程序异常退出的案例,具体可以阅读这篇文章

并发锁

最简单的办法是加锁。协程在更新a之前先获取一把锁,这个时候其他协程因为不可能同时获取锁,所以只能等当前协程更新之后再更新。当前协程更新完成后需要主动释放锁,这样其他更新协程才能继续尝试获取锁并更新a的值。

改良后的代码如下:

func main() {
  a := 0
  m := sync.Mutex{}
  g := sync.WaitGroup{}
  for i := 0; i < 10000; i++ {
    g.Add(1)
    go func() {
      defer g.Done()
      m.Lock()
      defer m.Unlock()
      a++
    }()
  }
  g.Wait()
  fmt.Println(a)
}

先声明m := sync.Mutex{}。然后各协程在更新之前先尝试获取锁m.Lock()。一次只能有一个协程锁定成功,其他协程都会被挂起。等成功的协程更新完成后会通过defer m.Unlock()释放锁定。这时候被挂起的协程又会被唤醒,开始新一轮的争抢过程。

加锁的本质是排队,所有的协程按照获取锁的顺序依次更新a的值,这样就不会产生并发总题。但是,加锁是有代价的。没有抢到锁的协程会被挂起,而且协程多了相互争锁也会给操作系统带来一定的负担。所以说还是要少写可能产生争用的代码。

减少急用最简单的办法是不要共享内存(变量)。不共享就不会有争用,大家各干各的,互不影响。所以我们要尽量避免使用共享变量、指针、切片和字节,这些变量都是按引用传递,不同协程可能会操作同一段内存,从而产生争用问题。Go语言本身也鼓励传值,鼓励拷贝内存,虽然会有一些性能上的损耗,但跟并发引起的BUG相比,这种损耗不值一提。

并发安全类型

再一个办法就是使用并发安全的数据类型。比如字典可以使用sync.Map代替内置的map。但是无脑使用并发版本类型也可能会导致性能问题,因为多数情况下并不会碰到多协程争用问题。

m := sync.Map{}
for i := 0; i < 10; i++ {
  go func() {
    // 无需加锁
    m.Store(i, fmt.Sprint(i)) // 1 => "1"
  }()
}
v := m.Load(1) // 返回 interface{}
v.(string) // 转回 "1"

因为不是内置类型,所以sync.Map不支持range关键字,只能通过Range()来遍历。

原子类型

有一类并发安全的类型叫原子类型,它们由底层硬件实现并发安全,几乎没有性能损耗。大家可以优先考虑使用。相关的类型都封装在sync/atomic这个包。

前面的例子可以改写为:

import "sync/atomic"
func main() {
  a := &atomic.Int32{}
  g := sync.WaitGroup{}
  for i := 0; i < 10000; i++ {
    g.Add(1)
    go func() {
      defer g.Done()
      a.Add(1)
    }()
  }
  g.Wait()
  fmt.Println(a.Load())
}

读写锁

为了降低加锁成本,人们发明了读写锁。简单来说多数变量都是读多写少。多个协程同时读一个变量不会有问题,只要读的过程中没有人修改这个变量就行了。

首先创建读写锁:

rwm := sync.RWMutex{}

读协程需要获取读锁,写协程需要获取写锁:

rwm.RLock() // 获取读锁
rwm.Lock()  // 获取写锁

因为读不会修改数据,所以允许多个协程同时获取读锁,也就并发读取变量内容。此时如果有少量协程想修改内容,它们需要获取写锁。写锁是排他锁,需要等所有读锁和其他写锁释放才能获取。一旦写协程成功获取写锁,在它解锁之前,所有其他尝试获取读锁或写锁的协程都会被挂起。读锁也叫共享锁,写锁也叫排他锁。

总结

到这就讲完了并发相关的主要内容。不论是哪一种语言,并发编程都是非常困难的领域。Go语言内置协程只是降低了并发编程的门槛,但绝对没有降低并发编程的难度。初学者在学习和使用协程的时候一定要慎之又慎,不然很容易出问题。