Go语言设计(7)-channel

Go语言设计(7)-channel

Date
Feb 17, 2022
Tags
Go
Concurrency

什么是channel

Goroutine 和 channel 是 Go 语言并发编程的两大基石。Goroutine 用于执行并发任务,channel 用于 goroutine 之间的同步、通信。
// 简单示例
package main

import (
	"fmt"
)

func main() {
	ch := make(chan string)
	go func() {
		ch <- "Hello World!"
	}()

	msg := <-ch
	fmt.Println(msg)
}
通过 channel,Go 实现了通过通信来实现内存共享。Channel 是在多个 goroutine 之间传递数据和同步的重要手段。
notion image
Channel 在 gouroutine 间架起了一条管道,在管道里传输数据; 由于它是线程安全的,所以用起来非常方便;channel 还提供“先进先出”的特性;它还能影响 goroutine 的阻塞和唤醒。
这也是channel的经典思想,不要通过共享内存来通信,而是通过通信来实现内存共享(CSP)
那么使用sharing memory(mutex)还是使用channel来做通信呢?这边可以参考如下图的决策
notion image

channel特性(CSP)

channel 字面意义是“通道”,类似于 Linux 中的管道。声明 channel 的语法如下:
chan T  // 声明一个双向通道
chan<- T  // 声明一个只能用于发送的通道
<-chan T // 声明一个只能用于接收的通道
单向通道的声明,用 <-来表示,它指明通道的方向。你只要明白,代码的书写顺序是从左到右就马上能掌握通道的方向是怎样的。
因为 channel 是一个引用类型,所以在它被初始化之前,它的值是 nil,channel 使用 make 函数进行初始化。可以向它传递一个 int 值,代表 channel 缓冲区的大小(容量),构造出来的是一个缓冲型的 channel;不传或传 0 的,构造的就是一个非缓冲型的 channel。
对不带缓冲的 channel 进行的操作实际上可以看作“同步模式”,带缓冲的则称为“异步模式”。
  • 同步模式下,发送方和接收方要同步就绪,只有在两者都 ready 的情况下,数据才能在两者间传输(后面会看到,实际上就是内存拷贝)。否则,任意一方先行进行发送或接收操作,都会被挂起,等待另一方的出现才能被唤醒
  • 异步模式下,在缓冲槽可用的情况下(有剩余容量),发送和接收操作都可以顺利进行。否则,操作的一方(如写入)同样会被挂起,直到出现相反操作(如接收)才会被唤醒。
小结一下:同步模式下,必须要使发送方和接收方配对,操作才会成功,否则会被阻塞;异步模式下,缓冲槽要有剩余容量,操作才会成功,否则也会被阻塞。

channel实现原理

本质上channel在设计上是一个有锁的环形队列,对 chan 的发送和接收操作都会在编译期间转换成为底层的发送接收函数。
notion image

数据结构

type hchan struct {
  // chan 里元素数量
	qcount   uint        
  // chan 底层循环数组的长度   
	dataqsiz uint       
  // 指向底层循环数组的指针
  // 只针对有缓冲的channel
	buf      unsafe.Pointer 

  // chan 中元素大小
	elemsize uint16
  // chan 是否被关闭
	closed   uint32
   // chan中元素类型
	elemtype *_type 
  
  // 已发送元素在循环数组中的索引 
	sendx    uint    // send index
  // 已接收元素在循环数组中的索引
	recvx    uint   //receive index
  // 等待接收的goroutine队列
	recvq    waitq    //list of recv waiters
   // 等待发送的goroutine队列
	sendq    waitq  // list of send waiters

  // 保护hchan中所有字段
	lock mutex
}

type waitq struct {
	first *sudog
	last  *sudog
}
重点讲解几个字段:
  • buf: 指向底层循环数组,只有缓冲型的channel才有
  • sendxrecvx均指向底层循环数组,表示当前可以发送和接收的元素位置索引值(相对于底层数组)。
  • sendqrecvq分别表示被阻塞的 goroutine,这些 goroutine 由于尝试读取 channel 或向 channel 发送数据而被阻塞。
  • waitqsudog 的一个双向链表,而 sudog 实际上是对 goroutine 的一个封装
type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.

	g *g

	next *sudog
	prev *sudog
	elem unsafe.Pointer // data element (may point to stack)
  //......
}
  • lock 用来保证每个读 channel 或写 channel 的操作都是原子的。
例如,创建一个容量为 6 的,元素为 int 型的 channel 数据结构如下 :
notion image
 

创建

我们知道,通道有两个方向,发送和接收。理论上来说,我们可以创建一个只发送或只接收的通道,但是这种通道创建出来后,怎么使用呢?一个只能发的通道,怎么接收呢?同样,一个只能收的通道,如何向其发送数据呢?
一般而言,使用 make 创建一个能收能发的通道:
// 无缓冲通道
ch1 := make(chan int)
// 有缓冲通道
ch2 := make(chan int, 10)
通过汇编分析,我们知道,最终创建 chan 的函数是 makechan
// 通用创建方法
func makechan(t *chantype, size int) *hchan
// 类型为 int64 的进行特殊处理
func makechan64(t *chantype, size int64) *hchan
从函数原型来看,创建的 chan 是一个指针。所以我们能在函数间直接传递 channel,而不用传递 channel 的指针。具体来看下代码:
func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// 省略了检查 channel size,align 的代码
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))

	var c *hchan

	switch {
	// 如果元素类型不含指针 或者 size 大小为 0(无缓冲类型)
	// 只进行一次内存分配
	case mem == 0:
		// 如果 hchan 结构体中不含指针,GC 就不会扫描 chan 中的元素

		// 只分配 "hchan 结构体大小 + 元素大小*个数" 的内存
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		// 1. 非缓冲型的,buf 没用,直接指向 chan 起始地址处
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// 如果是缓冲型 channel 且元素大小不等于 0(大小等于 0的元素类型:struct{})
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		// 2. 缓冲型的,能进入到这里,说明元素无指针且元素类型为 struct{},也无影响
		// 因为只会用到接收和发送游标,不会真正拷贝东西到 c.buf 处(这会覆盖 chan的内容)
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	// 循环数组长度
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)
	// 返回 hchan 指针

	return c
}
创建 channel 的逻辑主要分为三大块:
  • 当前 channel 不存在缓冲区,也就是元素大小为 0 的情况下,就会调用 mallocgc 方法分配一段连续的内存空间。
  • 当前 channel 存储的类型存在指针引用,就会连同 hchan 和底层数组同时分配一段连续的内存空间。
  • 通用情况,默认分配相匹配的连续内存空间。
需要注意到一块特殊点,那就是 channel 的创建都是调用的 mallocgc 方法,也就是 channel 都是创建在上的。因此 channel 是会被 GC 回收的,自然也不总是需要 close 方法来进行显示关闭了。
notion image

发送

channel发送数据示例
go func() {
    ch <- "Hello World"
}()
发送操作转化为 chansend1 函数并指向实际逻辑chansend,直接上源码
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}

基础前置判断处理

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// 如果 channel 是 nil
	if c == nil {
		// 不能阻塞,直接返回 false,表示未发送成功
		if !block {
			return false
		}
		// 当前 goroutine 被挂起,发生panic
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	// 对于不阻塞的 send,快速检测失败场景
	// 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是:
	// 1. channel 是非缓冲型的,且等待接收队列里没有 goroutine
	// 2. channel 是缓冲型的,但循环数组已经装满了元素
	if !block && c.closed == 0 && full(c) {
		return false
	}
  // .....
}

func full(c *hchan) bool {
	// c.dataqsiz is immutable (never written after the channel is created)
	// so it is safe to read at any time during channel operation.
	if c.dataqsiz == 0 {
		// Assumes that a pointer read is relaxed-atomic.
		return c.recvq.first == nil
	}
	// Assumes that a uint read is relaxed-atomic.
	return c.qcount == c.dataqsiz
}

上面的代码注释地比较详细了,我们来详细看看。
  • 如果检测到 channel 是空的,逻辑上向nil channel发送数据,调用gopark发生panic。
  • 对于不阻塞的发送操作,如果 channel 未关闭并且没有多余的缓冲空间(说明:a. channel 是非缓冲型的,且等待接收队列里没有 goroutine;b. channel 是缓冲型的,但循环数组已经装满了元素)
没有多余缓冲空间的失败场景:
  • 若非阻塞未关闭,同时底层数据dataqsiz大小为0(缓冲区无元素)
  • 若是qcountdataqsiz大小相同(缓冲区已满)

上互斥锁

在完成channel的前置判断后,在发送数据前,channel会进行上锁
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
 // ....
	// 锁住 channel,并发安全
	lock(&c.lock)
// ....
}
上锁后就能保住并发安全。

直接发送

在正式开发发送前,加锁之后,会对channel进行一次状态判断
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...  
// 如果 channel 关闭了
	if c.closed != 0 {
		unlock(&c.lock)
		// 直接 panic
		panic(plainError("send on closed channel"))
	}

	// 如果接收队列里有 goroutine,直接将要发送的数据拷贝到接收 goroutine
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
  // ...
}
这种情况是最为基础的,也就是当前 channel 有正在阻塞等待的接收方,那么只需要直接发送就可以了。

缓冲发送

判断 channel 缓冲区中是否还有空间:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
	// 对于缓冲型的 channel,如果还有缓冲空间
	if c.qcount < c.dataqsiz {
		// qp 指向 buf 的 sendx 位置
		qp := chanbuf(c, c.sendx)
		// 将数据从 ep 处拷贝到 qp
		typedmemmove(c.elemtype, qp, ep)
		// 发送游标值加 1
		c.sendx++
		// 如果发送游标值等于容量值,游标值归 0
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		// 缓冲区的元素数量加一
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}
  // ...
}

// chanbuf(c, i) is pointer to the i'th slot in the buffer.
// 返回循环队列里第 i 个元素的地址处
func chanbuf(c *hchan, i uint) unsafe.Pointer {
	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
会对缓冲区进行判定(qcount 和 dataqsiz 字段),以此识别缓冲区的剩余空间。紧接进行如下操作:
  • 调用 chanbuf 方法,以此获得底层缓冲数据中位于 sendx 索引的元素指针值。
  • 调用 typedmemmove 方法,将所需发送的数据拷贝到缓冲区中。
  • 数据拷贝后,对 sendx 索引自行自增 1。同时若 sendx 与 dataqsiz 大小一致,则归 0(环形队列)。
  • 自增完成后,队列总数同时自增 1。解锁互斥锁,返回结果。
至此针对缓冲区的数据操作完成。但若没有走进缓冲区处理的逻辑,则会判断当前是否阻塞 channel,若为非阻塞,将会解锁并直接返回失败。
配合图示如下:
notion image

堵塞发送

最后会进入阻塞等待发送的过程
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...

	// channel 满了,发送方会被阻塞。接下来会构造一个 sudog

	// 获取当前 goroutine 的指针
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	// 当前 goroutine 进入发送等待队列
	c.sendq.enqueue(mysg)

	atomic.Store8(&gp.parkingOnChan, 1)
	// 当前 goroutine 被挂起
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	KeepAlive(ep)
// ...
}
  • 调用 getg 方法获取当前 goroutine 的指针,用于后续发送数据。
  • 调用 acquireSudog 方法获取 sudog 结构体,并设置当前 sudog 具体的待发送数据信息和状态。
  • 调用 c.sendq.enqueue 方法将刚刚所获取的 sudog 加入待发送的等待队列。
  • 调用 gopark 方法挂起当前 goroutine(会记录执行位置),状态为 waitReasonChanSend,阻塞等待 channel。
  • 调用 KeepAlive 方法保证待发送的数据值是活跃状态,也就是分配在堆上,避免被 GC 回收。
notion image
 

等待唤醒

在当前goroutine被挂起后,其将会在channel能后发送数据后被唤醒

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...

	// 从这里开始被唤醒了(channel 有机会可以发送了)
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	// 去掉 mysg 上绑定的 channel
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
唤醒 goroutine(调度器在停止 g 时会记录运行线程和方法内执行的位置)并完成 channel 的阻塞数据发送动作后。进行基本的参数检查,确保是符合要求的(纵深防御),接着开始取消 mysg 上的 channel 绑定和 sudog 的释放。

发送函数

调用 send 函数将元素直接从发送者的栈拷贝到接收者的栈,关键操作由 sendDirect 函数完成。不同 goroutine 的栈是各自独有的。为了不出问题,写的过程中增加了写屏障,保证正确地完成写操作。这样做的好处是减少了一次内存 copy:不用先拷贝到 channel 的 buf,直接由发送者到接收者,效率得以提高
// send 函数处理向一个空的 channel 发送操作
// ep 指向被发送的元素,会被直接拷贝到接收的 goroutine
// 之后,接收的 goroutine 会被唤醒
// c 必须是空的(因为等待队列里有 goroutine,肯定是空的)
// c 必须被上锁,发送操作执行完后,会使用 unlockf 函数解锁
// sg 必须已经从等待队列里取出来了
// ep 必须是非空,并且它指向堆或调用者的栈
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// sg.elem 指向接收到的值存放的位置,如 val <- ch,指的就是 &val
	if sg.elem != nil {
// 直接拷贝内存(从发送者到接收者)
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
// sudog 上绑定的 goroutine
	gp := sg.g
// 解锁
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
// 唤醒接收的 goroutine. skip 和打印栈相关,暂时不理会
	goready(gp, skip+1)
}


// 向一个非缓冲型的 channel 发送数据、从一个无元素的(非缓冲型或缓冲型但空)的 channel
// 接收数据,都会导致一个 goroutine 直接操作另一个 goroutine 的栈
// 由于 GC 假设对栈的写操作只能发生在 goroutine 正在运行中并且由当前 goroutine 来写
// 所以这里实际上违反了这个假设。可能会造成一些问题,所以需要用到写屏障来规避
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
// src 在当前 goroutine 的栈上,dst 是另一个 goroutine 的栈    
// 直接进行内存"搬迁"
// 如果目标地址的栈发生了栈收缩,当我们读出了 sg.elem 后
// 就不能修改真正的 dst 位置的值了
// 因此需要在读和写之前加上一个屏障
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	// No need for cgo write barrier checks because dst is always
	// Go memory.
	memmove(dst, src, t.size)
}
  • 调用 sendDirect 方法将待发送的数据直接拷贝到待接收变量的内存地址(执行栈)。
    • 例如:msg := <-ch 语句,也就是将数据从 ch 直接拷贝到了 msg 的内存地址。
  • 调用 sg.g 属性, 从 sudog 中获取等待接收数据的 goroutine,并传递后续唤醒所需的参数。
  • 调用 goready 方法唤醒需接收数据的 goroutine,期望从 _Gwaiting 状态调度为 _Grunnable

发送调度示例

好了,看完源码。我们接着来分析例子:

func goroutineA(a <-chan int) {
	val := <-a
	fmt.Println("G1 received data:", val)
	return
}

func goroutineB(b <-chan int) {
	val := <-b
	fmt.Println("G2 received data:", val)
	return
}

func main() {
	ch := make(chan int)
	go goroutineA(ch)
	go goroutineB(ch)
	ch <- 3
	time.Sleep(time.Second)
}
在发送小节里我们说到 G1 和 G2 现在被挂起来了,等待 sender 的解救。在第 17 行,主协程向 ch 发送了一个元素 3,来看下接下来会发生什么。
根据前面源码分析的结果,我们知道,sender 发现 ch 的 recvq 里有 receiver 在等待着接收,就会出队一个 sudog,把 recvq 里 first 指针的 sudo “推举”出来了,并将其加入到 P 的可运行 goroutine 队列中。
然后,sender 把发送元素拷贝到 sudog 的 elem 地址处,最后会调用 goready 将 G1 唤醒,状态变为 runnable。
notion image
当调度器光顾 G1 时,将 G1 变成 running 状态,执行 goroutineA 接下来的代码。G 表示其他可能有的 goroutine。
这里其实涉及到一个协程写另一个协程栈的操作。有两个 receiver 在 channel 的一边虎视眈眈地等着,这时 channel 另一边来了一个 sender 准备向 channel 发送数据,为了高效,用不着通过 channel 的 buf “中转”一次,直接从源地址把数据 copy 到目的地址就可以了,效率高啊!
notion image
上图是一个示意图, 3 会被拷贝到 G1 栈上的某个位置,也就是 val 的地址处,保存在 elem 字段。

接收

在继续分析前面小节的例子前,我们先来看一下接收相关的源码。在清楚了接收的具体过程之后,也就能轻松理解具体的例子了。
接收操作有两种写法:
  • 一种带 "ok",反应 channel 是否关闭;
  • 一种不带 "ok",这种写法,当接收到相应类型的零值时无法知道是真实的发送者发送过来的值,还是 channel 被关闭后,返回给接收者的默认类型的零值。
// msg := <-ch
func chanrecv1(c *hchan, elem 	unsafe	.	Pointer	) {
		chanrecv(c, elem, 	true	)
}
		
// msg, ok := <- ch
func chanrecv2(c *hchan, elem 	unsafe	.	Pointer	) (received 	bool	) {	
		_, received = chanrecv(c, elem, 	true	)	
		return	
}
chanrecv1函数处理不带 "ok" 的情形, chanrecv2 则通过返回 "received" 这个字段来反应 channel 是否被关闭。接收值则比较特殊,会“放到”参数 elem 所指向的地址了。如果代码里忽略了接收值,这里的 elem 为 nil。无论如何,最终转向了 chanrecv 函数:
// chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
// 如果 ep 是 nil,说明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false)
// 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
// 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
// 如果 ep 非空,则应该指向堆或者函数调用者的栈
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
}

前置判断处理

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// 如果是一个 nil 的 channel
	if c == nil {
		// 如果不阻塞,直接返回 (false, false)
		if !block {
			return
		}
		// 否则,接收一个 nil 的 channel,goroutine 挂起
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		// 不会执行到这里
		throw("unreachable")
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	// 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回
	// 当我们观察到 channel 没准备好接收:
	// 1. 非缓冲型,等待发送列队 sendq 里没有 goroutine 在等待
	// 2. 缓冲型,但 buf 里没有元素
	// 之后,又观察到 closed == 0,即 channel 未关闭。
	// 因为 channel 不可能被重复打开,所以前一个观测的时候 channel 也是未关闭的,
	// 因此在这种情况下可以直接宣布接收失败,返回 (false, false)
	if !block && empty(c) {
		if atomic.Load(&c.closed) == 0 {
			return
		}

		if empty(c) {
			// The channel is irreversibly closed and empty.
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}
// 加锁
	lock(&c.lock)

	// channel 已关闭,并且循环数组 buf 里没有元素
	// 这里可以处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的情况
	// 也就是说即使是关闭状态,但在缓冲型的 channel,
	// buf 里有元素的情况下还能接收到元素
	if c.closed != 0 && c.qcount == 0 {
		// 解锁
		unlock(&c.lock)
		if ep != nil {
			// 从一个已关闭的 channel 执行接收操作,且未忽略返回值
			// 那么接收的值将是一个该类型的零值
			// typedmemclr 根据类型清理相应地址的内存
			typedmemclr(c.elemtype, ep)
		}
		// 从一个已关闭的 channel 接收,selected 会返回true
		return true, false
	}
// ...
}
一开始时 chanrecv 方法会判断其是否为 nil channel。
场景如下:
  • 若 channel 是 nil channel,且为阻塞接收则调用 gopark 方法挂起当前 goroutine。
  • 若 channel 是非阻塞模式,则直接返回。
而接下来对于非阻塞模式的 channel 会进行快速失败检查,检测 channel 是否已经准备好接收。其分以下几种情况:
  • 无缓冲区:循环队列为 0 及等待队列 sendq 内没有 goroutine 正在等待。
  • 有缓冲区:缓冲区数组为空。
随后会对 channel 的 closed 状态进行判断,因为 channel 是无法重复打开的,需要确定当前 channel 是否为未关闭状态。再确定接收失败,返回。
但若是 channel 已经关闭且不存在缓存数据了,则会清理 ep 指针中的数据并返回。

直接接收

当发现 channel 上有正在阻塞等待的发送方时,则直接进行接收:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...


	// 等待发送队列里有 goroutine 存在,说明 buf 是满的
	// 这有可能是:
	// 1. 非缓冲型的 channel
	// 2. 缓冲型的 channel,但 buf 满了
	// 针对 1,直接进行内存拷贝(从 sender goroutine -> receiver goroutine)
	// 针对 2,接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部
	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

// ...

}

缓冲接收

当发现 channel 的缓冲区中有元素时:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
 // ...
	// 缓冲型,buf 里有元素,可以正常接收
	if c.qcount > 0 {
		// 直接从循环数组里找到要接收的元素
		qp := chanbuf(c, c.recvx)
		// 代码里,没有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// 清理掉循环数组里相应位置的值
		typedmemclr(c.elemtype, qp)
		// 接收游标向前移动
		c.recvx++
		// 接收游标归零
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		// buf 数组里的元素个数减 1
		c.qcount--
		// 解锁
		unlock(&c.lock)
		return true, true
	}

	if !block {
		// 非阻塞接收,解锁。selected 返回 false,因为没有接收到值
		unlock(&c.lock)
		return false, false
	}

// ..

}
将会调用 chanbuf 方法根据 recvx 的索引位置取出数据,找到要接收的元素进行处理。若所接收到的数据和所传入的变量均不为空,则会调用 typedmemmove 方法将缓冲区中的数据拷贝到所传入的变量中。
最后数据拷贝完毕后,进行各索引项和队列总数的自增增减,并调用 typedmemclr 方法进行内存数据的清扫。

阻塞接收

当发现 channel 上既没有待发送的 goroutine,缓冲区也没有数据时。将会进入到最后一个阶段阻塞接收:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  // ..
	// 接下来就是要被阻塞的情况了
	// 构造一个 sudog
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// 待接收数据的地址保存下来
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	// 进入channel 的等待接收队列
	c.recvq.enqueue(mysg)

	atomic.Store8(&gp.parkingOnChan, 1)
	// 将当前 goroutine 挂起
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

 //...
}
这一块接收逻辑与发送也基本类似,主体就是获取当前 goroutine,构建 sudog 结构保存当前待接收数据(发送方)的地址信息,并将 sudog 加入等待接收队列。最后调用 gopark  方法挂起当前 goroutine,等待唤醒。

等待唤醒

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
	// 被唤醒了,接着从这里继续执行一些扫尾工作
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
// ...
}
被唤醒后,将恢复现场,回到对应的执行点,完成最后的扫尾工作。

接收函数

于是,调用 recv 函数:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	// 如果是非缓冲型的 channel
	if c.dataqsiz == 0 {
		// 未忽略接收的数据
		if ep != nil {
			// 直接拷贝数据,从 sender goroutine -> receiver goroutine
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// 缓冲型的 channel,但 buf 已满。
		// 将循环数组 buf 队首的元素拷贝到接收数据的地址
		// 将发送者的数据入队。实际上这时 revx 和 sendx 值相等
		// 找到接收游标
		qp := chanbuf(c, c.recvx)
		// 将接收游标处的数据拷贝给接收者
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// 将发送者数据拷贝到 buf
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	// 解锁
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// 唤醒发送的 goroutine。需要等到调度器的光临
	goready(gp, skip+1)
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	// dst is on our stack or the heap, src is on another stack.
	// The channel is locked, so src will not move during this
	// operation.
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size)
}
该方法在接收上分为两种情况,分别是直接接收缓冲接收
  • 直接接收(不存在缓冲区):
    • 调用 recvDirect 方法,其作用与 sendDirect 方法相对,会直接从发送方的 goroutine 调用栈中将数据拷贝过来到接收方的 goroutine。
  • 缓冲接收(存在缓冲区):
    • 调用 chanbuf 方法,根据 recvx 索引的位置读取缓冲区元素,并将其拷贝到接收方的内存地址。
    • 拷贝完毕后,对 sendx 和 recvx 索引位置进行调整。
最后还是常规的 goroutine 调度动作,会调用 goready 方法来唤醒当前所处理的 sudog 的对应 goroutine,其状态改成RUNNABLE。那么在下一轮调度时,既然已经接收了数据,自然发送方也就会被唤醒。
  • 然后,如果 channel 的 buf 里还有数据,说明可以比较正常地接收。注意,这里,即使是在 channel 已经关闭的情况下,也是可以走到这里的。这一步比较简单,正常地将 buf 里接收游标处的数据拷贝到接收数据的地址。
  • 到了最后一步,走到这里来的情形是要阻塞的。当然,如果 block 传进来的值是 false,那就不阻塞,直接返回就好了。
先构造一个 sudog,接着就是保存各种值了。注意,这里会将接收数据的地址存储到了 elem 字段,当被唤醒时,接收到的数据就会保存到这个字段指向的地址。然后将 sudog 添加到 channel 的 recvq 队列里。调用 goparkunlock 函数将 goroutine 挂起。

接受调度示例

我们继续之前的例子。
func goroutineA(a <-chan int) {
	val := <-a
	fmt.Println("G1 received data:", val)
	return
}

func goroutineB(b <-chan int) {
	val := <-b
	fmt.Println("G2 received data:", val)
	return
}

func main() {
	ch := make(chan int)
	go goroutineA(ch)
	go goroutineB(ch)
	ch <- 3
	time.Sleep(time.Second)
}
前面说到第 14 行,创建了一个非缓冲型的 channel,接着,第 15、16 行分别创建了一个 goroutine,各自执行了一个接收操作。通过前面的源码分析,我们知道,这两个 goroutine (后面称为 G1 和 G2 好了)都会被阻塞在接收操作。G1 和 G2 会挂在 channel 的 recq 队列中,形成一个双向循环链表。
在程序的 17 行之前,chan 的整体数据结构如下:
notion image
buf 指向一个长度为 0 的数组,qcount 为 0,表示 channel 中没有元素。重点关注 recvqsendq,它们是 waitq 结构体,而 waitq 实际上就是一个双向链表,链表的元素是 sudog,里面包含 g 字段, g 表示一个 goroutine,所以 sudog 可以看成一个 goroutine。recvq 存储那些尝试读取 channel 但被阻塞的 goroutine,sendq 则存储那些尝试写入 channel,但被阻塞的 goroutine。
此时,我们可以看到,recvq 里挂了两个 goroutine,也就是前面启动的 G1 和 G2。因为没有 goroutine 接收,而 channel 又是无缓冲类型,所以 G1 和 G2 被阻塞。sendq 没有被阻塞的 goroutine。
recvq 的数据结构如下。这里直接引用文章中的一幅图,用了三维元素,画得很好:
notion image
再从整体上来看一下 chan 此时的状态:
notion image
G1 和 G2 被挂起了,状态是 WAITING。关于 goroutine 调度器这块不是今天的重点,当然后面肯定会写相关的文章。这里先简单说下,goroutine 是用户态的协程,由 Go runtime 进行管理,作为对比,内核线程由 OS 进行管理。Goroutine 更轻量,因此我们可以轻松创建数万 goroutine。
一个内核线程可以管理多个 goroutine,当其中一个 goroutine 阻塞时,内核线程可以调度其他的 goroutine 来运行,内核线程本身不会阻塞。这就是通常我们说的 M:N模型:
notion image
M:N 模型通常由三部分构成:M、P、G。M 是内核线程,负责运行 goroutine;P 是 context,保存 goroutine 运行所需要的上下文,它还维护了可运行(runnable)的 goroutine 列表;G 则是待运行的 goroutine。M 和 P 是 G 运行的基础。
notion image
继续回到例子。假设我们只有一个 M,当 G1( go goroutineA(ch)) 运行到 val:=<-a 时,它由本来的 running 状态变成了 waiting 状态(调用了 gopark 之后的结果):
notion image
G1 脱离与 M 的关系,但调度器可不会让 M 闲着,所以会接着调度另一个 goroutine 来运行:
notion image
G2 也是同样的遭遇。现在 G1 和 G2 都被挂起了,等待着一个 sender 往 channel 里发送数据,才能得到解救。

关闭

关闭某个 channel,会执行函数close,实际逻辑是调用 closechan,具体看源码
close 函数先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒。

前置判断处理

func closechan(c *hchan) {
	// 关闭一个 nil channel,panic
	if c == nil {
		panic(plainError("close of nil channel"))
	}
	// 上锁
	lock(&c.lock)
	// 如果 channel 已经关闭
	if c.closed != 0 {
		unlock(&c.lock)
		// panic
		panic(plainError("close of closed channel"))
	}

	// 修改关闭状态
	c.closed = 1
// ....
}
基本检查和关闭标志设置,保证 channel 不为 nil 和未关闭,保证边界。

释放接收方

在完成了异常边界判断和标志设置后,会将接受者的 sudog 等待队列(recvq)加入到待清除队列 glist 中:
func closechan(c *hchan) {

	var glist gList

	// release all readers
	// 将 channel 所有等待接收队列的里 sudog 释放
	for {
		// 从接收队列里出队一个 sudog
		sg := c.recvq.dequeue()
		// 出队完毕,跳出循环
		if sg == nil {
			break
		}
		// 如果 elem 不为空,说明此 receiver 未忽略接收数据
		// 给它赋一个相应类型的零值
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}

		// 取出 goroutine
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		// 相连,形成链表
		glist.push(gp)
	}
// ....
}
所取出并加入的 goroutine 状态需要均为 _Gwaiting,以保证后续的新一轮调度。

释放发送方

同样,与释放接收方一样。会将发送方也加入到到待清除队列 glist 中:
func closechan(c *hchan) {
  // ...
	// release all writers (they will panic)
	// 将 channel 等待发送队列里的 sudog 释放
	// 如果存在,这些 goroutine 将会 panic
	for {
		// 从发送队列里出队一个 sudog
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		// 发送者会 panic
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		// 形成链表
		glist.push(gp)
	}

	// 解锁
	unlock(&c.lock)
  // ...
}

协程调度

将所有 glist 中的 goroutine 状态从 _Gwaiting设置为 _Grunnable状态,等待调度器的调度:
func closechan(c *hchan) {
  // ...
	// Ready all Gs now that we've dropped the channel lock.
	// 遍历链表
	for !glist.empty() {
		// 取最后一个
		gp := glist.pop()
		gp.schedlink = 0
		// 唤醒相应 goroutine
		goready(gp, 3)
	}
}
后续所有的 goroutine 允许被重新调度后。若原本还在被动阻塞的发送方或接收方,将重获自由,后续该干嘛就去干嘛了,再跑回其所属的应用流程。

无缓冲流程图

notion image

有缓冲流程图

notion image

使用场景

  • 传递的数据时候,把数据传递给下一个goroutine处理
  • 分发任务单元,做一个应用池的时候,前面把任务包装号,扔给其中一个worker就可以,worker用channel实现任务队列,它从channel中取任务就ok
  • 另外交流异步的结果,可以利用channel进行传递
  • 可以进行复杂编排功能
 

Loading Comments...