Go 进阶

 Go 󰈭 4267字

并发

Go语言中的并发程序可以用两种手段来实现。本章讲解goroutine和channel,其支持“顺序通信进程”(communicating sequential processes)或被简称为CSP。CSP是一种现代的并发编程模型,在这种编程模型中值会在不同的运行实例(goroutine)中传递,尽管大多数情况下仍然是被限制在单一实例中。

Goroutines

在Go语言中,每一个并发的执行单元叫作一个goroutine。当一个程序启动时,主函数就在一个单独的goroutine上运行,称为main goroutine。

新的goroutine通过go语句创建:

1go fun()

主函数返回时,所有的goroutine都会被直接打断,程序退出。

并发处理TCP连接

go程序监听8000端口,每次当client连接后通过goroutinue处理业务操作,且不阻塞第二个client的连接。

 1func main() {
 2	listener, err := net.Listen("tcp", "localhost:8000")
 3	if err != nil {
 4		log.Fatal(err)
 5	}
 6
 7	for {
 8		conn, err := listener.Accept()
 9		if err != nil {
10			log.Print(err) // e.g., connection aborted
11			continue
12		}
13		go handleConn(conn) // handle one connection at a time
14	}
15}
16
17func handleConn(c net.Conn) {
18	defer c.Close()
19	for {
20		_, err := io.WriteString(c, time.Now().Format("15:04:05\n"))
21		if err != nil {
22			return // e.g., client disconnected
23		}
24		time.Sleep(1 * time.Second)
25	}
26}

Channels

如果说goroutine是Go语言程序的并发体的话,那么channels则是它们之间的通信机制。一个channel是一个通信机制,它可以让一个goroutine通过它给另一个goroutine发送值信息。每个channel都有一个特殊的类型,也就是channels可发送数据的类型。一个可以发送int类型数据的channel一般写为chan int。

使用内置的make函数,我们可以创建一个channel:

1ch = make(chan int)    // unbuffered channel
2ch = make(chan int, 0) // unbuffered channel
3ch = make(chan int, 3) // buffered channel with capacity 3

和map类似,channel也对应一个make创建的底层数据结构的引用。当我们复制一个channel或用于函数参数传递时,我们只是拷贝了一个channel引用,因此调用者和被调用者将引用同一个channel对象。和其它的引用类型一样,channel的零值也是nil。

channel的发送和接收行为如下:

1ch <- x  // a send statement
2x = <-ch // a receive expression in an assignment statement
3<-ch     // a receive statement; result is discarded

Channel还支持close操作,用于关闭channel,随后对基于该channel的任何发送操作都将导致panic异常。对一个已经被close过的channel进行接收操作依然可以接受到之前已经成功发送的数据;如果channel中已经没有数据的话将产生一个零值的数据:

1close(ch)

不带缓冲的Channels

1ch = make(chan int, 0) // unbuffered channel

一个基于无缓存Channels的发送操作将导致发送者goroutine阻塞,直到另一个goroutine在相同的Channels上执行接收操作,当发送的值通过Channels成功传输之后,两个goroutine可以继续执行后面的语句。反之,如果接收操作先发生,那么接收者goroutine也将阻塞,直到有另一个goroutine在相同的Channels上执行发送操作。

基于无缓存Channels的发送和接收操作将导致两个goroutine做一次同步操作。因为这个原因,无缓存Channels有时候也被称为同步Channels。

下述代码给出了一个同步的实例。主goroutine等待后台goroutinue完成copy操作后才结束阻塞,函数返回。

 1func main() {
 2    conn, err := net.Dial("tcp", "localhost:8000")
 3    if err != nil {
 4        log.Fatal(err)
 5    }
 6    done := make(chan struct{})
 7    go func() {
 8        io.Copy(os.Stdout, conn) // NOTE: ignoring errors
 9        log.Println("done")
10        done <- struct{}{} // signal the main goroutine
11    }()
12    mustCopy(conn, os.Stdin)
13    conn.Close()
14    <-done // wait for background goroutine to finish
15}

有些时候,相比于通信的具体内容,我们更关心通信本身的发生,我们将它称为消息事件,这时我们可以使用struct{}来作为元素类型。

串连的channels(pipeline)

考虑这样一个简单的串连channels:

 1func main() {
 2    naturals := make(chan int)
 3    squares := make(chan int)
 4
 5    // Counter
 6    go func() {
 7        for x := 0; ; x++ {
 8            naturals <- x
 9        }
10    }()
11
12    // Squarer
13    go func() {
14        for {
15            x := <-naturals
16            squares <- x * x
17        }
18    }()
19
20    // Printer (in main goroutine)
21    for {
22        fmt.Println(<-squares)
23    }
24}

如果我们希望只发送有限个自然数应该怎么办呢?

如果直接关闭naturals通道,再向该channel发送数据将导致panic异常,而当一个被关闭的channel中已经发送的数据都被成功接收后,后续的接收操作将不再阻塞,它们会立即返回一个零值,因而关闭通道无法终止循环。

如果我们多传输一个结果作为状态,就可以终止循环,如下所示:

 1// Squarer
 2go func() {
 3    for {
 4        x, ok := <-naturals
 5        if !ok {
 6            break // channel was closed and drained
 7        }
 8        squares <- x * x
 9    }
10    close(squares)
11}()

由于考虑到这种处理方式较为笨拙但十分常见,因而Go语言支持使用range循环直接在channels上进行迭代,当channel被关闭并且没有值可以接受时就推出循环:

 1func main() {
 2    naturals := make(chan int)
 3    squares := make(chan int)
 4
 5    // Counter
 6    go func() {
 7        for x := 0; x < 100; x++ {
 8            naturals <- x
 9        }
10        close(naturals)
11    }()
12
13    // Squarer
14    go func() {
15        for x := range naturals {
16            squares <- x * x
17        }
18        close(squares)
19    }()
20
21    // Printer (in main goroutine)
22    for x := range squares {
23        fmt.Println(x)
24    }
25}

单向channels

Go语言的类型系统提供了单方向的channel类型,分别用于只发送或只接收的channel。类型chan<- int表示一个只发送int的channel,只能发送不能接收。相反,类型<-chan int表示一个只接收int的channel,只能接收不能发送。

双向channel可以自动转换为单向channel,但是反之不行。

带缓冲的channels

Go语言新手有时候会将一个带缓存的channel当作同一个goroutine中的队列使用,虽然语法看似简单,但实际上这是一个错误。Channel和goroutine的调度器机制是紧密相连的,如果没有其他goroutine从channel接收,发送者——或许是整个程序——将会面临永远阻塞的风险。如果你只是需要一个简单的队列,使用slice就可以了。

基于select的多路复用

select会等待case中有能够执行的case时去执行。当条件满足时,select才会去通信并执行case之后的语句;这时候其它通信是不会执行的。一个没有任何case的select语句写作select{},会永远地等待下去。

下述代码中,ch这个channel的buffer大小是1,所以会交替的为空或为满,所以只有一个case可以进行下去,无论i是奇数或者偶数,它都会打印0 2 4 6 8。

1ch := make(chan int, 1)
2for i := 0; i < 10; i++ {
3    select {
4    case x := <-ch:
5        fmt.Println(x) // "0" "2" "4" "6" "8"
6    case ch <- i:
7    }
8}

如果多个case同时就绪时,select会随机地选择一个执行,这样来保证每一个channel都有平等的被select的机会。增加上述代码的buffer大小会使其输出变得不确定,因为当buffer既不为满也不为空时,select语句的执行情况就像是抛硬币的行为一样是随机的。

并发的退出

Golang并发:并发协程的优雅退出

  • range
  • ,ok
  • 专用的退出通道

共享变量

竞争条件

竞争条件指的是程序在多个goroutine交叉执行操作时,没有给出正确的结果。数据竞争是竞争条件的一种,无论任何时候,只要有两个goroutine并发访问同一变量,且至少其中的一个是写操作的时候就会发生数据竞争。

一种避免数据竞争的方式是,避免从多个goroutinue访问变量。Go的口头禅是:不要使用共享数据来通信;使用通信来共享数据。

在下面的代码中,Cakes由于管道的通信而严格地被顺序处理。

 1type Cake struct{ state string }
 2
 3func baker(cooked chan<- *Cake) {
 4    for {
 5        cake := new(Cake)
 6        cake.state = "cooked"
 7        cooked <- cake // baker never touches this cake again
 8    }
 9}
10
11func icer(iced chan<- *Cake, cooked <-chan *Cake) {
12    for cake := range cooked {
13        cake.state = "iced"
14        iced <- cake // icer never touches this cake again
15    }
16}

互斥

互斥允许多个goroutine访问变量,但同一时刻最多只有一个正在访问。

sync.Mutex互斥锁

Mutex的Lock方法能够获取到token(这里叫锁),并且Unlock方法会释放这个token。

 1import "sync"
 2
 3var (
 4    mu      sync.Mutex // guards balance
 5    balance int
 6)
 7
 8func Deposit(amount int) {
 9    mu.Lock()
10    balance = balance + amount
11    mu.Unlock()
12}
13
14func Balance() int {
15    mu.Lock()
16    b := balance
17    mu.Unlock()
18    return b
19}

惯例来说,被mutex所保护的变量是在mutex变量声明之后立刻声明的。如果你的做法和惯例不符,确保在文档里对你的做法进行说明。

sync.RWMutex读写锁

读写锁是一种特殊的锁,对读操作并行,而对写操作互斥,叫做“多读单写”锁。

1var mu sync.RWMutex
2var balance int
3func Balance() int {
4    mu.RLock() // readers lock
5    defer mu.RUnlock()
6    return balance
7}
sync.Once惰性初始化

这是一个懒初始化的例子:

 1func loadIcons() {
 2    icons = map[string]image.Image{
 3        "spades.png":   loadIcon("spades.png"),
 4        "hearts.png":   loadIcon("hearts.png"),
 5        "diamonds.png": loadIcon("diamonds.png"),
 6        "clubs.png":    loadIcon("clubs.png"),
 7    }
 8}
 9
10// NOTE: not concurrency-safe!
11func Icon(name string) image.Image {
12    if icons == nil {
13        loadIcons() // one-time initialization
14    }
15    return icons[name]
16}

但该函数并不是并发安全的,现代编译器和CPU可能随意改变指令排布顺序,一种可能的指令顺序如下:

1func loadIcons() {
2    icons = make(map[string]image.Image)
3    icons["spades.png"] = loadIcon("spades.png")
4    icons["hearts.png"] = loadIcon("hearts.png")
5    icons["diamonds.png"] = loadIcon("diamonds.png")
6    icons["clubs.png"] = loadIcon("clubs.png")
7}

因此,一个goroutine在检查icons是非空时,也并不能就假设这个变量的初始化流程已经走完了。

使用读写锁可以保证并发。

 1var mu sync.RWMutex // guards icons
 2var icons map[string]image.Image
 3// Concurrency-safe.
 4func Icon(name string) image.Image {
 5    mu.RLock()
 6    if icons != nil {
 7        icon := icons[name]
 8        mu.RUnlock()
 9        return icon
10    }
11    mu.RUnlock()
12
13    // acquire an exclusive lock
14    mu.Lock()
15    if icons == nil { // NOTE: must recheck for nil
16        loadIcons()
17    }
18    icon := icons[name]
19    mu.Unlock()
20    return icon
21}

通常的方式较为复杂,Once函数简化了这个过程:

1var loadIconsOnce sync.Once
2var icons map[string]image.Image
3// Concurrency-safe.
4func Icon(name string) image.Image {
5    loadIconsOnce.Do(loadIcons)
6    return icons[name]
7}

竞争条件检测

只要在go build,go run或者go test命令后面加上-race的flag,就会使编译器创建一个你的应用的“修改”版或者一个附带了能够记录所有运行期对共享变量访问工具的test,并且会记录下每一个读或者写共享变量的goroutine的身份信息。

线程和Goroutinues

  • 动态栈。一个线程通常有一个固定大小2MB内存做栈,而这对于一个goroutinue来说过大了,无法支持高并发和深层的递归函数。而一个goroutinue会以一个很小的栈(2KB)开始生命周期,该栈也会保存活跃、挂起的函数的本地变量,但区别在于其大小不定,可以动态伸缩,最多可以达到1GB.
  • 调度。OS对于线程的调度需要进行完整的上下文切换,而Go使用了自己的调度器,不需要进入内核的上下文,所以调度代价低得很多。
  • GOMAXPROCC。Go的调度器通过GOMAXPROCC变量决定有多少个OS的线程执行Go代码,其默认值是机器上的CPU核心数。可以通过环境变量来显式地控制这个参数,或者也可以在运行时用runtime.GOMAXPROCS函数来修改它。
 1for {
 2    go fmt.Print(0)
 3    fmt.Print(1)
 4}
 5
 6$ GOMAXPROCS=1 go run hacker-cliché.go
 7111111111111111111110000000000000000000011111...
 8
 9$ GOMAXPROCS=2 go run hacker-cliché.go
10010101010101010101011001100101011010010100110...
  • 线程号。goroutine没有可以被程序员获取到的身份(id)的概念。这一点是设计上故意而为之,由于thread-local storage总是会被滥用。在这种行为下,一个函数的行为可能并不仅由自己的参数所决定,而是由其所运行在的线程所决定,这不好。
嗨! 这里是 rqdmap 的个人博客, 我正关注 GNU/Linux 桌面系统, Linux 内核, 后端开发, Python, Rust 以及一切有趣的计算机技术! 希望我的内容能对你有所帮助~
如果你遇到了任何问题, 包括但不限于: 博客内容说明不清楚或错误; 样式版面混乱等问题, 请通过邮箱 rqdmap@gmail.com 联系我!
修改记录:
  • 2023-09-01 18:14:49单独划分ACM专题; 移动部分博客进入黑洞归档
  • 2023-05-29 23:05:14大幅重构了python脚本的目录结构,实现了若干操作博客内容、sqlite的助手函数;修改原本的文本数 据库(ok)为sqlite数据库,通过嵌入front-matter的page_id将源文件与网页文件相关联
  • 2023-05-08 21:44:36博客架构修改升级
  • 2022-11-16 01:27:34迁移老博客文章内容
Go 进阶