Channel

数据结构

type hchan struct {
qcount   uint // total data in the queue 数据总长度
dataqsiz uint // size of the circular queue 底层数组长度
buf      unsafe.Pointer // points to an array of dataqsiz elements 底层数组指针
elemsize uint16         //元素大小
closed   uint32         //是否关闭
timer    *timer // timer feeding this chan
elemtype *_type // element type 元素类型
sendx    uint   // send index 发送坐标
recvx    uint   // receive index 接受坐标
recvq    waitq  // list of recv waiters 接受队列
sendq    waitq  // list of send waiters 发送队列

// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex //互斥锁
}

发送操作

不阻塞:

1、通道ch recvq已经有G在等待 2、ch有缓冲并且缓冲没有用尽

阻塞

1、通道ch为nil 2、通道ch无缓冲且recvq为空 3、通道ch有缓冲且用尽

环形缓冲区

接收操作

不阻塞:

1、通道ch的 sendq已经有G在等待 2、ch sengq为空但是通道ch有缓冲并且缓冲没有用尽

阻塞

1、通道ch为nil 2、通道ch无缓冲且sendq为空 3、通道ch有缓冲且用尽

channel的使用场景
  • 停止信号
  • 定时任务

    select { 
    case <-time.After(100 * time.Millisecond): 
    case <-s.stopc: 
    return false 
    }
    
  • 解耦生产和消费方

package main

import (
    "fmt"
    "time"
)

func main() {
    taskCh := make(chan int, 100)
    go worker(taskCh)
    // 阻塞任务
    for i := 0; i < 10; i++ {
        taskCh <- i
    }
    // 等待 1 小时 
    select {
    case <-time.After(time.Hour):
    }
}
func worker(taskCh <-chan int) {
    const N = 5
    // 启动 5 个工作协程
    for i := 0; i < N; i++ {
        go func(id int) {
            for {
                task := <-taskCh
                fmt.Printf("finish task: %d by worker %d\n", task, id)
                time.Sleep(time.Second)
            }
        }(i)
    }
}
  • 控制并发数
package main

var token = make(chan int, 3)

func main() {
    // ………… 
    for _, w := range work {
        go func() {
            token <- 1
            w()
            <-token
        }()
    }
    // ………… 
}
通道操作总结

results matching ""

    No results matching ""