什么是channel
Goroutine 和 channel 是 Go 语言并发编程的两大基石。Goroutine 用于执行并发任务,channel 用于 goroutine 之间的同步、通信。
// 简单示例
package main
import (
"fmt"
)
func main() {
ch := make(chan string)
go func() {
ch <- "Hello World!"
}()
msg := <-ch
fmt.Println(msg)
}
通过 channel,Go 实现了通过通信来实现内存共享。Channel 是在多个 goroutine 之间传递数据和同步的重要手段。
Channel 在 gouroutine 间架起了一条管道,在管道里传输数据; 由于它是线程安全的,所以用起来非常方便;channel 还提供“先进先出”的特性;它还能影响 goroutine 的阻塞和唤醒。
这也是channel的经典思想,不要通过共享内存来通信,而是通过通信来实现内存共享(CSP)
那么使用
sharing memory
(mutex)还是使用channel
来做通信呢?这边可以参考如下图的决策channel特性(CSP)
channel 字面意义是“通道”,类似于 Linux 中的管道。声明 channel 的语法如下:
chan T // 声明一个双向通道
chan<- T // 声明一个只能用于发送的通道
<-chan T // 声明一个只能用于接收的通道
单向通道的声明,用
<-
来表示,它指明通道的方向。你只要明白,代码的书写顺序是从左到右就马上能掌握通道的方向是怎样的。因为 channel 是一个引用类型,所以在它被初始化之前,它的值是
nil
,channel 使用 make
函数进行初始化。可以向它传递一个 int 值,代表 channel 缓冲区的大小(容量),构造出来的是一个缓冲型的 channel;不传或传 0 的,构造的就是一个非缓冲型的 channel。对不带缓冲的 channel 进行的操作实际上可以看作“同步模式”,带缓冲的则称为“异步模式”。
- 同步模式下,发送方和接收方要同步就绪,只有在两者都 ready 的情况下,数据才能在两者间传输(后面会看到,实际上就是内存拷贝)。否则,任意一方先行进行发送或接收操作,都会被挂起,等待另一方的出现才能被唤醒。
- 异步模式下,在缓冲槽可用的情况下(有剩余容量),发送和接收操作都可以顺利进行。否则,操作的一方(如写入)同样会被挂起,直到出现相反操作(如接收)才会被唤醒。
小结一下:同步模式下,必须要使发送方和接收方配对,操作才会成功,否则会被阻塞;异步模式下,缓冲槽要有剩余容量,操作才会成功,否则也会被阻塞。
channel实现原理
本质上channel在设计上是一个有锁的环形队列,对 chan 的发送和接收操作都会在编译期间转换成为底层的发送接收函数。
数据结构
type hchan struct {
// chan 里元素数量
qcount uint
// chan 底层循环数组的长度
dataqsiz uint
// 指向底层循环数组的指针
// 只针对有缓冲的channel
buf unsafe.Pointer
// chan 中元素大小
elemsize uint16
// chan 是否被关闭
closed uint32
// chan中元素类型
elemtype *_type
// 已发送元素在循环数组中的索引
sendx uint // send index
// 已接收元素在循环数组中的索引
recvx uint //receive index
// 等待接收的goroutine队列
recvq waitq //list of recv waiters
// 等待发送的goroutine队列
sendq waitq // list of send waiters
// 保护hchan中所有字段
lock mutex
}
type waitq struct {
first *sudog
last *sudog
}
重点讲解几个字段:
buf
: 指向底层循环数组,只有缓冲型的channel才有
sendx
,recvx
均指向底层循环数组,表示当前可以发送和接收的元素位置索引值(相对于底层数组)。
sendq
,recvq
分别表示被阻塞的 goroutine,这些 goroutine 由于尝试读取 channel 或向 channel 发送数据而被阻塞。
waitq
是sudog
的一个双向链表,而sudog
实际上是对goroutine
的一个封装
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
//......
}
lock
用来保证每个读 channel 或写 channel 的操作都是原子的。
例如,创建一个容量为 6 的,元素为 int 型的 channel 数据结构如下 :
创建
我们知道,通道有两个方向,发送和接收。理论上来说,我们可以创建一个只发送或只接收的通道,但是这种通道创建出来后,怎么使用呢?一个只能发的通道,怎么接收呢?同样,一个只能收的通道,如何向其发送数据呢?
一般而言,使用
make
创建一个能收能发的通道:// 无缓冲通道
ch1 := make(chan int)
// 有缓冲通道
ch2 := make(chan int, 10)
通过汇编分析,我们知道,最终创建 chan 的函数是
makechan
:// 通用创建方法
func makechan(t *chantype, size int) *hchan
// 类型为 int64 的进行特殊处理
func makechan64(t *chantype, size int64) *hchan
从函数原型来看,创建的 chan 是一个指针。所以我们能在函数间直接传递 channel,而不用传递 channel 的指针。具体来看下代码:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 省略了检查 channel size,align 的代码
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
var c *hchan
switch {
// 如果元素类型不含指针 或者 size 大小为 0(无缓冲类型)
// 只进行一次内存分配
case mem == 0:
// 如果 hchan 结构体中不含指针,GC 就不会扫描 chan 中的元素
// 只分配 "hchan 结构体大小 + 元素大小*个数" 的内存
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
// 1. 非缓冲型的,buf 没用,直接指向 chan 起始地址处
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 如果是缓冲型 channel 且元素大小不等于 0(大小等于 0的元素类型:struct{})
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// 2. 缓冲型的,能进入到这里,说明元素无指针且元素类型为 struct{},也无影响
// 因为只会用到接收和发送游标,不会真正拷贝东西到 c.buf 处(这会覆盖 chan的内容)
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
// 循环数组长度
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
// 返回 hchan 指针
return c
}
创建 channel 的逻辑主要分为三大块:
- 当前 channel 不存在缓冲区,也就是元素大小为 0 的情况下,就会调用
mallocgc
方法分配一段连续的内存空间。
- 当前 channel 存储的类型存在指针引用,就会连同
hchan
和底层数组同时分配一段连续的内存空间。
- 通用情况,默认分配相匹配的连续内存空间。
需要注意到一块特殊点,那就是 channel 的创建都是调用的
mallocgc
方法,也就是 channel 都是创建在堆上的。因此 channel 是会被 GC 回收的,自然也不总是需要 close
方法来进行显示关闭了。发送
channel发送数据示例
go func() {
ch <- "Hello World"
}()
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
基础前置判断处理
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 如果 channel 是 nil
if c == nil {
// 不能阻塞,直接返回 false,表示未发送成功
if !block {
return false
}
// 当前 goroutine 被挂起,发生panic
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
// 对于不阻塞的 send,快速检测失败场景
// 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是:
// 1. channel 是非缓冲型的,且等待接收队列里没有 goroutine
// 2. channel 是缓冲型的,但循环数组已经装满了元素
if !block && c.closed == 0 && full(c) {
return false
}
// .....
}
func full(c *hchan) bool {
// c.dataqsiz is immutable (never written after the channel is created)
// so it is safe to read at any time during channel operation.
if c.dataqsiz == 0 {
// Assumes that a pointer read is relaxed-atomic.
return c.recvq.first == nil
}
// Assumes that a uint read is relaxed-atomic.
return c.qcount == c.dataqsiz
}
上面的代码注释地比较详细了,我们来详细看看。
- 如果检测到
channel
是空的,逻辑上向nil channel发送数据,调用gopark
发生panic。
- 对于不阻塞的发送操作,如果
channel
未关闭并且没有多余的缓冲空间(说明:a. channel 是非缓冲型的,且等待接收队列里没有 goroutine;b. channel 是缓冲型的,但循环数组已经装满了元素)
没有多余缓冲空间的失败场景:
- 若非阻塞未关闭,同时底层数据
dataqsiz
大小为0(缓冲区无元素)
- 若是
qcount
与dataqsiz
大小相同(缓冲区已满)
上互斥锁
在完成channel的前置判断后,在发送数据前,channel会进行上锁
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ....
// 锁住 channel,并发安全
lock(&c.lock)
// ....
}
上锁后就能保住并发安全。
直接发送
在正式开发发送前,加锁之后,会对channel进行一次状态判断
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
// 如果 channel 关闭了
if c.closed != 0 {
unlock(&c.lock)
// 直接 panic
panic(plainError("send on closed channel"))
}
// 如果接收队列里有 goroutine,直接将要发送的数据拷贝到接收 goroutine
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// ...
}
这种情况是最为基础的,也就是当前 channel 有正在阻塞等待的接收方,那么只需要直接发送就可以了。
缓冲发送
判断 channel 缓冲区中是否还有空间:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
// 对于缓冲型的 channel,如果还有缓冲空间
if c.qcount < c.dataqsiz {
// qp 指向 buf 的 sendx 位置
qp := chanbuf(c, c.sendx)
// 将数据从 ep 处拷贝到 qp
typedmemmove(c.elemtype, qp, ep)
// 发送游标值加 1
c.sendx++
// 如果发送游标值等于容量值,游标值归 0
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 缓冲区的元素数量加一
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// ...
}
// chanbuf(c, i) is pointer to the i'th slot in the buffer.
// 返回循环队列里第 i 个元素的地址处
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
会对缓冲区进行判定(
qcount
和 dataqsiz
字段),以此识别缓冲区的剩余空间。紧接进行如下操作:- 调用
chanbuf
方法,以此获得底层缓冲数据中位于 sendx 索引的元素指针值。
- 调用
typedmemmove
方法,将所需发送的数据拷贝到缓冲区中。
- 数据拷贝后,对 sendx 索引自行自增 1。同时若 sendx 与 dataqsiz 大小一致,则归 0(环形队列)。
- 自增完成后,队列总数同时自增 1。解锁互斥锁,返回结果。
至此针对缓冲区的数据操作完成。但若没有走进缓冲区处理的逻辑,则会判断当前是否阻塞 channel,若为非阻塞,将会解锁并直接返回失败。
配合图示如下:
堵塞发送
最后会进入阻塞等待发送的过程
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
// channel 满了,发送方会被阻塞。接下来会构造一个 sudog
// 获取当前 goroutine 的指针
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 当前 goroutine 进入发送等待队列
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
// 当前 goroutine 被挂起
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)
// ...
}
- 调用
getg
方法获取当前 goroutine 的指针,用于后续发送数据。
- 调用
acquireSudog
方法获取sudog
结构体,并设置当前 sudog 具体的待发送数据信息和状态。
- 调用
c.sendq.enqueue
方法将刚刚所获取的sudog
加入待发送的等待队列。
- 调用
gopark
方法挂起当前 goroutine(会记录执行位置),状态为waitReasonChanSend
,阻塞等待 channel。
- 调用
KeepAlive
方法保证待发送的数据值是活跃状态,也就是分配在堆上,避免被 GC 回收。
等待唤醒
在当前goroutine被挂起后,其将会在channel能后发送数据后被唤醒
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
// 从这里开始被唤醒了(channel 有机会可以发送了)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// 去掉 mysg 上绑定的 channel
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
唤醒 goroutine(调度器在停止 g 时会记录运行线程和方法内执行的位置)并完成 channel 的阻塞数据发送动作后。进行基本的参数检查,确保是符合要求的(纵深防御),接着开始取消 mysg 上的 channel 绑定和 sudog 的释放。
发送函数
调用 send 函数将元素直接从发送者的栈拷贝到接收者的栈,关键操作由
sendDirect
函数完成。不同 goroutine 的栈是各自独有的。为了不出问题,写的过程中增加了写屏障,保证正确地完成写操作。这样做的好处是减少了一次内存 copy:不用先拷贝到 channel 的 buf,直接由发送者到接收者,效率得以提高。// send 函数处理向一个空的 channel 发送操作
// ep 指向被发送的元素,会被直接拷贝到接收的 goroutine
// 之后,接收的 goroutine 会被唤醒
// c 必须是空的(因为等待队列里有 goroutine,肯定是空的)
// c 必须被上锁,发送操作执行完后,会使用 unlockf 函数解锁
// sg 必须已经从等待队列里取出来了
// ep 必须是非空,并且它指向堆或调用者的栈
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// sg.elem 指向接收到的值存放的位置,如 val <- ch,指的就是 &val
if sg.elem != nil {
// 直接拷贝内存(从发送者到接收者)
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
// sudog 上绑定的 goroutine
gp := sg.g
// 解锁
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒接收的 goroutine. skip 和打印栈相关,暂时不理会
goready(gp, skip+1)
}
// 向一个非缓冲型的 channel 发送数据、从一个无元素的(非缓冲型或缓冲型但空)的 channel
// 接收数据,都会导致一个 goroutine 直接操作另一个 goroutine 的栈
// 由于 GC 假设对栈的写操作只能发生在 goroutine 正在运行中并且由当前 goroutine 来写
// 所以这里实际上违反了这个假设。可能会造成一些问题,所以需要用到写屏障来规避
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
// src 在当前 goroutine 的栈上,dst 是另一个 goroutine 的栈
// 直接进行内存"搬迁"
// 如果目标地址的栈发生了栈收缩,当我们读出了 sg.elem 后
// 就不能修改真正的 dst 位置的值了
// 因此需要在读和写之前加上一个屏障
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
// No need for cgo write barrier checks because dst is always
// Go memory.
memmove(dst, src, t.size)
}
- 调用
sendDirect
方法将待发送的数据直接拷贝到待接收变量的内存地址(执行栈)。 - 例如:
msg := <-ch
语句,也就是将数据从ch
直接拷贝到了msg
的内存地址。
- 调用
sg.g
属性, 从 sudog 中获取等待接收数据的 goroutine,并传递后续唤醒所需的参数。
- 调用
goready
方法唤醒需接收数据的 goroutine,期望从_Gwaiting
状态调度为_Grunnable
。
发送调度示例
好了,看完源码。我们接着来分析例子:
func goroutineA(a <-chan int) {
val := <-a
fmt.Println("G1 received data:", val)
return
}
func goroutineB(b <-chan int) {
val := <-b
fmt.Println("G2 received data:", val)
return
}
func main() {
ch := make(chan int)
go goroutineA(ch)
go goroutineB(ch)
ch <- 3
time.Sleep(time.Second)
}
在发送小节里我们说到 G1 和 G2 现在被挂起来了,等待 sender 的解救。在第 17 行,主协程向 ch 发送了一个元素 3,来看下接下来会发生什么。
根据前面源码分析的结果,我们知道,sender 发现 ch 的 recvq 里有 receiver 在等待着接收,就会出队一个 sudog,把 recvq 里 first 指针的 sudo “推举”出来了,并将其加入到 P 的可运行 goroutine 队列中。
然后,sender 把发送元素拷贝到 sudog 的 elem 地址处,最后会调用 goready 将 G1 唤醒,状态变为 runnable。
当调度器光顾 G1 时,将 G1 变成 running 状态,执行 goroutineA 接下来的代码。G 表示其他可能有的 goroutine。
这里其实涉及到一个协程写另一个协程栈的操作。有两个 receiver 在 channel 的一边虎视眈眈地等着,这时 channel 另一边来了一个 sender 准备向 channel 发送数据,为了高效,用不着通过 channel 的 buf “中转”一次,直接从源地址把数据 copy 到目的地址就可以了,效率高啊!
上图是一个示意图, 3 会被拷贝到 G1 栈上的某个位置,也就是 val 的地址处,保存在 elem 字段。
接收
在继续分析前面小节的例子前,我们先来看一下接收相关的源码。在清楚了接收的具体过程之后,也就能轻松理解具体的例子了。
接收操作有两种写法:
- 一种带 "ok",反应 channel 是否关闭;
- 一种不带 "ok",这种写法,当接收到相应类型的零值时无法知道是真实的发送者发送过来的值,还是 channel 被关闭后,返回给接收者的默认类型的零值。
// msg := <-ch
func chanrecv1(c *hchan, elem unsafe . Pointer ) {
chanrecv(c, elem, true )
}
// msg, ok := <- ch
func chanrecv2(c *hchan, elem unsafe . Pointer ) (received bool ) {
_, received = chanrecv(c, elem, true )
return
}
chanrecv1
函数处理不带 "ok" 的情形, chanrecv2
则通过返回 "received" 这个字段来反应 channel 是否被关闭。接收值则比较特殊,会“放到”参数 elem
所指向的地址了。如果代码里忽略了接收值,这里的 elem 为 nil。无论如何,最终转向了 chanrecv
函数:// chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
// 如果 ep 是 nil,说明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false)
// 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
// 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
// 如果 ep 非空,则应该指向堆或者函数调用者的栈
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
}
前置判断处理
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 如果是一个 nil 的 channel
if c == nil {
// 如果不阻塞,直接返回 (false, false)
if !block {
return
}
// 否则,接收一个 nil 的 channel,goroutine 挂起
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
// 不会执行到这里
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
// 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回
// 当我们观察到 channel 没准备好接收:
// 1. 非缓冲型,等待发送列队 sendq 里没有 goroutine 在等待
// 2. 缓冲型,但 buf 里没有元素
// 之后,又观察到 closed == 0,即 channel 未关闭。
// 因为 channel 不可能被重复打开,所以前一个观测的时候 channel 也是未关闭的,
// 因此在这种情况下可以直接宣布接收失败,返回 (false, false)
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
// The channel is irreversibly closed and empty.
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
// 加锁
lock(&c.lock)
// channel 已关闭,并且循环数组 buf 里没有元素
// 这里可以处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的情况
// 也就是说即使是关闭状态,但在缓冲型的 channel,
// buf 里有元素的情况下还能接收到元素
if c.closed != 0 && c.qcount == 0 {
// 解锁
unlock(&c.lock)
if ep != nil {
// 从一个已关闭的 channel 执行接收操作,且未忽略返回值
// 那么接收的值将是一个该类型的零值
// typedmemclr 根据类型清理相应地址的内存
typedmemclr(c.elemtype, ep)
}
// 从一个已关闭的 channel 接收,selected 会返回true
return true, false
}
// ...
}
一开始时
chanrecv
方法会判断其是否为 nil channel。场景如下:
- 若 channel 是 nil channel,且为阻塞接收则调用
gopark
方法挂起当前 goroutine。
- 若 channel 是非阻塞模式,则直接返回。
而接下来对于非阻塞模式的 channel 会进行快速失败检查,检测 channel 是否已经准备好接收。其分以下几种情况:
- 无缓冲区:循环队列为 0 及等待队列 sendq 内没有 goroutine 正在等待。
- 有缓冲区:缓冲区数组为空。
随后会对 channel 的 closed 状态进行判断,因为 channel 是无法重复打开的,需要确定当前 channel 是否为未关闭状态。再确定接收失败,返回。
但若是 channel 已经关闭且不存在缓存数据了,则会清理
ep
指针中的数据并返回。直接接收
当发现 channel 上有正在阻塞等待的发送方时,则直接进行接收:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
// 等待发送队列里有 goroutine 存在,说明 buf 是满的
// 这有可能是:
// 1. 非缓冲型的 channel
// 2. 缓冲型的 channel,但 buf 满了
// 针对 1,直接进行内存拷贝(从 sender goroutine -> receiver goroutine)
// 针对 2,接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// ...
}
缓冲接收
当发现 channel 的缓冲区中有元素时:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
// 缓冲型,buf 里有元素,可以正常接收
if c.qcount > 0 {
// 直接从循环数组里找到要接收的元素
qp := chanbuf(c, c.recvx)
// 代码里,没有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清理掉循环数组里相应位置的值
typedmemclr(c.elemtype, qp)
// 接收游标向前移动
c.recvx++
// 接收游标归零
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// buf 数组里的元素个数减 1
c.qcount--
// 解锁
unlock(&c.lock)
return true, true
}
if !block {
// 非阻塞接收,解锁。selected 返回 false,因为没有接收到值
unlock(&c.lock)
return false, false
}
// ..
}
将会调用
chanbuf
方法根据 recvx
的索引位置取出数据,找到要接收的元素进行处理。若所接收到的数据和所传入的变量均不为空,则会调用 typedmemmove
方法将缓冲区中的数据拷贝到所传入的变量中。最后数据拷贝完毕后,进行各索引项和队列总数的自增增减,并调用
typedmemclr
方法进行内存数据的清扫。阻塞接收
当发现 channel 上既没有待发送的 goroutine,缓冲区也没有数据时。将会进入到最后一个阶段阻塞接收:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ..
// 接下来就是要被阻塞的情况了
// 构造一个 sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 待接收数据的地址保存下来
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 进入channel 的等待接收队列
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
// 将当前 goroutine 挂起
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
//...
}
这一块接收逻辑与发送也基本类似,主体就是获取当前 goroutine,构建 sudog 结构保存当前待接收数据(发送方)的地址信息,并将 sudog 加入等待接收队列。最后调用
gopark
方法挂起当前 goroutine,等待唤醒。等待唤醒
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
// 被唤醒了,接着从这里继续执行一些扫尾工作
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
// ...
}
被唤醒后,将恢复现场,回到对应的执行点,完成最后的扫尾工作。
接收函数
于是,调用 recv 函数:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果是非缓冲型的 channel
if c.dataqsiz == 0 {
// 未忽略接收的数据
if ep != nil {
// 直接拷贝数据,从 sender goroutine -> receiver goroutine
recvDirect(c.elemtype, sg, ep)
}
} else {
// 缓冲型的 channel,但 buf 已满。
// 将循环数组 buf 队首的元素拷贝到接收数据的地址
// 将发送者的数据入队。实际上这时 revx 和 sendx 值相等
// 找到接收游标
qp := chanbuf(c, c.recvx)
// 将接收游标处的数据拷贝给接收者
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 将发送者数据拷贝到 buf
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
// 解锁
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒发送的 goroutine。需要等到调度器的光临
goready(gp, skip+1)
}
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
// dst is on our stack or the heap, src is on another stack.
// The channel is locked, so src will not move during this
// operation.
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
该方法在接收上分为两种情况,分别是直接接收和缓冲接收:
- 直接接收(不存在缓冲区):
- 调用
recvDirect
方法,其作用与sendDirect
方法相对,会直接从发送方的 goroutine 调用栈中将数据拷贝过来到接收方的 goroutine。
- 缓冲接收(存在缓冲区):
- 调用
chanbuf
方法,根据recvx
索引的位置读取缓冲区元素,并将其拷贝到接收方的内存地址。 - 拷贝完毕后,对
sendx
和recvx
索引位置进行调整。
最后还是常规的 goroutine 调度动作,会调用
goready
方法来唤醒当前所处理的 sudog 的对应 goroutine,其状态改成RUNNABLE。那么在下一轮调度时,既然已经接收了数据,自然发送方也就会被唤醒。- 然后,如果 channel 的 buf 里还有数据,说明可以比较正常地接收。注意,这里,即使是在 channel 已经关闭的情况下,也是可以走到这里的。这一步比较简单,正常地将 buf 里接收游标处的数据拷贝到接收数据的地址。
- 到了最后一步,走到这里来的情形是要阻塞的。当然,如果 block 传进来的值是 false,那就不阻塞,直接返回就好了。
先构造一个 sudog,接着就是保存各种值了。注意,这里会将接收数据的地址存储到了
elem
字段,当被唤醒时,接收到的数据就会保存到这个字段指向的地址。然后将 sudog 添加到 channel 的 recvq 队列里。调用 goparkunlock 函数将 goroutine 挂起。接受调度示例
我们继续之前的例子。
func goroutineA(a <-chan int) {
val := <-a
fmt.Println("G1 received data:", val)
return
}
func goroutineB(b <-chan int) {
val := <-b
fmt.Println("G2 received data:", val)
return
}
func main() {
ch := make(chan int)
go goroutineA(ch)
go goroutineB(ch)
ch <- 3
time.Sleep(time.Second)
}
前面说到第 14 行,创建了一个非缓冲型的 channel,接着,第 15、16 行分别创建了一个 goroutine,各自执行了一个接收操作。通过前面的源码分析,我们知道,这两个 goroutine (后面称为 G1 和 G2 好了)都会被阻塞在接收操作。G1 和 G2 会挂在 channel 的 recq 队列中,形成一个双向循环链表。
在程序的 17 行之前,chan 的整体数据结构如下:
buf
指向一个长度为 0 的数组,qcount 为 0,表示 channel 中没有元素。重点关注 recvq
和 sendq
,它们是 waitq 结构体,而 waitq 实际上就是一个双向链表,链表的元素是 sudog,里面包含 g
字段, g
表示一个 goroutine,所以 sudog 可以看成一个 goroutine。recvq 存储那些尝试读取 channel 但被阻塞的 goroutine,sendq 则存储那些尝试写入 channel,但被阻塞的 goroutine。此时,我们可以看到,recvq 里挂了两个 goroutine,也就是前面启动的 G1 和 G2。因为没有 goroutine 接收,而 channel 又是无缓冲类型,所以 G1 和 G2 被阻塞。sendq 没有被阻塞的 goroutine。
recvq
的数据结构如下。这里直接引用文章中的一幅图,用了三维元素,画得很好:再从整体上来看一下 chan 此时的状态:
G1 和 G2 被挂起了,状态是
WAITING
。关于 goroutine 调度器这块不是今天的重点,当然后面肯定会写相关的文章。这里先简单说下,goroutine 是用户态的协程,由 Go runtime 进行管理,作为对比,内核线程由 OS 进行管理。Goroutine 更轻量,因此我们可以轻松创建数万 goroutine。一个内核线程可以管理多个 goroutine,当其中一个 goroutine 阻塞时,内核线程可以调度其他的 goroutine 来运行,内核线程本身不会阻塞。这就是通常我们说的
M:N
模型:M:N
模型通常由三部分构成:M、P、G。M 是内核线程,负责运行 goroutine;P 是 context,保存 goroutine 运行所需要的上下文,它还维护了可运行(runnable)的 goroutine 列表;G 则是待运行的 goroutine。M 和 P 是 G 运行的基础。继续回到例子。假设我们只有一个 M,当 G1(
go goroutineA(ch)
) 运行到 val:=<-a
时,它由本来的 running 状态变成了 waiting 状态(调用了 gopark 之后的结果):G1 脱离与 M 的关系,但调度器可不会让 M 闲着,所以会接着调度另一个 goroutine 来运行:
G2 也是同样的遭遇。现在 G1 和 G2 都被挂起了,等待着一个 sender 往 channel 里发送数据,才能得到解救。
关闭
close 函数先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒。
前置判断处理
func closechan(c *hchan) {
// 关闭一个 nil channel,panic
if c == nil {
panic(plainError("close of nil channel"))
}
// 上锁
lock(&c.lock)
// 如果 channel 已经关闭
if c.closed != 0 {
unlock(&c.lock)
// panic
panic(plainError("close of closed channel"))
}
// 修改关闭状态
c.closed = 1
// ....
}
基本检查和关闭标志设置,保证 channel 不为 nil 和未关闭,保证边界。
释放接收方
在完成了异常边界判断和标志设置后,会将接受者的 sudog 等待队列(recvq)加入到待清除队列 glist 中:
func closechan(c *hchan) {
var glist gList
// release all readers
// 将 channel 所有等待接收队列的里 sudog 释放
for {
// 从接收队列里出队一个 sudog
sg := c.recvq.dequeue()
// 出队完毕,跳出循环
if sg == nil {
break
}
// 如果 elem 不为空,说明此 receiver 未忽略接收数据
// 给它赋一个相应类型的零值
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 取出 goroutine
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
// 相连,形成链表
glist.push(gp)
}
// ....
}
所取出并加入的 goroutine 状态需要均为
_Gwaiting
,以保证后续的新一轮调度。释放发送方
同样,与释放接收方一样。会将发送方也加入到到待清除队列 glist 中:
func closechan(c *hchan) {
// ...
// release all writers (they will panic)
// 将 channel 等待发送队列里的 sudog 释放
// 如果存在,这些 goroutine 将会 panic
for {
// 从发送队列里出队一个 sudog
sg := c.sendq.dequeue()
if sg == nil {
break
}
// 发送者会 panic
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
// 形成链表
glist.push(gp)
}
// 解锁
unlock(&c.lock)
// ...
}
协程调度
将所有 glist 中的 goroutine 状态从
_Gwaiting
设置为 _Grunnable
状态,等待调度器的调度:func closechan(c *hchan) {
// ...
// Ready all Gs now that we've dropped the channel lock.
// 遍历链表
for !glist.empty() {
// 取最后一个
gp := glist.pop()
gp.schedlink = 0
// 唤醒相应 goroutine
goready(gp, 3)
}
}
后续所有的 goroutine 允许被重新调度后。若原本还在被动阻塞的发送方或接收方,将重获自由,后续该干嘛就去干嘛了,再跑回其所属的应用流程。
无缓冲流程图
有缓冲流程图
使用场景
- 传递的数据时候,把数据传递给下一个goroutine处理
- 分发任务单元,做一个应用池的时候,前面把任务包装号,扔给其中一个worker就可以,worker用channel实现任务队列,它从channel中取任务就ok
- 另外交流异步的结果,可以利用channel进行传递
- 可以进行复杂编排功能
Loading Comments...