1. 简介

channel也叫通道,类似于一个队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。channel一般分为无缓存通道和有缓存通道,无缓存通道指缓存为0的channel,有缓存通道指缓存大于0的channel

如下是无缓存通道的示例:

func TestChannelNoBuffer(t *testing.T) {
	ch1 := make(chan string)	// 初始化一个缓存为0的通道
	go func() {
		val1 := <-ch1
		fmt.Println(val1)
	}()
	// 如果没有用goroutine去接收通道内的值,这一步将会阻塞
	// 所以goroutine需要写在阻塞这一步的前面
	ch1 <- "value"
    
    // Output
    // value
}

如下是有缓存通道的示例:

func TestWithBuffer(t *testing.T) {
	ch2 := make(chan string, 2)		// 初始化一个缓存为2的通道
	for i := 0; i < 2; i++ {
		ch2 <- "value " + strconv.Itoa(i)
	}
	for i := 0; i < 2; i++ {
		temp := <-ch2
		fmt.Println(temp)
	}
    
    // Output
    // value 1
    // value 2
}

2. channel实现细节

先来看一下channel的结构体

type hchan struct {
	qcount   uint           // channel中元素个数
	dataqsiz uint           // channel中循环队列的大小
	buf      unsafe.Pointer // 指向datasiz元素的数组
	elemsize uint16
	closed   uint32
	elemtype *_type // 元素类型
	sendx    uint   // 发送操作的下标
	recvx    uint   // 接收操作的下标
	recvq    waitq  // 接收的时候,如果channel缓冲区为空,也没发送者,就把goroutine放到这个链表
	sendq    waitq  // 发送的时候,如果channel缓冲区满了,也没接收者,就把goroutine放到这个链表

	lock mutex
}

type waitq struct {	// 双向链表
	first *sudog
	last  *sudog
}

2.1 make初始化channel

make用于初始化channel,如下是make的源码主体部分

func makechan(t *chantype, size int) *hchan {
	elem := t.elem
	......
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	var c *hchan
	switch {
	case mem == 0:	// 当创建无缓冲channel时,只会为chan分配一段内存空间
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:	// 当传入的chan类型不是指针类型时,会为chan和buf一次性分配一块连续的内存空间
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:	// 否则会为chan和buf新创建一块内存空间
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

    // 统一更新如下字段
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	......
	return c
}

初始化channel一定要用make的方式,不然channel会处于nil的状态,如这种创建方式就会导致channel处于nil状态:var ch chan int

2.2 向chan发送数据 chan <- i

下图是channel常见的异常总结,对于理解channel源码有一定帮助

在此之前先看该源码的大致逻辑可以更加轻松的理解

如下源码是向channel发送数据时调用的主体部分

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {	// 如上图,如果向为nil的channel发送数据,则会阻塞
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
    
    ......
    
    lock(&c.lock)
	if c.closed != 0 {	// 向已经关闭的channel发送数据会panic,
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
    // 如果存在阻塞的接收者,则直接把值发送给它,绕过发送通道缓冲区
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
    
    // 如果前面没有发现阻塞的接收者,则判断channel的缓冲区是否满了,
    // 如果没有满,则把发送者放到发送通道缓冲区
    if c.qcount < c.dataqsiz {
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}
    
    ......
    
    // 若上面的缓冲区也满了,则会丢给sendq并阻塞,等待接收者唤醒自己
    c.sendq.enqueue(mysg)
    atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	KeepAlive(ep)	// 确保值不会被gc掉
    
    // someone woke us up
    // 被唤醒
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
    
    ......
}

2.3 从chan接收数据 i := <- chan

从chan接收数据即把channel里面的值取出来,如下是源码部分,先来看一张逻辑图,便于理解

如下是源码部分

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ......
    if c == nil {	// 如果channel为nil,则会阻塞
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
    
    ......
    
    // 如果存在阻塞的发送者,则直接从这个发送者接收数据
    if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
	
    // 如果接收通道缓冲区没有放满,则把这个接收者放到接收通道缓冲区
	if c.qcount > 0 {
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
    
    ......
    
    // 如果缓冲区也放满了,则会丢到recvq并阻塞,等待有发送者把自己唤醒
    c.recvq.enqueue(mysg)
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    
    // someone woke us up
    // 被唤醒
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
    
    ......
}

这里有一个点要注意一下,从已关闭的channel取数据是不会panic的,可以正常取值,如果取完了channel里面的值且channel已关闭,仍然取值的话,就会返回类型相应的默认值,如下例:

func TestChannelNoBuffer(t *testing.T) {
	ch1 := make(chan int)
	go func() {
		time.Sleep(time.Second)
		val1 := <-ch1
		fmt.Println(val1)
		val1 = <-ch1
		fmt.Println(val1)
		val1 = <-ch1
		fmt.Println(val1)
	}()
	// 如果没有用goroutine去接收通道内的值,这一步将会阻塞
	// 所以goroutine需要写在阻塞这一步的前面
	ch1 <- 10
	close(ch1)
    
    // Output
    // 10
    // 0
    // 0
}

如果从没有关闭的channel取值,当里面的值取完了仍然取值的话,是取不到值的,会造成另外两个goroutine泄漏,如下例:

func TestChannelNoBuffer(t *testing.T) {
	ch1 := make(chan int)
	go func() {
		time.Sleep(time.Second)
		val1 := <-ch1
		fmt.Println(val1)
	}()
    go func() {
		time.Sleep(time.Second)
		val1 := <-ch1
		fmt.Println(val1)
	}()
	go func() {
		time.Sleep(time.Second)
		val1 := <-ch1
		fmt.Println(val1)
	}()
	// 如果没有用goroutine去接收通道内的值,这一步将会阻塞
	// 所以goroutine需要写在阻塞这一步的前面
	ch1 <- 10
	//close(ch1)
    
    // Output
    // 10
}

2.4 close关闭channel

如下是channel的关闭操作的源码

func closechan(c *hchan) {
	if c == nil {	// 关闭nil的channel会panic
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {	// 关闭已经关闭的channel会panic
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

    ......

    // 把所有接收队列里阻塞的等待者出队,并放入glist
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// 把所有发送队列里阻塞的等待者出队,并放入glist
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

    // 如果glist不为空,则为这些被阻塞的goroutine调用goready函数唤醒这些goroutine
    // 并重新对这些goroutine进行调度
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

3. 生产者-消费者模型

如下是一个简单的生产者消费者模型

var wg sync.WaitGroup

func producer(data chan<- int) {
	for i := 0; i < 4; i++ {
		data <- i
	}
    // 这里记得要关闭channel,不然会发生阻塞,因为消费者的数量没有限制,
    // 当消费者从空的channel取值的时候会阻塞
	close(data)
}

func consumer(data <-chan int) {
	defer wg.Done()
	for {
		v, ok := <-data
		if !ok {
			break
		}
		fmt.Println("---:", v, "  ===:", ok)
	}
}

func main() {
	data := make(chan int)
	go producer(data)

	wg.Add(1)
	go consumer(data)
	wg.Wait()
}

// Output
// ---: 0   ===: true
// ---: 1   ===: true
// ---: 2   ===: true
// ---: 3   ===: true

4. channel模拟实现消息队列

由于channel所具有的队列特性,因此可以尝试使用它来模拟实现一个消息队列。需要声明channel具有如下一些特性:

  • channel里面的每个数据只能被一个goroutine消费,拿走了就没有了
  • channel一般是按照先进先出的方式消费,无法随机消费

关于消息队列的介绍,可以参考我的这篇文章: Title 消息队列学习 | 基础

其实普通的channel使用就是消息队列的队列模型,一边发送,另一边接收,顺序是FIFO,队列模型是早期的消息队列使用的数据结构,本身比较简单,就不做模拟实现,现在来看看如何实现一些其他的模型

3.1 发布订阅模型

发布订阅模型的基本逻辑如下图:

生产者往Broker里发送消息,然后供多个消费者订阅,此处没有深入到Broker的细节如Topic、Partition等,只是使用channel简单模拟消息队列中生产者和消费者的关系。此处模拟一个消费者发送消息,有多个消费者消费的场景

type Broker struct {
	consumers []*Consumer
}

type Consumer struct {
	ch chan string
}

func (b *Broker) produce(msg string) {
	// 轮询给消费者发送消息
	for _, v := range b.consumers {
		v.ch <- msg
	}
}

func (b *Broker) subscribe(consumer *Consumer) {
	b.consumers = append(b.consumers, consumer)
}

func TestMq1(t *testing.T) {
	// 初始化一个Broker节点
	b := &Broker{
		consumers: make([]*Consumer, 0, 4),
	}

	// 创建2个消费者
	consumer1 := &Consumer{
		ch: make(chan string, 1),
	}
	consumer2 := &Consumer{
		ch: make(chan string, 1),
	}

	// 这2个消费者订阅Broker
	b.subscribe(consumer1)
	b.subscribe(consumer2)

	// 生产者发送一个消息
	b.produce("一条消息")

	// 2个消费者拿到了刚才生产者发送的消息
	fmt.Println(<-consumer1.ch)
	fmt.Println(<-consumer2.ch)
	
	// Output
	// 一条消息
	// 一条消息
}

5. 参考链接

极客时间go实战训练营

https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/channel.html

https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/#64-channel