作者:加肥的猫miao_115 | 来源:互联网 | 2023-09-25 18:00
代码的本意,是在i个协程并发的执行实现后,启动一次nextProcess工作,代码应用了sync.Map来保护和同步i个协程的执行进度,避免多协程并发造成的map不平安读写。当最初一个协程执行结束,sync.Map为空,启动一次nextProcess。但能读到状态值syncTaskProcessCount为0的协程,只会是最初一个执行实现的
go
内置了协程平安的 sync
包来不便咱们同步各协程之间的执行状态,应用起来也十分不便。
最近在排查解决一个线下服务的数据同步问题,review
外围代码后,发现这么一段流程控制代码。
谬误示例
package main
import (
"log"
"runtime"
"sync"
)
func main() {
// 可并行也是重点,生产场景没几个单核的吧??
runtime.GOMAXPROCS(runtime.NumCPU())
waitGrp := &sync.WaitGroup{}
waitGrp.Add(1)
syncTaskProcessMap := &sync.Map{}
for i := 0; i <100; i++ {
syncTaskProcessMap.Store(i, i)
}
for j := 0; j <100; j++ {
go func(j int) {
// 协程可能并行抢占一轮开始
syncTaskProcessMap.Delete(j)
// 协程可能并行抢占一轮完结
// 在以后协程 Delete 后 Range 前 又被其余协程 Delete 操作了
syncTaskProcessCount := 0
syncTaskProcessMap.Range(func(key, value interface{}) bool {
syncTaskProcessCount++
return true
})
if syncTaskProcessCount == 0 {
log.Println(GetGoroutineID(), "syncTaskProcessMap empty, start syncOnline", syncTaskProcessCount)
}
}(j)
}
waitGrp.Wait()
}
func GetGoroutineID() uint64 {
b := make([]byte, 64)
runtime.Stack(b, false)
b = bytes.TrimPrefix(b, []byte("goroutine "))
b = b[:bytes.IndexByte(b, ' ')]
n, _ := strconv.ParseUint(string(b), 10, 64)
return n
}
代码的本意,是在 i
个协程并发的执行实现后,启动一次 nextProcess
工作,代码应用了 sync.Map
来保护和同步 i
个协程的执行进度,避免多协程并发造成的 map
不平安读写。当最初一个协程执行结束,sync.Map
为空,启动一次 nextProcess
。但能读到状态值 syncTaskProcessCount
为 0
的协程,只会是 最初一个
执行实现的协程吗?
sync.Map::Store\Load\Delete\Range
都是协程平安的操作,在调用期间只会被以后 协程
抢占拜访,但它们的组合操作并不是 独占
的,下面的代码认为,Delete && Range
两项操作期间 不会
夹带其余协程对 sync.Map
读写操作,导致能读到 syncTaskProcessCount
为 0
的协程可能不止最初一个执行结束的。
多执行几次,可能失去一下输入:
sqrtcat:demo$ go run test.go
2021/04/20 14:30:27 114 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt
sqrtcat:demo$ go run test.go
2021/04/20 14:30:30 117 syncTaskProcessMap empty, start syncOnline 0
2021/04/20 14:30:30 116 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt
sqrtcat:demo$ go run test.go
2021/04/20 14:30:33 117 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt
sqrtcat:demo$ go run test.go
2021/04/20 14:30:35 117 syncTaskProcessMap empty, start syncOnline 0
2021/04/20 14:30:35 118 syncTaskProcessMap empty, start syncOnline 0
2021/04/20 14:30:35 115 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt
sqrtcat:demo$ go run test.go
2021/04/20 14:30:38 131 syncTaskProcessMap empty, start syncOnline 0
2021/04/20 14:30:38 132 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt
能够看到,syncTaskProcessMap empty
的状态被多个协程读到了。
G117
,G118
,G115
在多核场景下肯能 并行
执行。
SyncMap
被 G117
抢占,Delete
后 2,SyncMap
被开释。
SyncMap
被 G118
抢占,Delete
后 1,SyncMap
被开释。
SyncMap
被 G115
抢占,Delete
后 0,SyncMap
被开释。
- 这时的
syncMap
未然为空,G117、G118、G115
持续 Range
失去的 syncTaskProcessCount
都为 0
,这样就导致了代码执行与冀望不同了。
所以,尽管 sync.Map
的繁多操作是主动加锁的排他操作,但组合在一起就不是了,咱们要自行在 code section
上加锁。
正确示例
package main
import (
"log"
"runtime"
"sync"
)
// 错误代码示例
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
syncMutex := &sync.Mutex{}
waitGrp := &sync.WaitGroup{}
waitGrp.Add(1)
syncTaskProcessMap := &sync.Map{}
for i := 0; i <100; i++ {
syncTaskProcessMap.Store(i, i)
}
for j := 0; j <100; j++ {
go func(j int) {
// 保障协程对 syncMap 的组合操作也是独占的
// 将可能的并行操作程序化
syncMutex.Lock()
defer syncMutex.Unlock()
syncTaskProcessMap.Delete(j)
syncTaskProcessCount := 0
syncTaskProcessMap.Range(func(key, value interface{}) bool {
syncTaskProcessCount++
return true
})
if syncTaskProcessCount == 0 {
log.Println(GetGoroutineID(), "syncTaskProcessMap empty, start syncOnline", syncTaskProcessCount)
}
}(j)
}
waitGrp.Wait()
}
func GetGoroutineID() uint64 {
b := make([]byte, 64)
runtime.Stack(b, false)
b = bytes.TrimPrefix(b, []byte("goroutine "))
b = b[:bytes.IndexByte(b, ' ')]
n, _ := strconv.ParseUint(string(b), 10, 64)
return n
}
协程并行
在 多核
的平台上,调配在不同 工夫片队列
上的协程是能够 并行
执行的,雷同 工夫片队列
上的协程是 并发
执行的
func main() {
// 这行代码将会影响子协程里的日志输出量
runtime.GOMAXPROCS(runtime.NumCPU())
waitChan := make(chan int)
go func() {
defer func() {
log.Println(GetGoroutineID(), "sub defer")
}()
log.Println(GetGoroutineID(), "sub start")
waitChan <- 1
log.Println(GetGoroutineID(), "sub finish")
}()
log.Println(GetGoroutineID(), "main start")
log.Println(<-waitChan)
log.Println(GetGoroutineID(), "main finish")
}
- 如果
main
和 sub
调配在了同一个 cpu
上 或只有一个 cpu
,main start
,waitChan
读阻塞了 main
,sub
开始执行,sub start
,写入 waitChan
,后续也没有触发协程切换的代码段,继续执行 sub finish
sub defer
退出,交出 工夫片
,main
继续执行 main finish
。
- 如果
main
和 sub
调配在了不同 cpu
上,当 waitChan
阻塞了 cpu1
上的 main
,而 sub
被 cpu2
执行了 写入waitChan
后,main
可能会被 cpu1
立刻继续执行,主协程 main
退出,sub
也会被终止执行,前面的日志打印可能就执行不到了。
sqrtcat:demo$ go run test.go
2021/04/20 15:26:42 5 sub start
2021/04/20 15:26:42 1 main start
2021/04/20 15:26:42 1
2021/04/20 15:26:42 1 main finish
2021/04/20 15:26:42 5 sub finish