Go Channel
目录
如何使用
- channel在<-左边 表示向channel发送数据
- channel在<-右边 表示从channel接收数据
- close(channelName) 关闭一个channel
channel := make(chan string, 2)
//发送数据: 写
channel <- "struct"
//接收数据: 读
data := <-channel
fmt.Println(data)
close(channel)
Channel的关闭
- 关闭一个未初始化(nil) 的 channel 会产生 panic
- 重复关闭同一个 channel 会产生 panic
- 向一个已关闭的 channel 中发送消息会产生 panic
- 从一个已关闭的 channel 中读取消息永远不会阻塞,并且会返回一个为 false 状态,可以用它来判断 channel 是否关闭,close操作是对写入的关闭,但仍然可以读取,若消息均已读出,则会读到类型的初始值
func SendMessage(channel chan string) {
go func(channel chan string) {
channel <- "hello"
close(channel)
fmt.Println("channel is closed.")
}(channel)
}
func channalFunc2() {
channel := make(chan string, 5)
go SendMessage(channel)
for {
time.Sleep(time.Second)
chStr, ok := <-channel
if !ok {
fmt.Println("channel is close!!!!!!.")
break
} else {
fmt.Printf("receive %s\n", chStr)
}
}
}
缓冲区
- make创建通道时,指定通道的大小时,称为有缓冲通道,反之无缓冲区
- 无缓冲区或者缓冲区用完,写入一次,就要等待对方读取一次,否则无法继续写入阻塞住,同理读取不出来也会阻塞住
ch := make(chan int, 2)
ch <- 1
ch <- 2
// ch <- 3 //阻塞
a := <- ch
fmt.Println(a)
- 可以用len函数查看channel的已用大小, 用cap查看channel的缓存大小
ch := make(chan int, 2)
ch <- 1
fmt.Println(len(ch)) //1
fmt.Println(cap(ch)) //2
单向通道
- 为了限制channel滥用,禁止其进行读取或者写入操作,让函数具有更高的单一原则,封装性
func counter(out chan<- int) {
for x := 0; x < 10; x++ {
out <- x
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for v := range in {
out <- v
}
close(out)
}
func printer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func channalFunc3() {
naturals := make(chan int)
squares := make(chan int)
//将读写函数分离
//写 chan<- , 读 <-chan
go counter(naturals) //写入
go squarer(squares, naturals) //将刚才写入的变成只读的,传参进去, 中间转换层
printer(squares) //只读
}
作用
- 同步: 依靠阻塞的特性 做多个goroutine之间的锁
var (
sema = make(chan struct{}, 1)
rece2 = 0
)
func raceFunc2() int {
sema <- struct{}{}
rece2++
v := rece2
<-sema
return v
}
go raceFunc2()
go raceFunc2()
- 定时器
fmt.Println(time.Now())
timer := time.NewTimer(time.Second * 2)
<-timer.C
fmt.Println(time.Now())
//输出: 差两秒
// 2019-06-24 16:03:34.011947 +0800 CST m=+0.000201381
// 2019-06-24 16:03:36.015244 +0800 CST m=+2.003571991
//延迟执行
time.AfterFunc(time.Second*2, func() {
fmt.Println(time.Now())
})
//定时器,每隔1秒执行
ticker := time.NewTicker(time.Second)
go func() {
for tick := range ticker.C {
fmt.Println("tick at", tick)
}
}()
- 通信: Channel是goroutine之间通信的通道,用于goroutine之间发消息和接收消息
type Cake struct{ state string }
func baker(cooked chan<- *Cake) {
for {
cake := new(Cake)
cake.state = "cooked"
cooked <- cake // baker never touches this cake again
}
}
func icer(iced chan<- *Cake, cooked <-chan *Cake) {
for cake := range cooked {
cake.state = "iced"
iced <- cake // icer never touches this cake again
}
}
- Select多路复用(I/O多路复用,I/O就是指的我们网络I/O,多路指多个TCP连接(或多个Channel),复用指复用一个或少量线程。串起来理解就是很多个网络I/O复用一个或少量的线程来处理这些连接)
- 对channel的read, write,close, 超时事件等进行监听,
- 同时触发事件会随机执行一个
- 阻塞在多个channel上,对多个channel的读/写事件进行监控
func doWork(ch chan int) {
for {
select {
case <-ch:
fmt.Println("receive A ")
case <-ch2:
fmt.Println("receive B ")
case <-time.After(2 * time.Second):
fmt.Println("ss")
default:
fmt.Println("11111")
}
}
}
func channalFunc5() {
var ch chan int = make(chan int)
go doWork(ch)
for i := 0; i < 5; i++ {
ch <- 1
time.Sleep(time.Second * 1)
ch2 <- 2
}
for {
}
}
内部细节
数据结构
type hchan struct {
qcount uint // 当前队列中剩余元素个数
dataqsiz uint // 环形队列长度,即可以存放的元素个数
buf unsafe.Pointer // 环形队列指针
elemsize uint16 // 每个元素的大小
closed uint32 // 标识关闭状态
elemtype *_type // 元素类型
sendx uint // 队列下标,指示元素写入时存放到队列中的位置
recvx uint // 队列下标,指示元素从队列的该位置读出
recvq waitq // 等待读消息的goroutine队列
sendq waitq // 等待写消息的goroutine队列
lock mutex // 互斥锁,chan不允许并发读写
}
type waitq sudog{//对G的封装
}
- channel 的主要组成有:
- 一个环形数组实现的循环队列, 用于存储消息元素
- recvq和sendq两个链表实现的 goroutine 等待队列, 用于存储阻塞在 recv 和 send 操作上的 goroutine
- 一个互斥锁,用于各个属性变动的同步
主要函数功能
- makechan: 开辟一快连续内存区域存储消息元素
//伪代码
func makechan(t *chantype, size int) *hchan {
var c *hchan
c = new(hchan)
c.buf = malloc(元素类型大小*size)
c.elemsize = 元素类型大小
c.elemtype = 元素类型
c.dataqsiz = size
return c
}
- send chan<-
- 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq取出G,并把数据写入,最后把该G唤醒
- 如果缓冲区中有空余位置,将数据写入缓冲区
- 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒;
func chansend(msg){
if close !=0 {
panic("close")
return
}
//1.如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时
//直接从recvq取出G,并把数据写入,最后把该G唤醒,结束发送过程
if sg := recvq.dequeue(); sg != nil{
sg.send(msg) //给此goroutine发消息
sg.ready() //唤醒
return
}
//2. 跳过2说明无接收方, 如果有缓冲区且不满的话则写入到缓冲区
if qcount < dataqsiz {
buf.enqueue(msg)
qcount++
return
}
//3. 没空余位置或者没缓冲区, 将待发送数据写入到当前调用的G, 并加入sendq链表,进入休眠,等待被读方唤醒
sg := get_current_g()
sg.msg = msg
sg.g.sleep = true
sendq.enqueue(sg)
}
- recv <-chan
- sendq不为空 获取链表的头一个first_g
- 缓存无数据,将first_g消息复制给当前请求的g,并激活first_g
- 缓存有数据, 缓存队列 出列消息给当前请求的g,并将first_g数据加入缓存队列,first_g激活
- 缓存队列有数据将数据出队 复制给当前请求的g
- 缓存队列无数据将调用此chan的当前g加入recvq链表并设置休眠
- sendq不为空 获取链表的头一个first_g
func chanrecv(){
if sg:= sendq.dequeue(); sg != nil{
if buff <= 0 {
msg := sg.msg
g := get_current_g()
g.send(msg)
sg.sleep = false
} else {
msg := buff.dequeue()
g := get_current_g()
g.send(msg)
buff.enqueue(sg.msg)
sg.sleep = false
}
return true
}
if qcount > 0 {
msg := buff.dequeue()
qcount--
g := get_current_g()
g.send(msg)
return true
}
sg := get_current_g()
sg.msg = msg
sg.g.sleep = true
recvq.enqueue(sg)
}
- close: 设置关闭符号为1,唤醒recvq和sendq的g
func close(){
if chan == nil {
panic("close of nil channel")
return
}
if close !=0 {
panic("close of closed channel")
return
}
close = 1
for sg:=recvq.dequeue();sg!=nil{
sg.sleep = false
}
for sg:=sendq.dequeue();sg!=nil{
sg.sleep = false
}
}