Channel
数据结构
type hchan struct {
qcount uint // total data in the queue 数据总长度
dataqsiz uint // size of the circular queue 底层数组长度
buf unsafe.Pointer // points to an array of dataqsiz elements 底层数组指针
elemsize uint16 //元素大小
closed uint32 //是否关闭
timer *timer // timer feeding this chan
elemtype *_type // element type 元素类型
sendx uint // send index 发送坐标
recvx uint // receive index 接受坐标
recvq waitq // list of recv waiters 接受队列
sendq waitq // list of send waiters 发送队列
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex //互斥锁
}
发送操作
不阻塞:
1、通道ch recvq已经有G在等待 2、ch有缓冲并且缓冲没有用尽
阻塞
1、通道ch为nil 2、通道ch无缓冲且recvq为空 3、通道ch有缓冲且用尽
环形缓冲区
接收操作
不阻塞:
1、通道ch的 sendq已经有G在等待 2、ch sengq为空但是通道ch有缓冲并且缓冲没有用尽
阻塞
1、通道ch为nil 2、通道ch无缓冲且sendq为空 3、通道ch有缓冲且用尽
channel的使用场景
- 停止信号
定时任务
select { case <-time.After(100 * time.Millisecond): case <-s.stopc: return false }
解耦生产和消费方
package main
import (
"fmt"
"time"
)
func main() {
taskCh := make(chan int, 100)
go worker(taskCh)
// 阻塞任务
for i := 0; i < 10; i++ {
taskCh <- i
}
// 等待 1 小时
select {
case <-time.After(time.Hour):
}
}
func worker(taskCh <-chan int) {
const N = 5
// 启动 5 个工作协程
for i := 0; i < N; i++ {
go func(id int) {
for {
task := <-taskCh
fmt.Printf("finish task: %d by worker %d\n", task, id)
time.Sleep(time.Second)
}
}(i)
}
}
- 控制并发数
package main
var token = make(chan int, 3)
func main() {
// …………
for _, w := range work {
go func() {
token <- 1
w()
<-token
}()
}
// …………
}