经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Go语言 » 查看文章
从源码分析 Go 语言使用 cgo 导致的线程增长
来源:cnblogs  作者:T_MAX  时间:2023/6/5 14:52:16  对本文有异议

TDengine Go 连接器 https://github.com/taosdata/driver-go 使用 cgo 调用 taos.so 中的 API,使用过程中发现线程数不断增长,本文从一个 cgo 调用开始解析 Go 源码,分析造成线程增长的原因。

转换 cgo 代码

对 driver-go/wrapper/taosc.go 进行转换

go tool cgo taosc.go

执行后生成 _obj 文件夹

go 代码分析

taosc.cgo1.goTaosResetCurrentDB 为例来分析。

  1. // TaosResetCurrentDB void taos_reset_current_db(TAOS *taos);
  2. func TaosResetCurrentDB(taosConnect unsafe.Pointer) {
  3. func() { _cgo0 := /*line :161:26*/taosConnect; _cgoCheckPointer(_cgo0, nil); _Cfunc_taos_reset_current_db(_cgo0); }()
  4. }
  5. //go:linkname _cgoCheckPointer runtime.cgoCheckPointer
  6. func _cgoCheckPointer(interface{}, interface{})
  7. //go:cgo_unsafe_args
  8. func _Cfunc_taos_reset_current_db(p0 unsafe.Pointer) (r1 _Ctype_void) {
  9. _cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr(unsafe.Pointer(&p0)))
  10. if _Cgo_always_false {
  11. _Cgo_use(p0)
  12. }
  13. return
  14. }
  15. //go:linkname _cgo_runtime_cgocall runtime.cgocall
  16. func _cgo_runtime_cgocall(unsafe.Pointer, uintptr) int32
  17. //go:cgo_import_static _cgo_453a0cad50ef_Cfunc_taos_reset_current_db
  18. //go:linkname __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db _cgo_453a0cad50ef_Cfunc_taos_reset_current_db
  19. var __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db byte
  20. var _cgo_453a0cad50ef_Cfunc_taos_reset_current_db = unsafe.Pointer(&__cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db)
  • TaosResetCurrentDB 首先调用 _cgoCheckPointer 检查传入参数是否为 nil
  • //go:linkname _cgoCheckPointer runtime.cgoCheckPointer 表示 cgoCheckPointer 方法实现是 runtime.cgoCheckPointer,如果传入参数是 nil 程序将会 panic
  • 接着调用 _Cfunc_taos_reset_current_db
  • Cfunc_taos_reset_current_db 方法中 _Cgo_always_false 在运行时会是 false,所以只分析第一句 _cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr(unsafe.Pointer(&p0)))
    • _cgo_runtime_cgocall 实现是 runtime.cgocall 这个会重点分析。
    • _cgo_453a0cad50ef_Cfunc_taos_reset_current_db 由上方最后代码块可以看出是 taos_reset_current_db 方法指针。
    • uintptr(unsafe.Pointer(&p0)) 表示 p0 的指针地址。
    • 由上面可以看出这句意思是调用 runtime.cgocall,参数为方法指针和参数的指针地址。

分析 runtime.cgocall

基于 golang 1.20.4 分析该方法

  1. func cgocall(fn, arg unsafe.Pointer) int32 {
  2. if !iscgo && GOOS != "solaris" && GOOS != "illumos" && GOOS != "windows" {
  3. throw("cgocall unavailable")
  4. }
  5. if fn == nil {
  6. throw("cgocall nil")
  7. }
  8. if raceenabled {
  9. racereleasemerge(unsafe.Pointer(&racecgosync))
  10. }
  11. mp := getg().m // 获取当前 goroutine 的 M
  12. mp.ncgocall++ // 总 cgo 计数 +1
  13. mp.ncgo++ // 当前 cgo 计数 +1
  14. mp.cgoCallers[0] = 0 // 重置追踪
  15. entersyscall() // 进入系统调用,保存上下文, 标记当前 goroutine 独占 m, 跳过垃圾回收
  16. osPreemptExtEnter(mp) // 标记异步抢占, 使异步抢占逻辑失效
  17. mp.incgo = true // 修改状态
  18. errno := asmcgocall(fn, arg) // 真正进行方法调用的地方
  19. mp.incgo = false // 修改状态
  20. mp.ncgo-- // 当前 cgo 调用-1
  21. osPreemptExtExit(mp) // 恢复异步抢占
  22. exitsyscall() // 退出系统调用,恢复调度器控制
  23. if raceenabled {
  24. raceacquire(unsafe.Pointer(&racecgosync))
  25. }
  26. // 避免 GC 过早回收
  27. KeepAlive(fn)
  28. KeepAlive(arg)
  29. KeepAlive(mp)
  30. return errno
  31. }

其中两个主要的方法 entersyscallasmcgocall,接下来对这两个方法进行着重分析。

分析 entersyscall

  1. func entersyscall() {
  2. reentersyscall(getcallerpc(), getcallersp())
  3. }

entersyscall 直接调用的 reentersyscall,关注下 reentersyscall 注释中的一段:

  1. // If the syscall does not block, that is it, we do not emit any other events.
  2. // If the syscall blocks (that is, P is retaken), retaker emits traceGoSysBlock;

如果 syscall 调用没有阻塞则不会触发任何事件,如果被阻塞 retaker 会触发 traceGoSysBlock,那需要了解一下多长时间被认为是阻塞,先跟到 retaker 方法。

  1. func retake(now int64) uint32 {
  2. n := 0
  3. lock(&allpLock)
  4. for i := 0; i < len(allp); i++ {
  5. pp := allp[i]
  6. if pp == nil {
  7. continue
  8. }
  9. pd := &pp.sysmontick
  10. s := pp.status
  11. sysretake := false
  12. if s == _Prunning || s == _Psyscall {
  13. t := int64(pp.schedtick)
  14. if int64(pd.schedtick) != t {
  15. pd.schedtick = uint32(t)
  16. pd.schedwhen = now
  17. } else if pd.schedwhen+forcePreemptNS <= now {
  18. preemptone(pp)
  19. sysretake = true
  20. }
  21. }
  22. // 从系统调用中抢占P
  23. if s == _Psyscall {
  24. // 如果已经超过了一个系统监控的 tick(20us),则从系统调用中抢占 P
  25. t := int64(pp.syscalltick)
  26. if !sysretake && int64(pd.syscalltick) != t {
  27. pd.syscalltick = uint32(t)
  28. pd.syscallwhen = now
  29. continue
  30. }
  31. if runqempty(pp) && sched.nmspinning.Load()+sched.npidle.Load() > 0 && pd.syscallwhen+10*1000*1000 > now {
  32. continue
  33. }
  34. unlock(&allpLock)
  35. incidlelocked(-1)
  36. if atomic.Cas(&pp.status, s, _Pidle) {
  37. if trace.enabled {
  38. traceGoSysBlock(pp)
  39. traceProcStop(pp)
  40. }
  41. n++
  42. pp.syscalltick++
  43. handoffp(pp)
  44. }
  45. incidlelocked(1)
  46. lock(&allpLock)
  47. }
  48. }
  49. unlock(&allpLock)
  50. return uint32(n)
  51. }

从上面可以看到系统调用阻塞 20 多微秒会被抢占 P,cgo 被迫 handoffp,接下来分析 handoffp 方法

  1. func handoffp(pp *p) {
  2. // ...
  3. // 没有任务且没有自旋和空闲的 M 则需要启动一个新的 M
  4. if sched.nmspinning.Load()+sched.npidle.Load() == 0 && sched.nmspinning.CompareAndSwap(0, 1) {
  5. sched.needspinning.Store(0)
  6. startm(pp, true)
  7. return
  8. }
  9. // ...
  10. }

handoffp 方法会调用 startm 来启动一个新的 M,跟到 startm 方法。

  1. func startm(pp *p, spinning bool) {
  2. // ...
  3. nmp := mget()
  4. if nmp == nil {
  5. // 没有M可用,调用newm
  6. id := mReserveID()
  7. unlock(&sched.lock)
  8. var fn func()
  9. if spinning {
  10. fn = mspinning
  11. }
  12. newm(fn, pp, id)
  13. releasem(mp)
  14. return
  15. }
  16. // ...
  17. }

此时如果没有 M startm 会调用 newm 创建一个新的 M,接下来分析 newm 方法。

  1. func newm(fn func(), pp *p, id int64) {
  2. acquirem()
  3. mp := allocm(pp, fn, id)
  4. mp.nextp.set(pp)
  5. mp.sigmask = initSigmask
  6. if gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" {
  7. lock(&newmHandoff.lock)
  8. if newmHandoff.haveTemplateThread == 0 {
  9. throw("on a locked thread with no template thread")
  10. }
  11. mp.schedlink = newmHandoff.newm
  12. newmHandoff.newm.set(mp)
  13. if newmHandoff.waiting {
  14. newmHandoff.waiting = false
  15. notewakeup(&newmHandoff.wake)
  16. }
  17. unlock(&newmHandoff.lock)
  18. releasem(getg().m)
  19. return
  20. }
  21. newm1(mp)
  22. releasem(getg().m)
  23. }
  24. func newm1(mp *m) {
  25. if iscgo {
  26. var ts cgothreadstart
  27. if _cgo_thread_start == nil {
  28. throw("_cgo_thread_start missing")
  29. }
  30. ts.g.set(mp.g0)
  31. ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
  32. ts.fn = unsafe.Pointer(abi.FuncPCABI0(mstart))
  33. if msanenabled {
  34. msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
  35. }
  36. if asanenabled {
  37. asanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
  38. }
  39. execLock.rlock()
  40. // 创建新线程
  41. asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
  42. execLock.runlock()
  43. return
  44. }
  45. execLock.rlock()
  46. newosproc(mp)
  47. execLock.runlock()
  48. }

newm 看出如果线程都在阻塞中则调用 newm1newm1 调用 _cgo_thread_start 创建新线程。

由以上分析得出当高并发调用 cgo 且执行时间超过 20 微秒时会创建新线程。

分析 asmcgocall

只分析 amd64
asm_amd64.s

  1. TEXT ·asmcgocall(SB),NOSPLIT,$0-20
  2. MOVQ fn+0(FP), AX
  3. MOVQ arg+8(FP), BX
  4. MOVQ SP, DX
  5. // 考虑是否需要切换到 m.g0 栈
  6. // 也用来调用创建新的 OS 线程,这些线程已经在 m.g0 栈中了
  7. get_tls(CX)
  8. MOVQ g(CX), DI
  9. CMPQ DI, $0
  10. JEQ nosave
  11. MOVQ g_m(DI), R8
  12. MOVQ m_gsignal(R8), SI
  13. CMPQ DI, SI
  14. JEQ nosave
  15. MOVQ m_g0(R8), SI
  16. CMPQ DI, SI
  17. JEQ nosave
  18. // 切换到系统栈
  19. CALL gosave_systemstack_switch<>(SB)
  20. MOVQ SI, g(CX)
  21. MOVQ (g_sched+gobuf_sp)(SI), SP
  22. // 于调度栈中(pthread 新创建的栈)
  23. // 确保有足够的空间给四个 stack-based fast-call 寄存器
  24. // 为使得 windows amd64 调用服务
  25. SUBQ $64, SP
  26. ANDQ $~15, SP // 为 gcc ABI 对齐
  27. MOVQ DI, 48(SP) // 保存 g
  28. MOVQ (g_stack+stack_hi)(DI), DI
  29. SUBQ DX, DI
  30. MOVQ DI, 40(SP) // 保存栈深 (不能仅保存 SP,因为栈可能在回调时被复制)
  31. MOVQ BX, DI // DI = AMD64 ABI 第一个参数
  32. MOVQ BX, CX // CX = Win64 第一个参数
  33. CALL AX // 调用 fn
  34. // 恢复寄存器、 g、栈指针
  35. get_tls(CX)
  36. MOVQ 48(SP), DI
  37. MOVQ (g_stack+stack_hi)(DI), SI
  38. SUBQ 40(SP), SI
  39. MOVQ DI, g(CX)
  40. MOVQ SI, SP
  41. MOVL AX, ret+16(FP)
  42. RET
  43. nosave:
  44. // 在系统栈上运行,可能没有 g
  45. // 没有 g 的情况发生在线程创建中或线程结束中(比如 Solaris 平台上的 needm/dropm)
  46. // 这段代码和上面类似,但没有保存和恢复 g,且没有考虑栈的移动问题(因为我们在系统栈上,而非 goroutine 栈)
  47. // 如果已经在系统栈上,则上面的代码可被直接使用,在 Solaris 上会进入下面这段代码。
  48. // 使用这段代码来为所有 "已经在系统栈" 的调用进行服务,从而保持正确性。
  49. SUBQ $64, SP
  50. ANDQ $~15, SP // ABI 对齐
  51. MOVQ $0, 48(SP) // 上面的代码保存了 g, 确保 debug 时可用
  52. MOVQ DX, 40(SP) // 保存原始的栈指针
  53. MOVQ BX, DI // DI = AMD64 ABI 第一个参数
  54. MOVQ BX, CX // CX = Win64 第一个参数
  55. CALL AX
  56. MOVQ 40(SP), SI // 恢复原来的栈指针
  57. MOVQ SI, SP
  58. MOVL AX, ret+16(FP)
  59. RET

这段就是将当前栈移到系统栈去执行,因为 C 需要无穷大的栈,在 Go 的栈上执行 C 函数会导致栈溢出。

产生问题

cgo 调用会将当前栈移到系统栈,并且当 cgo 高并发调用且阻塞超过 20 微秒时会新建线程。而 Go 并不会销毁线程,由此造成线程增长。

解决方案

限制 Go 程序最大线程数,默认为 cpu 核数。

  1. runtime.GOMAXPROCS(runtime.NumCPU())

使用 channel 限制 cgo 最大并发数为 cpu 核数

  1. package thread
  2. import "runtime"
  3. var c chan struct{}
  4. func Lock() {
  5. c <- struct{}{}
  6. }
  7. func Unlock() {
  8. <-c
  9. }
  10. func init() {
  11. c = make(chan struct{}, runtime.NumCPU())
  12. }

针对超过 20 微秒的 cgo 调用进行限制:

  1. thread.Lock()
  2. wrapper.TaosFreeResult(result)
  3. thread.Unlock()

原文链接:https://www.cnblogs.com/t102011/p/17457120.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号