经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Go语言 » 查看文章
go并发 - channel
来源:cnblogs  作者:喜欢嗑瓜子  时间:2023/11/20 8:56:50  对本文有异议

概述

并发编程是利用多核心能力,提升程序性能,而多线程之间需要相互协作、共享资源、线程安全等。任何并发模型都要解决线程间通讯问题,毫不夸张的说线程通讯是并发编程的主要问题。go使用著名的CSP(Communicating Sequential Process,通讯顺序进程)并发模型,从设计之初 Go 语言就注重如何在编程语言层级上设计一个简洁安全高效的抽象模型,让程序员专注于分解问题和组合方案,而且不用被线程管理和信号互斥这些繁琐的操作分散精力。channel是线程简通讯的具体实现之一,本质就是一个线程安全的 FIFO 阻塞队列(先进先出),向队列中写入数据,在另一个线程从队列读取数据。很多语言都有类似实现,比如 Java 的线程池任务队列。

基本使用

通道是引用类型,需要使用 make 创建,格式如下

  1. 通道实例 := make(chan 数据类型, 通道长度)
  • 数据类型:通道内传输的元素类型,可以基本数据类型,也可以使自定义数据类型。
  • 通道实例:通过make创建的通道句柄,与函数名称一样,指向通道的内存首地址。
  • 通道长度:通道本质是队列,创建时候可指定长度,默认为0

创建通道

  1. ch1 := make(chan int) // 创建一个整型类型的通道
  2. ch2 := make(chan interface{}) // 创建一个空接口类型的通道, 可以存放任意格式
  3. ch3 := make(chan *Equip) // 创建Equip指针类型的通道, 可以存放*Equip
  4. ch4 := make(chan *Equip, 10) // 创建Equip指针类型的通道, 并指定队列长度

通道本质就是线程安全的队列,创建时候可以指定队列长度,默认为0。

向通道写入数据,使用语法非常形象,写入channel <-,读取<-channel

  1. ch2 := make(chan interface{}, 10)
  2. ch2<- 10 // 向队列写入
  3. n := <-ch2 // 从队列读取
  4. fmt.Println(n) // 10

箭头语法虽然很形象,但是有些奇怪,也不利于扩展。使用函数方式感觉更好,也更主流,如func (p *chan) get() any func (p *chan) put(any) err,扩展性也更强,通过参数可增加超时、同步、异步等技能。

箭头符号并没有规定位置,与C指针一样,如下两个语句等效

  1. ch1 := make(chan int)
  2. i := <-ch1
  3. i := <- ch1

箭头语法的读写有相对性,可读性一般,有时候无法分辨是读或写,看起来很奇怪,如下伪代码

  1. func main() {
  2. input := make(chan int, 2)
  3. output := make(chan int, 2)
  4. go func() {
  5. input <- 10
  6. }()
  7. output<- <-input
  8. fmt.Println(<-output)
  9. }

管道是用于协程之间通讯,主流使用方式如下

  1. ch2 := make(chan interface{}, 10)
  2. go func() {
  3. data := <-ch2 // 用户协程读取
  4. fmt.Println(data)
  5. }()
  6. ch2 <- "hello" // 主协程写入
  7. time.Sleep(time.Second)

管道也支持遍历,与箭头符号一样,无数据时候循环将被阻塞,循环永远不会结束,除非关闭管道

  1. chanInt := make(chan int, 10)
  2. for chanInt, ok := range chanInts {
  3. fmt.Println(chanInt)
  4. }

管道也支持关闭,关闭后的管道不允许写入,panic 异常

  1. chanInts := make(chan int, 10)
  2. close(chanInts)
  3. chanInts <- 1 // panic: send on closed channel

读取则不同,已有数据可继续读取,无数据时返回false,不阻塞

  1. if value, ok := <-chanInts; ok { // 从管道读取数据不在阻塞
  2. fmt.Println("从管读取=", value)
  3. } else {
  4. fmt.Println("从管道读取失败", ok)
  5. return
  6. }

单向管道

管道也支持单向模式,仅允许读取、或者写入

  1. var queue <-chan string = make(chan string)

函数形参也可以定义定向管道

  1. func customer(channel <-chan string) { // 形参为只读管道
  2. for {
  3. message := <-channel // 只允许读取数据
  4. fmt.Println(message)
  5. }
  6. }
  7. channel := make(chan string)
  8. go customer(channel)

管道阻塞

Go管道的读写都是同步模式,当管道容量还有空间,则写入成功,否则将阻塞直到写入成功。从管道读取也一样,有数据直接读取,否则将阻塞直到读取成功。

  1. var done = make(chan bool)
  2. func aGoroutine() {
  3. fmt.Println("hello")
  4. done <- true // 写管道
  5. }
  6. func main() {
  7. go aGoroutine()
  8. <-done // 读阻塞
  9. }

主协程从管道读取数据时将被阻塞,直到用户协程写入数据。管道非常适合用于生产者消费者模式,需要平滑两者的性能差异,可通过管道容量实现缓冲,所以除非特定场景,都建议管道容量大于零。

有些场景可以使用管道控制线程并发数

  1. // 待补充

阻塞特性也带来了些问题,程序无法控制超时(箭头函数语法的后遗症),go 也提供了解决方案, 使用select关键,与网络编程的select函数类似,监测多个通道是否可读状态,都可读随机选择一个,都不可读进入Default分支,否则阻塞

  1. select {
  2. case n := <-input:
  3. fmt.Println(n)
  4. case m := <-output:
  5. fmt.Println(m)
  6. default:
  7. fmt.Println("default")
  8. }

当然也可以使用select向管道写入数据,只要不关闭管道总是可写入,此时加入default分支永远不会被执行到,如下随机石头剪刀布

  1. ch := make(chan string)
  2. go func() {
  3. for {
  4. select {
  5. case ch <- "石头":
  6. case ch <- "剪刀":
  7. case ch <- "布":
  8. }
  9. }
  10. }()
  11. for value := range ch {
  12. log.Println(value)
  13. time.Sleep(time.Second)
  14. }

模拟线程池

由于go的管道非常轻量且简洁,大部分直接使用,封装线程池模式并不常见。案例仅作为功能演示,非常简单几十行代码即可实现线程池的基本功能,体现了go并发模型的简洁、高效。

  1. type Runnable interface {
  2. Start()
  3. }
  4. // 线程池对象
  5. type ThreadPool struct {
  6. queueSize int
  7. workSize int
  8. channel chan Runnable
  9. wait sync.WaitGroup
  10. }
  11. // 工作线程, 执行异步任务
  12. func (pool *ThreadPool) doWorker(name string) {
  13. log.Printf("%s 启动工作协程", name)
  14. for true {
  15. if runnable, ok := <-pool.channel; ok {
  16. log.Printf("%s 获取任务, %v\n", name, runnable)
  17. runnable.Start()
  18. log.Printf("%s 任务执行成功, %v\n", name, runnable)
  19. } else {
  20. log.Printf("%s 线程池关闭, 退出工作协程\n", name)
  21. pool.wait.Done()
  22. return
  23. }
  24. }
  25. }
  26. // 启动工作线程
  27. func (pool *ThreadPool) worker() {
  28. pool.wait.Add(pool.workSize)
  29. for i := 0; i < pool.workSize; i++ {
  30. go pool.doWorker(fmt.Sprintf("work-%d", i))
  31. }
  32. }
  33. // Submit 提交任务
  34. func (pool *ThreadPool) Submit(task Runnable) bool {
  35. defer func() { recover() }()
  36. pool.channel <- task
  37. return true
  38. }
  39. // Close 关闭线程池
  40. func (pool *ThreadPool) Close() {
  41. defer func() { recover() }()
  42. close(pool.channel)
  43. }
  44. // Wait 等待线程池任务完成
  45. func (pool *ThreadPool) Wait() {
  46. pool.Close()
  47. pool.wait.Wait()
  48. }
  49. // NewThreadPool 工厂函数,创建线程池
  50. func NewThreadPool(queueSize int, workSize int) *ThreadPool {
  51. pool := &ThreadPool{queueSize: queueSize, workSize: workSize, channel: make(chan Runnable, queueSize)}
  52. pool.worker()
  53. return pool
  54. }

使用线程池

  1. type person struct {
  2. name string
  3. }
  4. func (p *person) Start() {
  5. fmt.Println(p.name)
  6. }
  7. func main() {
  8. threadPool := executor.NewThreadPool(10, 2) // 创建线程池, 队列长度10, 工作线程2
  9. for i := 0; i < 5; i++ {
  10. threadPool.Submit(&person{name: "xx"}) // 提交十个任务
  11. }
  12. threadPool.Wait() // 阻塞等待所有任务完成
  13. }

模拟管道

任何线程之间的通讯都依赖底层锁机制,channel是对锁机制封装后的实现对象,与Java中线程池任务队列机制几乎一样,但要简洁很多。使用切片简单模拟
接口声明

  1. type Queue interface {
  2. // Put 向队列添加任务, 添加成功返回true, 添加失败返回false, 队列满了则阻塞直到添加成功
  3. Put(task interface{}) bool
  4. // Get 从队列获取任务, 一直阻塞直到获取任务, 队列关闭且没有任务则返回false
  5. Get() (interface{}, bool)
  6. // Size 查看队列中的任务数
  7. Size() int
  8. // Close 关闭队列, 关闭后将无法添加任务, 已有的任务可以继续获取
  9. Close()
  10. }

基于切片实现

  1. // SliceQueue 使用切片实现, 自动扩容属性队列永远都不会满, 扩容时候会触发数据复制, 性能一般
  2. type SliceQueue struct {
  3. sync.Mutex
  4. cond *sync.Cond
  5. queue []interface{}
  6. close atomic.Bool
  7. }
  8. func (q *SliceQueue) Get() (data interface{}, ok bool) {
  9. q.Lock()
  10. defer q.Unlock()
  11. for true {
  12. if len(q.queue) == 0 {
  13. if q.close.Load() == true {
  14. return nil, false
  15. }
  16. q.cond.Wait()
  17. }
  18. if data := q.doGet(); data != nil {
  19. return data, true
  20. }
  21. }
  22. return
  23. }
  24. func (q *SliceQueue) doGet() interface{} {
  25. if len(q.queue) >= 1 {
  26. data := q.queue[0]
  27. q.queue = q.queue[1:]
  28. return data
  29. }
  30. return nil
  31. }
  32. func (q *SliceQueue) Put(task interface{}) bool {
  33. q.Lock()
  34. defer func() {
  35. q.cond.Signal()
  36. q.Unlock()
  37. }()
  38. if q.close.Load() == true {
  39. return false
  40. }
  41. q.queue = append(q.queue, task)
  42. return true
  43. }
  44. func (q *SliceQueue) Size() int {
  45. return len(q.queue)
  46. }
  47. func (q *SliceQueue) Close() {
  48. if q.close.Load() == true {
  49. return
  50. }
  51. q.Lock()
  52. defer q.Unlock()
  53. q.close.Store(true)
  54. q.cond.Broadcast()
  55. }
  56. func NewSliceQueue() Queue {
  57. sliceQueue := &SliceQueue{queue: make([]interface{}, 0, 2)}
  58. sliceQueue.cond = sync.NewCond(sliceQueue)
  59. return sliceQueue
  60. }

基于环行数组实现

  1. type ArrayQueue struct {
  2. sync.Mutex
  3. readCond *sync.Cond
  4. writeCond *sync.Cond
  5. readIndex int
  6. writeIndex int
  7. queueMaxSize int
  8. close atomic.Bool
  9. queue []interface{}
  10. }
  11. func (q *ArrayQueue) Put(task interface{}) bool {
  12. q.Lock()
  13. defer q.Unlock()
  14. for true {
  15. if q.close.Load() == true {
  16. return false
  17. }
  18. if q.IsFull() {
  19. q.writeCond.Wait()
  20. if q.IsFull() {
  21. continue
  22. }
  23. }
  24. q.queue[q.writeIndex] = task
  25. q.writeIndex = (q.writeIndex + 1) % q.queueMaxSize
  26. q.readCond.Signal()
  27. return true
  28. }
  29. return true
  30. }
  31. func (q *ArrayQueue) Get() (interface{}, bool) {
  32. q.Lock()
  33. defer q.Unlock()
  34. for true {
  35. if q.IsEmpty() {
  36. if q.close.Load() == true {
  37. return nil, false
  38. }
  39. q.readCond.Wait()
  40. if q.IsEmpty() {
  41. continue
  42. }
  43. }
  44. task := q.queue[q.readIndex]
  45. q.readIndex = (q.readIndex + 1) % q.queueMaxSize
  46. q.writeCond.Signal()
  47. return task, true
  48. }
  49. return nil, true
  50. }
  51. func (q *ArrayQueue) Size() int {
  52. return q.queueMaxSize
  53. }
  54. func (q *ArrayQueue) Close() {
  55. if q.close.Load() == true {
  56. return
  57. }
  58. q.Lock()
  59. q.Unlock()
  60. q.close.Store(true)
  61. q.readCond.Broadcast()
  62. }
  63. func (q *ArrayQueue) IsFull() bool {
  64. return (q.writeIndex+1)%q.queueMaxSize == q.readIndex
  65. }
  66. func (q *ArrayQueue) IsEmpty() bool {
  67. return q.readIndex == q.writeIndex
  68. }
  69. func NewArrayQueue(size int) Queue {
  70. queue := &ArrayQueue{queue: make([]interface{}, size), readIndex: 0, writeIndex: 0, queueMaxSize: size}
  71. queue.readCond = sync.NewCond(queue)
  72. queue.writeCond = sync.NewCond(queue)
  73. return queue
  74. }

测试用例

  1. func TestWith(t *testing.T) {
  2. q := NewSliceQueue()
  3. go func() {
  4. time.Sleep(time.Second * 2)
  5. q.Put(true) // 向队列写入数据, 与 chan<- 功能相同
  6. }()
  7. q.Get() // 阻塞直到读取数据, 与 <-chan 功能相同
  8. }

原文链接:https://www.cnblogs.com/asdfzxv/p/17841629.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号