Channel Basics
How chan is implemented
0. Prerequisites
0.1 Unix pipes
In Unix and Unix-like operating systems, a pipeline connects processes through their standard input and output: the output of each process is fed directly into the input of the next. In other words, a pipe is an inter-process communication mechanism.
A typical use of a pipe is as a communication channel between two related processes, a parent and its child. The parent first calls fork (the Unix function for creating a child process) to create a copy of itself. Creating a pipe returns two file descriptors, one for reading and one for writing; because fork duplicates the process's address space, those file descriptors are duplicated as well, so both processes end up holding the same read and write descriptors.

The parent then closes the read end and the child closes the write end of the same pipe (or the other way around), which leaves a one-way data stream between the parent and the child.

For example, when running the Linux command:
ls -l | grep "chan"
the parent process is the shell. It first creates the pipe, then forks two child processes, one for ls and one for grep, so both children inherit the pipe's two file descriptors. ls -l writes its standard output into the pipe, grep "chan" reads the data from the pipe and processes it, and finally the result is printed.
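As a hedged sketch of the same idea in Go (assuming the ls and grep binaries are on PATH): the parent creates the pipe with os.Pipe and wires the write end to ls and the read end to grep.

package main

import (
	"os"
	"os/exec"
)

func main() {
	// The parent creates the pipe: r is the read descriptor,
	// w is the write descriptor.
	r, w, err := os.Pipe()
	if err != nil {
		panic(err)
	}

	ls := exec.Command("ls", "-l")
	ls.Stdout = w // ls writes its standard output into the pipe

	grep := exec.Command("grep", "chan")
	grep.Stdin = r // grep reads its standard input from the pipe
	grep.Stdout = os.Stdout

	ls.Start()
	grep.Start()
	ls.Wait()
	w.Close() // close the parent's write end so grep sees EOF
	grep.Wait()
}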
0.1.1 How pipes are implemented on Linux
On Linux, a pipe is implemented as a kernel buffer rather than as a queue data structure.
The kernel allocates buffer space for the pipe: the writing process appends data at the write position and the reading process consumes data from the read position, with the kernel maintaining the position pointers for both ends.
Synchronization:
When the buffer is full, the writing process blocks until the reader has consumed some data and freed up space.
When the buffer is empty, the reading process blocks until the writer has written new data.
The data in a pipe is handled in FIFO order: what is written first is read first.
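A hedged single-process sketch of this behavior, using os.Pipe with two goroutines in place of two processes: the read blocks until the writer produces data, and bytes come out in the order they went in.

package main

import (
	"fmt"
	"os"
	"time"
)

func main() {
	r, w, _ := os.Pipe()

	go func() {
		time.Sleep(time.Second) // keep the reader blocked for a moment
		w.Write([]byte("abc"))  // written first...
		w.Write([]byte("def"))  // ...read first (FIFO)
		w.Close()
	}()

	buf := make([]byte, 16)
	n, _ := r.Read(buf) // blocks here until the writer has written something
	fmt.Println(string(buf[:n]))
}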
1. channel
In Go, a single process may run many goroutines. These goroutines can communicate with each other either through channels or by locking shared memory.
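A minimal sketch of channel-based communication: one goroutine sends a value and the main goroutine receives it, with no explicit lock needed.

package main

import "fmt"

func main() {
	ch := make(chan string) // unbuffered channel

	go func() {
		ch <- "hello" // send blocks until someone receives
	}()

	msg := <-ch // receive blocks until someone sends
	fmt.Println(msg)
}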
1.1 The data structure of chan
In Go SDK 1.20.3, the file runtime/chan.go defines hchan, the underlying data structure of a chan:
type hchan struct {
	qcount   uint           // number of elements currently in the queue
	dataqsiz uint           // size of the circular queue, i.e. how many elements it can hold
	buf      unsafe.Pointer // pointer to an array of dataqsiz elements: the ring buffer
	elemsize uint16         // size of each element
	closed   uint32         // closed flag
	elemtype *_type         // element type
	sendx    uint           // queue index at which the next element will be written
	recvx    uint           // queue index from which the next element will be read
	recvq    waitq          // queue of goroutines waiting to receive
	sendq    waitq          // queue of goroutines waiting to send
	lock     mutex          // mutex; a chan does not allow concurrent reads and writes
}
type waitq struct {
	first *sudog
	last  *sudog
}
The waitq type is a queue: it holds pointers to the first and the last element of the list.
waitq implements two methods: enqueue and dequeue.
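A simplified sketch of those two methods, modeled on the logic in runtime/chan.go and using the sudog type shown next (the real dequeue also contains select-related handling of sgp.isSelect, omitted here):

func (q *waitq) enqueue(sgp *sudog) {
	sgp.next = nil
	x := q.last
	if x == nil {
		// queue is empty
		sgp.prev = nil
		q.first = sgp
		q.last = sgp
		return
	}
	sgp.prev = x
	x.next = sgp
	q.last = sgp
}

func (q *waitq) dequeue() *sudog {
	sgp := q.first
	if sgp == nil {
		return nil
	}
	y := sgp.next
	if y == nil {
		// sgp was the only element
		q.first = nil
		q.last = nil
	} else {
		y.prev = nil
		q.first = y
		sgp.next = nil // mark as removed
	}
	return sgp
}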
type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.

	g *g

	next *sudog
	prev *sudog
	elem unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.

	acquiretime int64
	releasetime int64
	ticket      uint32

	// isSelect indicates g is participating in a select, so
	// g.selectDone must be CAS'd to win the wake-up race.
	isSelect bool

	// success indicates whether communication over channel c
	// succeeded. It is true if the goroutine was awoken because a
	// value was delivered over channel c, and false if awoken
	// because c was closed.
	success bool

	parent   *sudog // semaRoot binary tree
	waitlink *sudog // g.waiting list or semaRoot
	waittail *sudog // semaRoot
	c        *hchan // channel
}
sudog represents a g (the data structure describing a goroutine) sitting on a wait list.
Question: why is sudog needed instead of using g directly?
The authors explain in the Go SDK source:
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
In short: sudog is necessary because the relationship between goroutines and synchronization objects is many-to-many. One goroutine can be on many wait lists, so there may be many sudogs representing the same goroutine; and many goroutines may be waiting on the same synchronization object, so one synchronization object may have many sudogs.
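The many-to-many relation cannot be observed directly from user code, but a select makes it concrete (a hedged illustration): while the goroutine below is blocked, it sits on the wait lists of both ch1 and ch2 at once, so the runtime needs one sudog per channel for the same g.

package main

import "fmt"

func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)

	go func() { ch1 <- 1 }()

	// While blocked, this goroutine is enqueued on the recvq of
	// BOTH channels: one sudog per channel, same g.
	select {
	case v := <-ch1:
		fmt.Println("from ch1:", v)
	case v := <-ch2:
		fmt.Println("from ch2:", v)
	}
}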
1.1.1 The ring buffer
Internally a chan maintains a ring buffer, the buf field. Its length is fixed when the chan is created with make; if no length is given, there is no ring buffer at all, which makes it an unbuffered chan.
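For example (a minimal sketch): the capacity passed to make is dataqsiz, with capacity 0 there is no ring buffer, and the sendx/recvx indices wrap around modulo the capacity.

package main

import "fmt"

func main() {
	unbuffered := make(chan int)  // no ring buffer (dataqsiz == 0)
	buffered := make(chan int, 3) // ring buffer of 3 slots (dataqsiz == 3)

	buffered <- 1
	buffered <- 2
	fmt.Println(len(buffered), cap(buffered)) // 2 3  (qcount, dataqsiz)

	<-buffered    // recvx advances to slot 1
	buffered <- 3 // fills slot 2
	buffered <- 4 // sendx wraps around: the value lands back in slot 0

	fmt.Println(len(unbuffered), cap(unbuffered)) // 0 0
}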

1.1.2 Wait queues
When a goroutine reads from a channel whose buffer is empty, or which has no buffer, the goroutine blocks.
When a goroutine writes to a channel whose buffer is full, or which has no buffer, the goroutine blocks.
Blocked goroutines are stored in the wait queues, i.e. recvq and sendq:
the sendq queue holds the goroutines that tried to send to the channel and were blocked;
the recvq queue holds the goroutines that tried to receive from the channel and were blocked.
Question: if several reading goroutines are blocked and queued in recvq, and a writing goroutine then writes a value, are all of the waiting goroutines woken up, or only some?
Answer: the channel does not wake every goroutine in the wait queue; it dequeues a single goroutine to take the value. Channels are designed to hand data over as efficiently as possible, and waking every goroutine in recvq on each write would cause a lot of pointless context switches and hurt performance. Picking the goroutine at the head of the queue also preserves FIFO order and avoids starvation.
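A hedged demo of this: three goroutines block on the same channel, then a single send wakes exactly one of them (the one at the head of recvq).

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int)

	for i := 0; i < 3; i++ {
		go func(id int) {
			v := <-ch // all three block here, queued in recvq
			fmt.Printf("goroutine %d got %d\n", id, v)
		}(i)
	}

	time.Sleep(100 * time.Millisecond) // let all three park
	ch <- 42                           // wakes exactly one receiver

	time.Sleep(100 * time.Millisecond)
	// Only one line is printed; the other two goroutines stay parked.
}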
1.1.3 Type information
A channel can only carry values of a single type; the type information is stored in hchan (the elemtype and elemsize fields).
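For example, a chan int can only ever carry int values; sending any other type is rejected at compile time:

ch := make(chan int)
ch <- 1       // ok
// ch <- "hi" // compile error: cannot use "hi" (untyped string constant) as int value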
1.1.4 The lock
A channel allows only one goroutine at a time to operate on its internal state. This is why a channel is a thread-safe data type.
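Because of this lock, many goroutines can send on the same channel concurrently without any additional locking by the caller (a minimal sketch):

package main

import "fmt"

func main() {
	ch := make(chan int, 100)

	// 10 goroutines write concurrently; hchan.lock serializes them,
	// so no extra mutex is needed on the caller's side.
	for i := 0; i < 10; i++ {
		go func(v int) { ch <- v }(i)
	}

	for i := 0; i < 10; i++ {
		fmt.Println(<-ch)
	}
}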
1.2 Writing data to a channel
The send path is implemented by chansend in runtime/chan.go:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if !block && c.closed == 0 && full(c) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
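Before walking through the steps, the externally visible behavior of these code paths can be demonstrated from the caller's side (a hedged sketch):

package main

import "fmt"

func main() {
	// Buffered fast path: space in the ring buffer, send returns at once.
	ch := make(chan int, 1)
	ch <- 1

	// A non-blocking send (block=false) is what a select with a
	// default branch compiles to: the buffer is now full, so the
	// default branch is taken instead of blocking.
	select {
	case ch <- 2:
		fmt.Println("sent")
	default:
		fmt.Println("channel full, send would block")
	}

	// Send on a closed channel panics (the c.closed != 0 branch).
	close(ch)
	defer func() { fmt.Println("recovered:", recover()) }()
	ch <- 3
}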
Writing a value into a channel proceeds as follows:
Check the channel state: first, the channel's state is checked, including whether it has already been closed and whether the buffer is full.
Try a direct send: if there is a goroutine waiting to receive, the value is handed directly to that waiting goroutine, which is then woken up.
How is the data sent?
if sg := c.recvq.dequeue(); sg != nil {
	// Found a waiting receiver. We pass the value we want to send
	// directly to the receiver, bypassing the channel buffer (if any).
	send(c, sg, ep, func() { unlock(&c.lock) }, 3)
	return true
}

From this source we can see that if the recvq wait queue contains a reading goroutine (i.e. the queue is not empty), one goroutine is dequeued and the data is sent to it directly. The fact that recvq is not empty also means the channel's ring buffer buf holds no data, so the value can simply be handed over:

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked. send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src is on our stack, dst is a slot on another stack.

	// Once we read sg.elem out of sg, it will no longer
	// be updated if the destination's stack gets copied (shrunk).
	// So make sure that no preemption points can happen between read & use.
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	// No need for cgo write barrier checks because dst is always
	// Go memory.
	memmove(dst, src, t.size)
}

To be continued…