Go语言的并发编程
涛叔本文介绍Go语言并发编程的概念、用法和需要注意的问题。
并发的作用
我们之前说过,Go语言是一种面向过程的语言。默认程序会很main
函数开始,逐行执行代码,而且只能使用CPU的一个核。举个例子:
:= []string{"a.txt", "b.txt", "c.txt"}
files for _, f := range files {
(f)
Compress}
假设Compress
会根据传入的路径来压缩文件。上面的循环会自动压缩files
中保存的所有文件,但是一次只能压缩一个。后面的文件必须等前面的压完了才能开始。考虑到压缩操作比较耗时,而且当今的CPU都是多核,所以如果能使用多核并发压缩,一定会降低整个过程的耗时。在Go语言中可以使用协程实现这种效果。
:= []string{"a.txt", "b.txt", "c.txt"}
files for _, f := range files {
go Compress(f) // 启动协程
}
Go语言起协程非常简单,只需要在调用函数的前面加上go
关键字。上面的代码会在循环中启动三个协程,每个协程独自运行Compress
函数,互不影响。一般来说不同的协程会使用不同的CPU核,所以整个压缩过程可以并发执行。
并发的问题
以上是一个虚拟的示例,没法直接运行。下面我用一个更简单的例子来说明使用协程需要注意的问题。
package main
func main() {
for i := 0; i < 10; i++ {
go func() {
.Println("hello")
fmt}()
}
}
这里的func(){}()
是一种函数的惯用法,意思是声明一个匿名函数并且立即执行。再加上前面的go
关键字,表示声明匿名函数并且在新协程中立即执行。上面的例子是批量创建十个协程,并在每个协程里输出hello
字符串。
如果你自己运行一把就会发现,程序什么内容也没输出,直接退出了😂造成这个问题的根本原因是main
函数退出了,整个程序结束,之前创建的协程没有机会执行,也就不可能输出什么内容了。
要想解决这个问题,main
函数在创建协程后,需要等待所有协程都结束后才能退出。不能自己先退出。这种协程之间的等待又称为协程同步。Go语言为协程同步提供了特殊的数据类型,叫通道(channel)。
通道(channel)
通道跟切片有点类似,我们可以把它想象成是一消息队列,我们需要给消息指定类型,我们可以把消息放到队列,也可以从队列里获取消息。通道的声明语法如下:
var ch1 chan int // 队列中没能放 int 数据
= make(chan int)
ch1 := make(chan string, 4) ch2
声明通道使用chan
关键字,后面跟消息的类型。通道跟字典类似,声明之后还需要初始化,不然无法使用。所以ch1
还需要配合make(chan int)
来完成初始化。
我们在使用make
创建通道的时候还可以额外指定第二个参数,用来表示队列的缓冲区长度。如果不指定,默认长度就是零。缓冲区的用途我们后面说。
有了通道变量之后,我们就可以往里面投递数据或者读取数据:
<- 1 // 写入
ch1 := <- ch1 // 读取 v
这里用到了箭头操作符<-
,箭头表示数据的流向。ch1 <- 1
表示数据流向通道,<- ch1
表示数据流出通道。v := <- ch1
表示从ch1
提出一条数据并保存到变量v
中。
如果你把上面两行代码写到main
函数并运行,程序会直接报错:
fatal error: all goroutines are asleep - deadlock!
解决办法也很简单,把ch1
改为make(chan int, 1)
就可以了,给它设一个缓冲区。
对于没有缓冲区的通道,如果某协程想投递数据,则该协程会暂停执行。一直等到有另一个协程想从该通道读取消息的时候才能恢复。这种暂停也叫挂起。这种挂起是双向的。如果某协程尝试从ch1
读取消息,但此时又没有其他协程尝试写入,则该协程也会被挂起。通道在这里是两个协程的纽带,两个协程必须同时读写才行。
回到上面的例子,我们先尝试给ch1
写入消息,这个时候运行时就会挂起main
函数所在的协程。因为后续的读取操作也在main
函数中,所以不可能有协程从ch1
读取内容了,这样main
函数就会一直处理挂起状态。这就是所谓的死锁,自己跟自己死锁了😂
一般发生死锁后程序不会退出。因为运行时很难判断到底有没有产生死锁。但我们的示例中只有一个协程,也就是运行main
函数的协程,它都挂起了说明一定产生了死锁,所以就直接报错了。但在实际生产系统中,死锁问题很难发现。所以大家在写并发代码的时候一定要小心。
如果把ch1
改成make(chan int, 1)
,它就有了缓冲区。这个时候ch1 <- 1
就不会挂起当前协程,然后面的v := <- ch1
也就能正常执行了。
协程同步
有了通道这个工具,我们就可以解决前面说的问题。改造代码如下:
package main
func main() {
:= make(chan int, 10)
ch for i := 0; i < 10; i++ {
go func() {
.Println("hello")
fmt<- 1
ch }()
}
for i := 0; i < 10; i++ {
<- ch
}
}
我们先是定义了缓冲长度为十的int
型通道。每个协程完成后会往ch
写入消息1
。因为有缓冲,所以不论有没有其他协程读取ch
,刚的写入肯定会成功,对应的协程执行完成后会退出。
我们在main
函数的最后尝试从ch
读取消息,一共读十次。每个协程结束都会写入一条消息。如果协程没有完全,读取操作会挂起main
函数所在的协程。
运行改造后的代码就会看到程序输出十行hello
。上例中也可以使用无缓冲通道。无缓冲通道需要读写双方同时操作,所以main
函数每读取一条消息才会有一个协程退出。而有缓冲的版本则没有此限制,所以打开协程完成工作后会直接退出,不受main
函数所在协程的影响。但无论如何,如果没有同步,并发协程的执行顺序是不确定的。
并发顺序
我们稍微修改一下上述代码:
package main
func main() {
:= make(chan int, 10)
ch for i := 0; i < 10; i++ {
go func(id int) {
.Println("hello", id)
fmt<- 1
ch }(i)
}
for i := 0; i < 10; i++ {
<- ch
}
}
我们给每个协程加一个序号,通过匿名函数的参数传进去,然后在运行的时候打开出来。多执行几次,你会发现每次的执行顺序都不一样,这就印证了刚才的判断。所以,协程在并发运行的时候顺序不可控!
range
语法
在实践中经常会循环读取通道,所以Go语言也支持使用range
关键字实现「遍历」通道的效果。刚才main
函数最后的for
循环可以改写为:
for v := range ch {
.Println(v) //随便做点事
fmt}
使用range
的时候不需要指定箭头操作符!它实际的运行过程如下:
for {
:= <-ch
v .Println(v)
fmt}
所以这个 for 循环不会自动结束,除非有其他协程使用close
函数关闭ch
通道。
最实战的例子是定时间器。如果我们想每隔一段时间输出一条消息,可以使用time
标准库提供的通道:
:= time.Tick(3 * time.Second)
ch go func() {
for t := range ch {
.Println(t)
fmt}
}()
time.Tick(3*time.Second)
会返回一个chan time.Time
通道,每隔三秒就可以从里面读取一个时间对象。上面的代码创建了一个单独协程,不停从定时器通道读取时间。只要main
函数不退出,该协程就会不停打印定时器触发的时间。
但问题来了,上面的循环什么时候结束呢?答案是永远不会结束。如果通道里没有内容了,协程在读取的时候会被挂起。但有时候我们希望告知对应的协程已经没有消息了,工作干完了,可以退出了。
通知退出有几种方法。最简单的就是关闭对应的通道:
:= make(chan int)
ch go func() {
.Sleep(1*time.Second)
timeclose(ch)
}()
for v := range ch {
.Println(v)
fmt}
通道关闭之后for
循环会收到信号,然后结束循环。但这种办法有个副作用:
:= <-ch v
如果是自己用箭头操作符读取消息,当通道关闭后代码还是会收到一个值,只不过这次是零值。本例中v
最后的取值是0
。为了区分正常的消息零值和关闭零值,Go语言还支持另外一种读取语法:
, ok := <-ch v
通道关闭的时候上述读取也会返回,而且ok
会被设置为false
。程序可以通过检查ok
变量来确定通道是否已经关闭。
select
语法
除了关闭通道之外,我们还可以使用一个单独的控制通道来通知协程退出。比如:
:= make(chan int)
ch1 := make(chan int)
ch2
go func() {
for {
:= <-ch1
v .Println(v)
fmtif v := <-ch2; v > 0 {
return
}
}
}()
这段程序在协程里先从ch1
读取数据,然后打印出来。它在继续循环之前会尝试从ch2
读取消息。如果有消息则表明活干完了,可以退出。这段程序有一个问题,如果ch1
里面迟迟没有消息,那么协程就会卡在v := <-ch1
这一行。此时外界通过ch2
发信号没法关闭协程,因为协程被挂起了。即使有协程给ch1
发消息,程序会输出对应的值。但如果外界没有给ch2
发消息,那么v := <-ch2
也会卡住。所以说上述代码根本无法实现我们的设计意图。
这个问题的核心是在同一个协程中从通道读取消息必须按顺序执行。而我们希望能同时从多个通道等待消息。无论是ch1
还是ch2
,只要有消息就唤醒协程处理。这个功能需要select
关键字:
go func() {
for {
select {
case v := <-ch1:
.Println(v)
fmtcase <-ch2:
return
}
}
}()
因为用上了select
,只要ch1
和ch2
任何通道有新消息,处理协程都会被唤醒。select
还支持default
分支,对应的是所有case
分支都没有新消息的情景,业务代码使用较少,初学者记住有这么回事就行。后面边用边学。
除了用通道来同步协程外,Go官方还封装了WaitGroup
对象,之前的示例可以改写为:
import "sync"
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
.Add(1)
wggo func() {
defer wg.Done()
.Println("hello")
fmt}()
}
.Wait()
wg}
声明wg
之后,每创建一个协程就调用wg.Add(1)
,表示要多等待一个协程。main
函数最终调用wg.Wait()
等待所有协程结束。每一个协程结束后需要调用wg.Done()
。所有协程都结构后wg.Wait()
函数就会返回,整个程序才能退出。
新代码中使用defer wg.Done()
来调用函数,它能确保每个协程结束时一定能执行wg.Done()
函数。如果不用defer
而且协程在执行的过程产生了panic
,那就可能没法执行对应的wg.Done()
函数,所以main
函数会一直等待,从而产生死锁。
到这里我们就基本学完协程同步相关的知识了。接下来我们学习另一个重要主题,协程争用。
协程争用
所谓争用,就是多个协程争相读写同一个变量。前文所讲的通道就是最典型的例子,不同的协程可能会同时读写通道。
因为有争用,所以也就有了并发安全的概念。如果某类型的变量支持多协程同时读写,我们就称之为并发安全。通道类型就是并发安全的。但是除非特殊说明,几乎所有类型都不是并发安全的。如果多协程同时读写某变量,轻则会产生 panic 进而程序退出(是的,这是简单情况);重则会破坏数据,但程序以一种错误的方式持续运行,等发现的时候已经很难收场。
先举一个例子,让大家感觉一把:
func main() {
:= 0
a := sync.WaitGroup{}
g for i := 0; i < 10000; i++ {
.Add(1)
ggo func() {
defer g.Done()
++
a}()
}
.Wait()
g.Println(a)
fmt}
程序声明了一个变量a
,然后启动一万个协程对该变量做加一操作。运行几次就会发现,结果并非总是一万。大家自己运行体验一下。
出现这种现象的根本原因是所有协程同时操作一个变量a
。因为协程的执行顺序和时间并不固定,当某个协程把a
从0
改为1
的时候,有可能有另外一个协程没有拿到a
的最新值1
,依然是在0
的基础上加一再写回a
对应的内存,这样就可能产生错误的结果。
我之前还遇到并发读写 map 导致程序异常退出的案例,具体可以阅读这篇文章。
并发锁
最简单的办法是加锁。协程在更新a
之前先获取一把锁,这个时候其他协程因为不可能同时获取锁,所以只能等当前协程更新之后再更新。当前协程更新完成后需要主动释放锁,这样其他更新协程才能继续尝试获取锁并更新a
的值。
改良后的代码如下:
func main() {
:= 0
a := sync.Mutex{}
m := sync.WaitGroup{}
g for i := 0; i < 10000; i++ {
.Add(1)
ggo func() {
defer g.Done()
.Lock()
mdefer m.Unlock()
++
a}()
}
.Wait()
g.Println(a)
fmt}
先声明m := sync.Mutex{}
。然后各协程在更新之前先尝试获取锁m.Lock()
。一次只能有一个协程锁定成功,其他协程都会被挂起。等成功的协程更新完成后会通过defer m.Unlock()
释放锁定。这时候被挂起的协程又会被唤醒,开始新一轮的争抢过程。
加锁的本质是排队,所有的协程按照获取锁的顺序依次更新a
的值,这样就不会产生并发总题。但是,加锁是有代价的。没有抢到锁的协程会被挂起,而且协程多了相互争锁也会给操作系统带来一定的负担。所以说还是要少写可能产生争用的代码。
减少急用最简单的办法是不要共享内存(变量)。不共享就不会有争用,大家各干各的,互不影响。所以我们要尽量避免使用共享变量、指针、切片和字节,这些变量都是按引用传递,不同协程可能会操作同一段内存,从而产生争用问题。Go语言本身也鼓励传值,鼓励拷贝内存,虽然会有一些性能上的损耗,但跟并发引起的BUG相比,这种损耗不值一提。
并发安全类型
再一个办法就是使用并发安全的数据类型。比如字典可以使用sync.Map
代替内置的map
。但是无脑使用并发版本类型也可能会导致性能问题,因为多数情况下并不会碰到多协程争用问题。
:= sync.Map{}
m for i := 0; i < 10; i++ {
go func() {
// 无需加锁
.Store(i, fmt.Sprint(i)) // 1 => "1"
m}()
}
:= m.Load(1) // 返回 interface{}
v .(string) // 转回 "1" v
因为不是内置类型,所以sync.Map
不支持range
关键字,只能通过Range()
来遍历。
原子类型
有一类并发安全的类型叫原子类型,它们由底层硬件实现并发安全,几乎没有性能损耗。大家可以优先考虑使用。相关的类型都封装在sync/atomic
这个包。
前面的例子可以改写为:
import "sync/atomic"
func main() {
:= &atomic.Int32{}
a := sync.WaitGroup{}
g for i := 0; i < 10000; i++ {
.Add(1)
ggo func() {
defer g.Done()
.Add(1)
a}()
}
.Wait()
g.Println(a.Load())
fmt}
读写锁
为了降低加锁成本,人们发明了读写锁。简单来说多数变量都是读多写少。多个协程同时读一个变量不会有问题,只要读的过程中没有人修改这个变量就行了。
首先创建读写锁:
:= sync.RWMutex{} rwm
读协程需要获取读锁,写协程需要获取写锁:
.RLock() // 获取读锁
rwm.Lock() // 获取写锁 rwm
因为读不会修改数据,所以允许多个协程同时获取读锁,也就并发读取变量内容。此时如果有少量协程想修改内容,它们需要获取写锁。写锁是排他锁,需要等所有读锁和其他写锁释放才能获取。一旦写协程成功获取写锁,在它解锁之前,所有其他尝试获取读锁或写锁的协程都会被挂起。读锁也叫共享锁,写锁也叫排他锁。
总结
到这就讲完了并发相关的主要内容。不论是哪一种语言,并发编程都是非常困难的领域。Go语言内置协程只是降低了并发编程的门槛,但绝对没有降低并发编程的难度。初学者在学习和使用协程的时候一定要慎之又慎,不然很容易出问题。